From 7c1a49d5d17c53351aa38945a6fd112f62524646 Mon Sep 17 00:00:00 2001 From: Chris Whited Date: Fri, 15 May 2026 10:36:42 -1000 Subject: [PATCH 1/3] feat(explain query): add explain analyze query and results to ArrowFlight svc --- packages/amp/src/arrow-flight.ts | 1 + packages/amp/src/arrow-flight/service.ts | 279 +++++++++++------- packages/amp/src/arrow-flight/types.ts | 44 ++- packages/amp/src/internal/explain/parser.ts | 234 +++++++++++++++ .../amp/test/arrow-flight/service.test.ts | 74 ++++- .../amp/test/internal/explain/parser.test.ts | 149 ++++++++++ 6 files changed, 678 insertions(+), 103 deletions(-) create mode 100644 packages/amp/src/internal/explain/parser.ts create mode 100644 packages/amp/test/internal/explain/parser.test.ts diff --git a/packages/amp/src/arrow-flight.ts b/packages/amp/src/arrow-flight.ts index 66305c3..a20df75 100644 --- a/packages/amp/src/arrow-flight.ts +++ b/packages/amp/src/arrow-flight.ts @@ -33,6 +33,7 @@ export { // Types // ============================================================================= +export { ExplainCell, ExplainRow, PlanNode } from "./arrow-flight/types.ts" export { type ExtractQueryResult, type QueryOptions, type QueryResult } from "./arrow-flight/types.ts" // ============================================================================= diff --git a/packages/amp/src/arrow-flight/service.ts b/packages/amp/src/arrow-flight/service.ts index 3f9f5a9..2ee97a0 100644 --- a/packages/amp/src/arrow-flight/service.ts +++ b/packages/amp/src/arrow-flight/service.ts @@ -5,7 +5,7 @@ import * as Cause from "effect/Cause" import * as Context from "effect/Context" import * as Effect from "effect/Effect" import * as Filter from "effect/Filter" -import { identity } from "effect/Function" +import * as Fn from "effect/Function" import * as Layer from "effect/Layer" import * as Option from "effect/Option" import * as Predicate from "effect/Predicate" @@ -23,6 +23,7 @@ import { MessageHeaderType, parseSchema } from "../internal/arrow-flight-ipc/Schema.ts" +import { ExplainResultRow, parsePlan, planToTable } from "../internal/explain/parser.ts" import { FlightDescriptor_DescriptorType, FlightDescriptorSchema, FlightService } from "../protobuf/Flight_pb.ts" import { CommandStatementQuerySchema } from "../protobuf/FlightSql_pb.ts" import { @@ -35,7 +36,7 @@ import { TicketNotFoundError } from "./errors.ts" import { AuthInfoContextKey, Transport } from "./transport.ts" -import type { ExtractQueryResult, QueryOptions, QueryResult } from "./types.ts" +import type { ExplainRow, ExtractQueryResult, QueryOptions } from "./types.ts" // ============================================================================= // Arrow Flight Service @@ -45,44 +46,72 @@ import type { ExtractQueryResult, QueryOptions, QueryResult } from "./types.ts" /** * A service which can be used to execute queries against an Arrow Flight API. */ -export class ArrowFlight extends Context.Service +export class ArrowFlight extends Context.Service< + ArrowFlight, + { + /** + * The Connect `Client` that will be used to execute Arrow Flight queries. + */ + readonly client: Client - /** - * Executes an Arrow Flight SQL query and returns a all results as an array. - */ - readonly query: ( - sql: string, - options?: Options - ) => Effect.Effect>, ArrowFlightError> + /** + * Executes an Arrow Flight SQL query and returns a all results as an array. + */ + readonly query: ( + sql: string, + options?: Options + ) => Effect.Effect< + ReadonlyArray>, + ArrowFlightError + > - /** - * Executes an Arrow Flight SQL query and returns a stream of results. - */ - readonly streamQuery: ( - sql: string, - options?: Options - ) => Stream.Stream, ArrowFlightError> -}>()("Amp/ArrowFlight") {} + /** + * Executes an Arrow Flight SQL query and returns a stream of results. + */ + readonly streamQuery: ( + sql: string, + options?: Options + ) => Stream.Stream, ArrowFlightError> + + /** + * Executes the given SQL with `EXPLAIN` (or `EXPLAIN ANALYZE` when + * `analyze` is true) prepended, parses the returned plan text, and + * returns a normalized table of plan nodes — one row per node, with + * numeric properties and metrics expanded into columns. Durations are + * converted to seconds and their column names suffixed with `_secs`. + */ + readonly explain: ( + sql: string, + options?: { readonly analyze?: boolean | undefined } + ) => Effect.Effect, ArrowFlightError> + } +>()("Amp/ArrowFlight") {} const make = Effect.gen(function*() { const auth = yield* Effect.serviceOption(Auth) const transport = yield* Transport const client = createClient(FlightService, transport) - const decodeRecordBatchMetadata = Schema.decodeEffect(RecordBatchMetadataFromUint8Array) + const decodeRecordBatchMetadata = Schema.decodeEffect( + RecordBatchMetadataFromUint8Array + ) /** * Execute a SQL query and return a stream of rows. */ - const streamQuery = (query: string, options?: QueryOptions) => + const streamQuery = ( + query: string, + options?: Options + ): Stream.Stream, ArrowFlightError> => Effect.gen(function*() { const contextValues = createContextValues() + // A cache failure ("can't read the saved token") is not an Arrow Flight + // error and shouldn't surface as one — fall back to "no cached auth" + // and let the server's auth response (if any) be the source of truth. const authInfo = Option.isSome(auth) - ? yield* auth.value.getCachedAuthInfo + ? yield* auth.value.getCachedAuthInfo.pipe( + Effect.orElseSucceed(() => Option.none()) + ) : Option.none() // Setup the query context with authentication information, if available @@ -103,7 +132,10 @@ const make = Effect.gen(function*() { headers.set("amp-stream", "true") } if (Predicate.isNotUndefined(options?.resumeWatermark)) { - headers.set("amp-resume", blockRangesToResumeWatermark(options.resumeWatermark)) + headers.set( + "amp-resume", + blockRangesToResumeWatermark(options.resumeWatermark) + ) } const flightInfo = yield* Effect.tryPromise({ @@ -123,93 +155,140 @@ const make = Effect.gen(function*() { return yield* new TicketNotFoundError({ query }) } - const flightDataStream = Stream.unwrap(Effect.gen(function*() { - const controller = yield* Effect.acquireRelease( - Effect.sync(() => new AbortController()), - (abort) => Effect.sync(() => abort.abort()) - ) - return Stream.fromAsyncIterable( - client.doGet(ticket, { signal: controller.signal, contextValues }), - (cause) => new RpcError({ cause, method: "doGet" }) - ) - })) + const flightDataStream = Stream.unwrap( + Effect.gen(function*() { + const controller = yield* Effect.acquireRelease( + Effect.sync(() => new AbortController()), + (abort) => Effect.sync(() => abort.abort()) + ) + return Stream.fromAsyncIterable( + client.doGet(ticket, { signal: controller.signal, contextValues }), + (cause) => new RpcError({ cause, method: "doGet" }) + ) + }) + ) let schema: ArrowSchema | undefined const dictionaryRegistry = new DictionaryRegistry() - const dataSchema: Schema.$Array< - Schema.$Record< - typeof Schema.String, - typeof Schema.Unknown - > - > = Schema.Array(options?.schema ?? Schema.Record(Schema.String, Schema.Unknown) as any) + const dataSchema = Schema.Array( + options?.schema ?? Schema.Record(Schema.String, Schema.Unknown) + ) const decodeRecordBatchData = Schema.decodeEffect(dataSchema) + type Result = ExtractQueryResult // Convert FlightData stream to a stream of rows return flightDataStream.pipe( - Stream.mapEffect(Effect.fnUntraced(function*(flightData): Effect.fn.Return< - Option.Option>, - ArrowFlightError - > { - const messageType = yield* Effect.orDie(getMessageType(flightData)) - - switch (messageType) { - case MessageHeaderType.SCHEMA: { - schema = yield* parseSchema(flightData).pipe( - Effect.mapError((cause) => new ParseSchemaError({ cause })) - ) - return Option.none>() - } - case MessageHeaderType.DICTIONARY_BATCH: { - // TODO: figure out what to do (if anything) with dictionary batches - // const dictionaryBatch = yield* parseDictionaryBatch(flightData).pipe( - // Effect.mapError((cause) => new ParseDictionaryBatchError({ cause })) - // ) - // decodeDictionaryBatch(dictionaryBatch, flightData.dataBody, schema!, dictionaryRegistry, readColumnValues) - return Option.none>() - } - case MessageHeaderType.RECORD_BATCH: { - const metadata = yield* decodeRecordBatchMetadata(flightData.appMetadata).pipe( - Effect.mapError((cause) => new ParseRecordBatchError({ cause })) - ) - const recordBatch = yield* parseRecordBatch(flightData).pipe( - Effect.mapError((cause) => new ParseRecordBatchError({ cause })) - ) - const decodedRecordBatch = decodeRecordBatch(recordBatch, flightData.dataBody, schema!) - const jsonOptions: RecordBatchToJsonOptions = { dictionaryRegistry } - if (options?.bigIntHandling !== undefined) jsonOptions.bigIntHandling = options.bigIntHandling - if (options?.binaryHandling !== undefined) jsonOptions.binaryHandling = options.binaryHandling - if (options?.dateHandling !== undefined) jsonOptions.dateHandling = options.dateHandling - if (options?.includeNulls !== undefined) jsonOptions.includeNulls = options.includeNulls - - const json = recordBatchToJson(decodedRecordBatch, jsonOptions) - const data = yield* decodeRecordBatchData(json).pipe( - Effect.mapError((cause) => new ParseRecordBatchError({ cause })) - ) - return Option.some({ data, metadata }) + Stream.mapEffect( + Effect.fnUntraced(function*(flightData): Effect.fn.Return< + Option.Option, + ArrowFlightError + > { + const messageType = yield* Effect.orDie(getMessageType(flightData)) + + switch (messageType) { + case MessageHeaderType.SCHEMA: { + schema = yield* parseSchema(flightData).pipe( + Effect.mapError((cause) => new ParseSchemaError({ cause })) + ) + return Option.none() + } + case MessageHeaderType.DICTIONARY_BATCH: { + // TODO: figure out what to do (if anything) with dictionary batches + // const dictionaryBatch = yield* parseDictionaryBatch(flightData).pipe( + // Effect.mapError((cause) => new ParseDictionaryBatchError({ cause })) + // ) + // decodeDictionaryBatch(dictionaryBatch, flightData.dataBody, schema!, dictionaryRegistry, readColumnValues) + return Option.none() + } + case MessageHeaderType.RECORD_BATCH: { + const metadata = yield* decodeRecordBatchMetadata( + flightData.appMetadata + ).pipe( + Effect.mapError( + (cause) => new ParseRecordBatchError({ cause }) + ) + ) + const recordBatch = yield* parseRecordBatch(flightData).pipe( + Effect.mapError( + (cause) => new ParseRecordBatchError({ cause }) + ) + ) + const decodedRecordBatch = decodeRecordBatch( + recordBatch, + flightData.dataBody, + schema! + ) + const jsonOptions: RecordBatchToJsonOptions = { + dictionaryRegistry + } + if (options?.bigIntHandling !== undefined) { + jsonOptions.bigIntHandling = options.bigIntHandling + } + if (options?.binaryHandling !== undefined) { + jsonOptions.binaryHandling = options.binaryHandling + } + if (options?.dateHandling !== undefined) { + jsonOptions.dateHandling = options.dateHandling + } + if (options?.includeNulls !== undefined) { + jsonOptions.includeNulls = options.includeNulls + } + + const json = recordBatchToJson(decodedRecordBatch, jsonOptions) + const data = yield* decodeRecordBatchData(json).pipe( + Effect.mapError( + (cause) => new ParseRecordBatchError({ cause }) + ) + ) + return Option.some({ data, metadata } as Result) + } } - } - return yield* Effect.die(new Cause.IllegalArgumentError(`Invalid message type received: ${messageType}`)) - })), - Stream.filterMap(Filter.fromPredicateOption(identity)) + return yield* Effect.die( + new Cause.IllegalArgumentError( + `Invalid message type received: ${messageType}` + ) + ) + }) + ), + Stream.filterMap(Filter.fromPredicateOption(Fn.identity)) ) - }).pipe( - Stream.unwrap, - Stream.withSpan("ArrowFlight.stream") - ) as any - - const query = Effect.fn("ArrowFlight.query")( - function*(sql: string, options?: QueryOptions) { - const chunk = yield* Stream.runCollect(streamQuery(sql, options)) - return Array.from(chunk) - } - ) as any + }).pipe(Stream.unwrap, Stream.withSpan("ArrowFlight.stream")) + + const query = ( + sql: string, + options?: Options + ): Effect.Effect< + ReadonlyArray>, + ArrowFlightError + > => + Stream.runCollect(streamQuery(sql, options)).pipe( + Effect.map((chunk) => Array.from(chunk)), + Effect.withSpan("ArrowFlight.query") + ) + + const explain = Effect.fn("ArrowFlight.explain")(function*( + sql: string, + options?: { readonly analyze?: boolean | undefined } + ) { + const prefix = options?.analyze ? "EXPLAIN ANALYZE " : "EXPLAIN " + const head = yield* streamQuery(prefix + sql, { + schema: ExplainResultRow + }).pipe(Stream.runHead) + const planText = head.pipe( + Option.flatMap((result) => Option.fromNullishOr(result.data[0])), + Option.map((row) => row.plan), + Option.getOrElse(() => "") + ) + return planToTable(parsePlan(planText)) + }) return { client, query, - streamQuery + streamQuery, + explain } as const }) @@ -229,7 +308,9 @@ export const layer: Layer.Layer = Laye * @param ranges - The block ranges to convert. * @returns A resume watermark string. */ -const blockRangesToResumeWatermark = (ranges: ReadonlyArray): string => { +const blockRangesToResumeWatermark = ( + ranges: ReadonlyArray +): string => { const watermarks: Record = {} for (const range of ranges) { watermarks[range.network] = { diff --git a/packages/amp/src/arrow-flight/types.ts b/packages/amp/src/arrow-flight/types.ts index a22239c..4eff98b 100644 --- a/packages/amp/src/arrow-flight/types.ts +++ b/packages/amp/src/arrow-flight/types.ts @@ -1,5 +1,5 @@ -import type * as Schema from "effect/Schema" -import type { BlockRange, RecordBatchMetadata } from "../core/domain.ts" +import * as Schema from "effect/Schema" +import { type BlockRange, NonNegativeInt, type RecordBatchMetadata } from "../core/domain.ts" // ============================================================================= // Types @@ -19,7 +19,7 @@ export interface QueryResult { * the query is executed. */ export interface QueryOptions { - readonly schema?: Schema.Any | undefined + readonly schema?: Schema.Codec | undefined /** * Controls how BigInt values are represented in decoded query output. * @default "string" @@ -58,3 +58,41 @@ export type ExtractQueryResult = Options extends { readonly schema: Schema.Top } ? QueryResult> : QueryResult> + +// ============================================================================= +// Explain +// ============================================================================= + +/** + * A single normalized cell value in an `ExplainRow`. EXPLAIN output values are + * either parsed numbers, raw strings (when no parser matches), or `null` (when + * the metric is `N/A`). + */ +export const ExplainCell = Schema.NullOr( + Schema.Union([Schema.String, Schema.Number]) +).annotate({ identifier: "ExplainCell" }) +export type ExplainCell = typeof ExplainCell.Type + +/** + * A single row in the table returned by `ArrowFlight.explain` — one row per + * plan node. Always contains `node` (operator name) and `depth` (tree level); + * additional columns are derived per-node from numeric properties and from + * expanded metrics (durations renamed with a `_secs` suffix, `total → matched` + * split into `_total` / `_matched`, etc.). + */ +export const ExplainRow = Schema.Record(Schema.String, ExplainCell) + .annotate({ identifier: "ExplainRow" }) +export type ExplainRow = typeof ExplainRow.Type + +/** + * The intermediate parsed shape of a single plan node, before normalization + * into the table form. `properties` and `metrics` are kept as raw strings here + * so callers that need the unprocessed values can access them. + */ +export const PlanNode = Schema.Struct({ + name: Schema.String, + depth: NonNegativeInt, + properties: Schema.Record(Schema.String, Schema.String), + metrics: Schema.Record(Schema.String, Schema.String) +}).annotate({ identifier: "PlanNode" }) +export type PlanNode = typeof PlanNode.Type diff --git a/packages/amp/src/internal/explain/parser.ts b/packages/amp/src/internal/explain/parser.ts new file mode 100644 index 0000000..7f18ffa --- /dev/null +++ b/packages/amp/src/internal/explain/parser.ts @@ -0,0 +1,234 @@ +import * as Schema from "effect/Schema" +import { type ExplainCell, type ExplainRow, PlanNode } from "../../arrow-flight/types.ts" + +// ============================================================================= +// Internal Schemas +// ============================================================================= + +/** + * The wire shape of a single row returned by the Amp Arrow Flight backend + * when an `EXPLAIN` / `EXPLAIN ANALYZE` query is executed. + */ +export const ExplainResultRow = Schema.Struct({ + plan_type: Schema.String, + plan: Schema.String +}).annotate({ identifier: "Amp/ArrowFlight/Explain/ResultRow" }) +export type ExplainResultRow = typeof ExplainResultRow.Type + +// ============================================================================= +// Constants +// ============================================================================= + +const INDENT_WIDTH = 2 + +const METRICS_RE = /,\s*metrics=\[(.+)\]$/ +const DURATION_RE = /^([\d.]+(?:e[+-]?\d+)?)\s*(ns|µs|us|ms|s)$/ +const NUMBER_UNIT_RE = /^([\d.]+(?:e[+-]?\d+)?)\s*(KB|MB|GB|TB|K|M|G)$/i +const TOTAL_MATCHED_RE = + /^([\d.]+(?:e[+-]?\d+)?\s*(?:KB|MB|GB|TB|K|M|G)?)\s+total\s*→\s*([\d.]+(?:e[+-]?\d+)?\s*(?:KB|MB|GB|TB|K|M|G)?)\s+matched$/ +const PCT_RATIO_RE = /^([\d.]+(?:e[+-]?\d+)?)%\s*\([^)]*\)$/ +const LEADING_NUMBER_RE = /^\{?\s*([\d.]+(?:e[+-]?\d+)?)\s*[KMG]?\b/ + +const DURATION_UNITS: Record = { + ns: 1e-9, + µs: 1e-6, + us: 1e-6, + ms: 1e-3, + s: 1 +} + +const MULTIPLIER_UNITS: Record = { + K: 1e3, + M: 1e6, + G: 1e9, + KB: 1e3, + MB: 1e6, + GB: 1e9, + TB: 1e12 +} + +// ============================================================================= +// Plan Parsing +// ============================================================================= + +/** + * Parse a DataFusion EXPLAIN / EXPLAIN ANALYZE plan text into a list of + * `PlanNode` entries. Indentation determines tree depth (2 spaces per level). + */ +export const parsePlan = (planText: string): ReadonlyArray => { + const text = planText.trim() + if (text.length === 0) return [] + + const nodes: Array = [] + for (const line of text.split("\n")) { + const stripped = line.replace(/^[ ]+/, "") + const depth = (line.length - stripped.length) / INDENT_WIDTH + + const match = METRICS_RE.exec(stripped) + let nodePart: string + let metrics: Record + if (match !== null) { + nodePart = stripped.slice(0, match.index) + metrics = parseMetrics(match[1]!) + } else { + nodePart = stripped + metrics = {} + } + + const { name, properties } = splitNameAndProperties(nodePart) + nodes.push(PlanNode.make({ name, depth, properties, metrics })) + } + return nodes +} + +/** + * Convert a list of `PlanNode`s into a flat table of rows (one row per node) + * with normalized units. Mirrors the Python `ExplainResult.to_dataframe()` + * shape: numeric properties only, expanded metrics, durations in seconds with + * `_secs` suffix, `total → matched` split into two columns. + */ +export const planToTable = ( + nodes: ReadonlyArray +): ReadonlyArray => { + const rows: Array> = [] + for (const node of nodes) { + const row: Record = { + node: node.name, + depth: node.depth + } + for (const key in node.properties) { + const num = extractNumeric(node.properties[key]!) + if (num !== null) row[key] = num + } + for (const key in node.metrics) { + expandMetric(key, node.metrics[key]!, row) + } + rows.push(row) + } + return rows +} + +// ============================================================================= +// Internal Helpers +// ============================================================================= + +const splitNameAndProperties = ( + nodePart: string +): { readonly name: string; readonly properties: Record } => { + const colon = nodePart.indexOf(":") + if (colon > 0 && nodePart.slice(0, colon).indexOf(" ") === -1) { + const name = nodePart.slice(0, colon) + const propsStr = nodePart + .slice(colon + 1) + .trim() + .replace(/,$/, "") + return { + name, + properties: propsStr.length === 0 ? {} : parseMetrics(propsStr) + } + } + return { name: nodePart.replace(/,$/, ""), properties: {} } +} + +/** + * Parse a `key=value, key=value` string. Walks character-by-character so values + * can contain commas inside `()` or `[]` without being split. + */ +const parseMetrics = (s: string): Record => { + const result: Record = {} + const n = s.length + let i = 0 + while (i < n) { + const eq = s.indexOf("=", i) + if (eq === -1) break + const key = s.slice(i, eq).trim() + + let depthP = 0 + let depthB = 0 + let j = eq + 1 + while (j < n) { + const c = s.charCodeAt(j) + if (c === 40 /* ( */) depthP++ + else if (c === 41 /* ) */) depthP-- + else if (c === 91 /* [ */) depthB++ + else if (c === 93 /* ] */) depthB-- + else if (c === 44 /* , */ && depthP === 0 && depthB === 0) break + j++ + } + + result[key] = s.slice(eq + 1, j).trim() + i = j + 1 + } + return result +} + +const parseNumberWithUnit = (raw: string): number | null => { + const s = raw.trim() + const m = NUMBER_UNIT_RE.exec(s) + if (m !== null) { + const mult = MULTIPLIER_UNITS[m[2]!.toUpperCase()]! + return parseFloat(m[1]!) * mult + } + if (/^-?\d+$/.test(s)) return parseInt(s, 10) + const f = parseFloat(s) + if (!Number.isNaN(f) && /^-?[\d.]+(?:e[+-]?\d+)?$/.test(s)) return f + return null +} + +const extractNumeric = (raw: string): number | null => { + const value = raw.trim() + const num = parseNumberWithUnit(value) + if (num !== null) return num + const m = LEADING_NUMBER_RE.exec(value) + if (m !== null) { + return parseNumberWithUnit(m[0].replace(/^\{/, "").trim()) + } + return null +} + +/** + * Expand a single metric into one or more cells written into `row`. + * + * Mirrors Python `_expand_metric` plus the `_secs` rename that Python applies + * post-hoc via `df.rename`. We rename inline because the per-row Record we + * build *is* the table — there's no second pass. + */ +const expandMetric = ( + key: string, + raw: string, + row: Record +): void => { + const value = raw.trim() + + if (value.startsWith("N/A")) { + row[key] = null + return + } + + const tm = TOTAL_MATCHED_RE.exec(value) + if (tm !== null) { + row[`${key}_total`] = parseNumberWithUnit(tm[1]!) + row[`${key}_matched`] = parseNumberWithUnit(tm[2]!) + return + } + + const pct = PCT_RATIO_RE.exec(value) + if (pct !== null) { + row[key] = parseFloat(pct[1]!) + return + } + + const dur = DURATION_RE.exec(value) + if (dur !== null) { + row[`${key}_secs`] = parseFloat(dur[1]!) * DURATION_UNITS[dur[2]!]! + return + } + + const num = parseNumberWithUnit(value) + if (num !== null) { + row[key] = num + return + } + + row[key] = value +} diff --git a/packages/amp/test/arrow-flight/service.test.ts b/packages/amp/test/arrow-flight/service.test.ts index 5c0c131..18272d6 100644 --- a/packages/amp/test/arrow-flight/service.test.ts +++ b/packages/amp/test/arrow-flight/service.test.ts @@ -1,14 +1,17 @@ -import { create } from "@bufbuild/protobuf" +import { create, fromBinary } from "@bufbuild/protobuf" +import { AnySchema, anyUnpack } from "@bufbuild/protobuf/wkt" import { createRouterTransport } from "@connectrpc/connect" import * as ArrowFlight from "@edgeandnode/amp/arrow-flight" import { type FlightData, FlightDataSchema, + FlightDescriptor_DescriptorType, FlightEndpointSchema, FlightInfoSchema, FlightService, TicketSchema } from "@edgeandnode/amp/protobuf/Flight_pb" +import { CommandStatementQuerySchema } from "@edgeandnode/amp/protobuf/FlightSql_pb" import { describe, it } from "@effect/vitest" import * as Effect from "effect/Effect" import * as Layer from "effect/Layer" @@ -62,7 +65,76 @@ const toProtoFlightData = ( dataHeader: flightData.dataHeader }) +const makeCapturingTransport = (capturedSql: { value: string }) => + createRouterTransport((router) => { + const ticket = create(TicketSchema, { ticket: encoder.encode("ticket") }) + const endpoint = create(FlightEndpointSchema, { + appMetadata: new Uint8Array(0), + location: [], + ticket + }) + const flightInfo = create(FlightInfoSchema, { + appMetadata: new Uint8Array(0), + endpoint: [endpoint], + ordered: true, + schema: new Uint8Array(0), + totalBytes: 0n, + totalRecords: 0n + }) + + router.service(FlightService, { + doGet() { + async function* messages(): AsyncGenerator { + // Yield no flight data — `explain` should return an empty table. + } + return messages() + }, + getFlightInfo(req) { + if (req.type === FlightDescriptor_DescriptorType.CMD) { + const any = fromBinary(AnySchema, req.cmd) + const cmd = anyUnpack(any, CommandStatementQuerySchema) + if (cmd !== undefined) capturedSql.value = cmd.query + } + return flightInfo + } + }) + }) + describe("ArrowFlight", () => { + it.effect("explain prepends EXPLAIN to the SQL by default", ({ expect }) => + Effect.gen(function*() { + const captured = { value: "" } + const transport = makeCapturingTransport(captured) + const layer = ArrowFlight.layer.pipe( + Layer.provide(Layer.succeed(ArrowFlight.Transport, transport)) + ) + + const rows = yield* Effect.gen(function*() { + const flight = yield* ArrowFlight.ArrowFlight + return yield* flight.explain("SELECT * FROM foo LIMIT 10") + }).pipe(Effect.provide(layer)) + + expect(captured.value).toBe("EXPLAIN SELECT * FROM foo LIMIT 10") + expect(rows).toEqual([]) + })) + + it.effect("explain prepends EXPLAIN ANALYZE when analyze is true", ({ expect }) => + Effect.gen(function*() { + const captured = { value: "" } + const transport = makeCapturingTransport(captured) + const layer = ArrowFlight.layer.pipe( + Layer.provide(Layer.succeed(ArrowFlight.Transport, transport)) + ) + + const rows = yield* Effect.gen(function*() { + const flight = yield* ArrowFlight.ArrowFlight + return yield* flight.explain("SELECT 1", { analyze: true }) + }).pipe(Effect.provide(layer)) + + expect(captured.value).toBe("EXPLAIN ANALYZE SELECT 1") + expect(rows).toEqual([]) + })) + it.effect("passes binaryHandling to query output conversion", ({ expect }) => Effect.gen(function*() { const testSchema = SchemaBuilder.schema() diff --git a/packages/amp/test/internal/explain/parser.test.ts b/packages/amp/test/internal/explain/parser.test.ts new file mode 100644 index 0000000..bdeb94e --- /dev/null +++ b/packages/amp/test/internal/explain/parser.test.ts @@ -0,0 +1,149 @@ +import type { ExplainRow, PlanNode } from "@edgeandnode/amp/arrow-flight" +import { parsePlan, planToTable } from "@edgeandnode/amp/internal/explain/parser" +import { describe, expect, it } from "vitest" + +// Real EXPLAIN ANALYZE output captured from Amp. Two nodes: +// `CoalescePartitionsExec` at depth 0, `DataSourceExec` at depth 1. +const REAL_PLAN = + "CoalescePartitionsExec: fetch=10, metrics=[output_rows=10, elapsed_compute=9.59µs, output_bytes=1376.0 B, output_batches=1]\n" + + " DataSourceExec: file_groups={4 groups: [[000000000-4b156bc5f8c4b5c9.parquet:0..34765529, 046041222-7dc8e8e88196fb16.parquet:0..3289, 046041316-214bfa0c4faa7b3c.parquet:0..12946, 046041384-7686b0192d63ea56.parquet:0..3188, 046041393-58281870e72806fe.parquet:0..3189, ...], [035000000-3804cfa716034dd4.parquet:2062293511..4159378225], [035000000-3804cfa716034dd4.parquet:4159378225..6256462939], [035000000-3804cfa716034dd4.parquet:6256462939..6599305879, 046041223-c398ae24960243cf.parquet:0..3190, 046041379-42c6761e45c71a94.parquet:0..3188, 046041385-9130b047b7c4a25f.parquet:0..3061, 046041394-cf85f4d98090f0a3.parquet:0..3062, ...]]}, projection=[_block_num, timestamp, block_num, tx_hash, buyer_address, seller_address, value_usdc, nonce], limit=10, file_type=parquet, metrics=[output_rows=10, elapsed_compute=4ns, output_bytes=1376.0 B, output_batches=1, files_ranges_pruned_statistics=6 total → 6 matched, row_groups_pruned_statistics=74 total → 74 matched, row_groups_pruned_bloom_filter=74 total → 74 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, bytes_scanned=13.37 M, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, bloom_filter_eval_time=12ns, metadata_load_time=288.01µs, page_index_eval_time=12ns, row_pushdown_eval_time=12ns, statistics_eval_time=12ns, time_elapsed_opening=415.35µs, time_elapsed_processing=43.90ms, time_elapsed_scanning_total=230.61ms, time_elapsed_scanning_until_data=230.60ms, scan_efficiency_ratio=N/A (0/0)]\n" + +describe("parsePlan", () => { + it("returns an empty list for empty input", () => { + expect(parsePlan("")).toEqual([]) + expect(parsePlan(" \n \n")).toEqual([]) + }) + + it("parses the real plan into 2 nodes with depths and properties", () => { + const nodes = parsePlan(REAL_PLAN) + expect(nodes).toHaveLength(2) + + const root = nodes[0]! + expect(root.name).toBe("CoalescePartitionsExec") + expect(root.depth).toBe(0) + expect(root.properties).toEqual({ fetch: "10" }) + expect(Object.keys(root.metrics)).toEqual([ + "output_rows", + "elapsed_compute", + "output_bytes", + "output_batches" + ]) + + const child = nodes[1]! + expect(child.name).toBe("DataSourceExec") + expect(child.depth).toBe(1) + expect(child.properties["limit"]).toBe("10") + expect(child.properties["file_type"]).toBe("parquet") + expect(child.properties["projection"]).toBe( + "[_block_num, timestamp, block_num, tx_hash, buyer_address, seller_address, value_usdc, nonce]" + ) + // file_groups is a structured value with nested brackets — must survive + // the parser without being split on internal commas. + expect(child.properties["file_groups"]?.startsWith("{4 groups:")).toBe(true) + expect(child.properties["file_groups"]?.endsWith("]]}")).toBe(true) + }) + + it("parses a node with no properties", () => { + const nodes = parsePlan("ProjectionExec\n") + expect(nodes).toHaveLength(1) + expect(nodes[0]).toMatchObject>({ + name: "ProjectionExec", + depth: 0, + properties: {}, + metrics: {} + }) + }) + + it("parses a node with metrics but no extra properties", () => { + const nodes = parsePlan("CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=42]\n") + expect(nodes[0]!.properties).toEqual({ target_batch_size: "8192" }) + expect(nodes[0]!.metrics).toEqual({ output_rows: "42" }) + }) +}) + +describe("planToTable", () => { + it("normalizes the real plan into a table with expected cells", () => { + const table = planToTable(parsePlan(REAL_PLAN)) + expect(table).toHaveLength(2) + + const row0 = table[0] as ExplainRow & Record + expect(row0["node"]).toBe("CoalescePartitionsExec") + expect(row0["depth"]).toBe(0) + expect(row0["fetch"]).toBe(10) + expect(row0["output_rows"]).toBe(10) + expect(row0["elapsed_compute_secs"]).toBeCloseTo(9.59e-6, 12) + expect(row0["output_bytes"]).toBe("1376.0 B") + expect(row0["output_batches"]).toBe(1) + // Duration was renamed; the unsuffixed key must not also be present. + expect("elapsed_compute" in row0).toBe(false) + + const row1 = table[1] as ExplainRow & Record + expect(row1["node"]).toBe("DataSourceExec") + expect(row1["depth"]).toBe(1) + expect(row1["limit"]).toBe(10) + // file_groups is "{4 groups: ..." — leading-number extraction yields 4. + expect(row1["file_groups"]).toBe(4) + // Non-numeric properties are dropped from the table. + expect("file_type" in row1).toBe(false) + expect("projection" in row1).toBe(false) + + expect(row1["output_rows"]).toBe(10) + expect(row1["output_bytes"]).toBe("1376.0 B") + expect(row1["output_batches"]).toBe(1) + expect(row1["elapsed_compute_secs"]).toBeCloseTo(4e-9, 18) + + // total → matched expansions + expect(row1["files_ranges_pruned_statistics_total"]).toBe(6) + expect(row1["files_ranges_pruned_statistics_matched"]).toBe(6) + expect(row1["row_groups_pruned_statistics_total"]).toBe(74) + expect(row1["row_groups_pruned_statistics_matched"]).toBe(74) + expect(row1["row_groups_pruned_bloom_filter_total"]).toBe(74) + expect(row1["row_groups_pruned_bloom_filter_matched"]).toBe(74) + expect(row1["page_index_pages_pruned_total"]).toBe(0) + expect(row1["page_index_pages_pruned_matched"]).toBe(0) + expect(row1["page_index_rows_pruned_total"]).toBe(0) + expect(row1["page_index_rows_pruned_matched"]).toBe(0) + expect(row1["limit_pruned_row_groups_total"]).toBe(0) + expect(row1["limit_pruned_row_groups_matched"]).toBe(0) + + // Plain numbers + expect(row1["batches_split"]).toBe(0) + expect(row1["file_open_errors"]).toBe(0) + expect(row1["file_scan_errors"]).toBe(0) + expect(row1["num_predicate_creation_errors"]).toBe(0) + expect(row1["predicate_evaluation_errors"]).toBe(0) + expect(row1["pushdown_rows_matched"]).toBe(0) + expect(row1["pushdown_rows_pruned"]).toBe(0) + expect(row1["predicate_cache_inner_records"]).toBe(0) + expect(row1["predicate_cache_records"]).toBe(0) + + // M-suffix expansion + expect(row1["bytes_scanned"]).toBe(13.37e6) + + // Duration metrics + expect(row1["bloom_filter_eval_time_secs"]).toBeCloseTo(12e-9, 18) + expect(row1["metadata_load_time_secs"]).toBeCloseTo(288.01e-6, 12) + expect(row1["page_index_eval_time_secs"]).toBeCloseTo(12e-9, 18) + expect(row1["row_pushdown_eval_time_secs"]).toBeCloseTo(12e-9, 18) + expect(row1["statistics_eval_time_secs"]).toBeCloseTo(12e-9, 18) + expect(row1["time_elapsed_opening_secs"]).toBeCloseTo(415.35e-6, 12) + expect(row1["time_elapsed_processing_secs"]).toBeCloseTo(43.9e-3, 9) + expect(row1["time_elapsed_scanning_total_secs"]).toBeCloseTo(230.61e-3, 9) + expect(row1["time_elapsed_scanning_until_data_secs"]).toBeCloseTo(230.6e-3, 9) + + // N/A → null + expect(row1["scan_efficiency_ratio"]).toBeNull() + }) + + it("parses 'P% (out/in)' selectivity metrics as plain percent floats", () => { + const nodes = parsePlan("Foo: x=1, metrics=[selectivity=12.5% (1/8)]\n") + const table = planToTable(nodes) + expect(table[0]!["selectivity"]).toBe(12.5) + }) + + it("falls back to the raw string for an unparseable metric", () => { + const nodes = parsePlan("Foo: x=1, metrics=[shape=triangle]\n") + const table = planToTable(nodes) + expect(table[0]!["shape"]).toBe("triangle") + }) +}) From b904d0fde9ac2534f75c858ab87e7fb917e1afc6 Mon Sep 17 00:00:00 2001 From: Chris Whited Date: Fri, 15 May 2026 11:14:12 -1000 Subject: [PATCH 2/3] feat(explain query): build columns and rows result for explain result --- packages/amp/src/arrow-flight.ts | 2 +- packages/amp/src/arrow-flight/service.ts | 4 +- packages/amp/src/arrow-flight/types.ts | 13 ++ packages/amp/src/internal/explain/parser.ts | 62 +++++--- .../amp/test/arrow-flight/service.test.ts | 10 +- .../amp/test/internal/explain/parser.test.ts | 149 +++++++++++------- 6 files changed, 157 insertions(+), 83 deletions(-) diff --git a/packages/amp/src/arrow-flight.ts b/packages/amp/src/arrow-flight.ts index a20df75..7658f11 100644 --- a/packages/amp/src/arrow-flight.ts +++ b/packages/amp/src/arrow-flight.ts @@ -33,7 +33,7 @@ export { // Types // ============================================================================= -export { ExplainCell, ExplainRow, PlanNode } from "./arrow-flight/types.ts" +export { ExplainCell, ExplainResult, ExplainRow, PlanNode } from "./arrow-flight/types.ts" export { type ExtractQueryResult, type QueryOptions, type QueryResult } from "./arrow-flight/types.ts" // ============================================================================= diff --git a/packages/amp/src/arrow-flight/service.ts b/packages/amp/src/arrow-flight/service.ts index 2ee97a0..5de9336 100644 --- a/packages/amp/src/arrow-flight/service.ts +++ b/packages/amp/src/arrow-flight/service.ts @@ -36,7 +36,7 @@ import { TicketNotFoundError } from "./errors.ts" import { AuthInfoContextKey, Transport } from "./transport.ts" -import type { ExplainRow, ExtractQueryResult, QueryOptions } from "./types.ts" +import type { ExplainResult, ExtractQueryResult, QueryOptions } from "./types.ts" // ============================================================================= // Arrow Flight Service @@ -83,7 +83,7 @@ export class ArrowFlight extends Context.Service< readonly explain: ( sql: string, options?: { readonly analyze?: boolean | undefined } - ) => Effect.Effect, ArrowFlightError> + ) => Effect.Effect } >()("Amp/ArrowFlight") {} diff --git a/packages/amp/src/arrow-flight/types.ts b/packages/amp/src/arrow-flight/types.ts index 4eff98b..788e913 100644 --- a/packages/amp/src/arrow-flight/types.ts +++ b/packages/amp/src/arrow-flight/types.ts @@ -96,3 +96,16 @@ export const PlanNode = Schema.Struct({ metrics: Schema.Record(Schema.String, Schema.String) }).annotate({ identifier: "PlanNode" }) export type PlanNode = typeof PlanNode.Type + +/** + * The result returned by `ArrowFlight.explain` — a tabular view of the parsed + * plan suitable for direct rendering or serialization. `columns` is the union + * of all keys across `rows` in first-appearance order (`node`, `depth`, then + * properties, then metric columns), so callers can drive a table render + * without recomputing the header set themselves. + */ +export const ExplainResult = Schema.Struct({ + columns: Schema.Array(Schema.String), + rows: Schema.Array(ExplainRow) +}).annotate({ identifier: "ExplainResult" }) +export type ExplainResult = typeof ExplainResult.Type diff --git a/packages/amp/src/internal/explain/parser.ts b/packages/amp/src/internal/explain/parser.ts index 7f18ffa..3d00567 100644 --- a/packages/amp/src/internal/explain/parser.ts +++ b/packages/amp/src/internal/explain/parser.ts @@ -1,5 +1,5 @@ import * as Schema from "effect/Schema" -import { type ExplainCell, type ExplainRow, PlanNode } from "../../arrow-flight/types.ts" +import { type ExplainCell, ExplainResult, PlanNode } from "../../arrow-flight/types.ts" // ============================================================================= // Internal Schemas @@ -82,30 +82,43 @@ export const parsePlan = (planText: string): ReadonlyArray => { } /** - * Convert a list of `PlanNode`s into a flat table of rows (one row per node) - * with normalized units. Mirrors the Python `ExplainResult.to_dataframe()` + * Convert a list of `PlanNode`s into the table form returned by + * `ArrowFlight.explain`. Mirrors the Python `ExplainResult.to_dataframe()` * shape: numeric properties only, expanded metrics, durations in seconds with * `_secs` suffix, `total → matched` split into two columns. + * + * `columns` is the union of all keys across rows, in first-appearance order + * (`node`, `depth`, then properties, then metric columns). */ -export const planToTable = ( - nodes: ReadonlyArray -): ReadonlyArray => { +export const planToTable = (nodes: ReadonlyArray): ExplainResult => { const rows: Array> = [] + const seen = new Set() + const columns: Array = [] + const recordKey = (key: string): void => { + if (seen.has(key)) return + seen.add(key) + columns.push(key) + } + for (const node of nodes) { - const row: Record = { - node: node.name, - depth: node.depth - } + const row: Record = {} + row["node"] = node.name + recordKey("node") + row["depth"] = node.depth + recordKey("depth") for (const key in node.properties) { const num = extractNumeric(node.properties[key]!) - if (num !== null) row[key] = num + if (num !== null) { + row[key] = num + recordKey(key) + } } for (const key in node.metrics) { - expandMetric(key, node.metrics[key]!, row) + expandMetric(key, node.metrics[key]!, row, recordKey) } rows.push(row) } - return rows + return ExplainResult.make({ columns, rows }) } // ============================================================================= @@ -187,7 +200,9 @@ const extractNumeric = (raw: string): number | null => { } /** - * Expand a single metric into one or more cells written into `row`. + * Expand a single metric into one or more cells written into `row`, recording + * each emitted column key via `recordKey` so the caller can build a stable, + * first-appearance ordering of the table's columns. * * Mirrors Python `_expand_metric` plus the `_secs` rename that Python applies * post-hoc via `df.rename`. We rename inline because the per-row Record we @@ -196,39 +211,50 @@ const extractNumeric = (raw: string): number | null => { const expandMetric = ( key: string, raw: string, - row: Record + row: Record, + recordKey: (key: string) => void ): void => { const value = raw.trim() if (value.startsWith("N/A")) { row[key] = null + recordKey(key) return } const tm = TOTAL_MATCHED_RE.exec(value) if (tm !== null) { - row[`${key}_total`] = parseNumberWithUnit(tm[1]!) - row[`${key}_matched`] = parseNumberWithUnit(tm[2]!) + const totalKey = `${key}_total` + const matchedKey = `${key}_matched` + row[totalKey] = parseNumberWithUnit(tm[1]!) + row[matchedKey] = parseNumberWithUnit(tm[2]!) + recordKey(totalKey) + recordKey(matchedKey) return } const pct = PCT_RATIO_RE.exec(value) if (pct !== null) { row[key] = parseFloat(pct[1]!) + recordKey(key) return } const dur = DURATION_RE.exec(value) if (dur !== null) { - row[`${key}_secs`] = parseFloat(dur[1]!) * DURATION_UNITS[dur[2]!]! + const secsKey = `${key}_secs` + row[secsKey] = parseFloat(dur[1]!) * DURATION_UNITS[dur[2]!]! + recordKey(secsKey) return } const num = parseNumberWithUnit(value) if (num !== null) { row[key] = num + recordKey(key) return } row[key] = value + recordKey(key) } diff --git a/packages/amp/test/arrow-flight/service.test.ts b/packages/amp/test/arrow-flight/service.test.ts index 18272d6..6f3c7d9 100644 --- a/packages/amp/test/arrow-flight/service.test.ts +++ b/packages/amp/test/arrow-flight/service.test.ts @@ -109,13 +109,14 @@ describe("ArrowFlight", () => { Layer.provide(Layer.succeed(ArrowFlight.Transport, transport)) ) - const rows = yield* Effect.gen(function*() { + const result = yield* Effect.gen(function*() { const flight = yield* ArrowFlight.ArrowFlight return yield* flight.explain("SELECT * FROM foo LIMIT 10") }).pipe(Effect.provide(layer)) expect(captured.value).toBe("EXPLAIN SELECT * FROM foo LIMIT 10") - expect(rows).toEqual([]) + expect(result.rows).toEqual([]) + expect(result.columns).toEqual([]) })) it.effect("explain prepends EXPLAIN ANALYZE when analyze is true", ({ expect }) => @@ -126,13 +127,14 @@ describe("ArrowFlight", () => { Layer.provide(Layer.succeed(ArrowFlight.Transport, transport)) ) - const rows = yield* Effect.gen(function*() { + const result = yield* Effect.gen(function*() { const flight = yield* ArrowFlight.ArrowFlight return yield* flight.explain("SELECT 1", { analyze: true }) }).pipe(Effect.provide(layer)) expect(captured.value).toBe("EXPLAIN ANALYZE SELECT 1") - expect(rows).toEqual([]) + expect(result.rows).toEqual([]) + expect(result.columns).toEqual([]) })) it.effect("passes binaryHandling to query output conversion", ({ expect }) => diff --git a/packages/amp/test/internal/explain/parser.test.ts b/packages/amp/test/internal/explain/parser.test.ts index bdeb94e..fb220d9 100644 --- a/packages/amp/test/internal/explain/parser.test.ts +++ b/packages/amp/test/internal/explain/parser.test.ts @@ -2,6 +2,8 @@ import type { ExplainRow, PlanNode } from "@edgeandnode/amp/arrow-flight" import { parsePlan, planToTable } from "@edgeandnode/amp/internal/explain/parser" import { describe, expect, it } from "vitest" +const cell = (row: ExplainRow, key: string): unknown => (row as Record)[key] + // Real EXPLAIN ANALYZE output captured from Amp. Two nodes: // `CoalescePartitionsExec` at depth 0, `DataSourceExec` at depth 1. const REAL_PLAN = @@ -62,88 +64,119 @@ describe("parsePlan", () => { }) describe("planToTable", () => { + it("returns empty rows + columns for an empty plan", () => { + const result = planToTable([]) + expect(result.rows).toEqual([]) + expect(result.columns).toEqual([]) + }) + it("normalizes the real plan into a table with expected cells", () => { - const table = planToTable(parsePlan(REAL_PLAN)) - expect(table).toHaveLength(2) - - const row0 = table[0] as ExplainRow & Record - expect(row0["node"]).toBe("CoalescePartitionsExec") - expect(row0["depth"]).toBe(0) - expect(row0["fetch"]).toBe(10) - expect(row0["output_rows"]).toBe(10) - expect(row0["elapsed_compute_secs"]).toBeCloseTo(9.59e-6, 12) - expect(row0["output_bytes"]).toBe("1376.0 B") - expect(row0["output_batches"]).toBe(1) + const result = planToTable(parsePlan(REAL_PLAN)) + expect(result.rows).toHaveLength(2) + + const row0 = result.rows[0]! + expect(cell(row0, "node")).toBe("CoalescePartitionsExec") + expect(cell(row0, "depth")).toBe(0) + expect(cell(row0, "fetch")).toBe(10) + expect(cell(row0, "output_rows")).toBe(10) + expect(cell(row0, "elapsed_compute_secs")).toBeCloseTo(9.59e-6, 12) + expect(cell(row0, "output_bytes")).toBe("1376.0 B") + expect(cell(row0, "output_batches")).toBe(1) // Duration was renamed; the unsuffixed key must not also be present. expect("elapsed_compute" in row0).toBe(false) - const row1 = table[1] as ExplainRow & Record - expect(row1["node"]).toBe("DataSourceExec") - expect(row1["depth"]).toBe(1) - expect(row1["limit"]).toBe(10) + const row1 = result.rows[1]! + expect(cell(row1, "node")).toBe("DataSourceExec") + expect(cell(row1, "depth")).toBe(1) + expect(cell(row1, "limit")).toBe(10) // file_groups is "{4 groups: ..." — leading-number extraction yields 4. - expect(row1["file_groups"]).toBe(4) + expect(cell(row1, "file_groups")).toBe(4) // Non-numeric properties are dropped from the table. expect("file_type" in row1).toBe(false) expect("projection" in row1).toBe(false) - expect(row1["output_rows"]).toBe(10) - expect(row1["output_bytes"]).toBe("1376.0 B") - expect(row1["output_batches"]).toBe(1) - expect(row1["elapsed_compute_secs"]).toBeCloseTo(4e-9, 18) + expect(cell(row1, "output_rows")).toBe(10) + expect(cell(row1, "output_bytes")).toBe("1376.0 B") + expect(cell(row1, "output_batches")).toBe(1) + expect(cell(row1, "elapsed_compute_secs")).toBeCloseTo(4e-9, 18) // total → matched expansions - expect(row1["files_ranges_pruned_statistics_total"]).toBe(6) - expect(row1["files_ranges_pruned_statistics_matched"]).toBe(6) - expect(row1["row_groups_pruned_statistics_total"]).toBe(74) - expect(row1["row_groups_pruned_statistics_matched"]).toBe(74) - expect(row1["row_groups_pruned_bloom_filter_total"]).toBe(74) - expect(row1["row_groups_pruned_bloom_filter_matched"]).toBe(74) - expect(row1["page_index_pages_pruned_total"]).toBe(0) - expect(row1["page_index_pages_pruned_matched"]).toBe(0) - expect(row1["page_index_rows_pruned_total"]).toBe(0) - expect(row1["page_index_rows_pruned_matched"]).toBe(0) - expect(row1["limit_pruned_row_groups_total"]).toBe(0) - expect(row1["limit_pruned_row_groups_matched"]).toBe(0) + expect(cell(row1, "files_ranges_pruned_statistics_total")).toBe(6) + expect(cell(row1, "files_ranges_pruned_statistics_matched")).toBe(6) + expect(cell(row1, "row_groups_pruned_statistics_total")).toBe(74) + expect(cell(row1, "row_groups_pruned_statistics_matched")).toBe(74) + expect(cell(row1, "row_groups_pruned_bloom_filter_total")).toBe(74) + expect(cell(row1, "row_groups_pruned_bloom_filter_matched")).toBe(74) + expect(cell(row1, "page_index_pages_pruned_total")).toBe(0) + expect(cell(row1, "page_index_pages_pruned_matched")).toBe(0) + expect(cell(row1, "page_index_rows_pruned_total")).toBe(0) + expect(cell(row1, "page_index_rows_pruned_matched")).toBe(0) + expect(cell(row1, "limit_pruned_row_groups_total")).toBe(0) + expect(cell(row1, "limit_pruned_row_groups_matched")).toBe(0) // Plain numbers - expect(row1["batches_split"]).toBe(0) - expect(row1["file_open_errors"]).toBe(0) - expect(row1["file_scan_errors"]).toBe(0) - expect(row1["num_predicate_creation_errors"]).toBe(0) - expect(row1["predicate_evaluation_errors"]).toBe(0) - expect(row1["pushdown_rows_matched"]).toBe(0) - expect(row1["pushdown_rows_pruned"]).toBe(0) - expect(row1["predicate_cache_inner_records"]).toBe(0) - expect(row1["predicate_cache_records"]).toBe(0) + expect(cell(row1, "batches_split")).toBe(0) + expect(cell(row1, "file_open_errors")).toBe(0) + expect(cell(row1, "file_scan_errors")).toBe(0) + expect(cell(row1, "num_predicate_creation_errors")).toBe(0) + expect(cell(row1, "predicate_evaluation_errors")).toBe(0) + expect(cell(row1, "pushdown_rows_matched")).toBe(0) + expect(cell(row1, "pushdown_rows_pruned")).toBe(0) + expect(cell(row1, "predicate_cache_inner_records")).toBe(0) + expect(cell(row1, "predicate_cache_records")).toBe(0) // M-suffix expansion - expect(row1["bytes_scanned"]).toBe(13.37e6) + expect(cell(row1, "bytes_scanned")).toBe(13.37e6) // Duration metrics - expect(row1["bloom_filter_eval_time_secs"]).toBeCloseTo(12e-9, 18) - expect(row1["metadata_load_time_secs"]).toBeCloseTo(288.01e-6, 12) - expect(row1["page_index_eval_time_secs"]).toBeCloseTo(12e-9, 18) - expect(row1["row_pushdown_eval_time_secs"]).toBeCloseTo(12e-9, 18) - expect(row1["statistics_eval_time_secs"]).toBeCloseTo(12e-9, 18) - expect(row1["time_elapsed_opening_secs"]).toBeCloseTo(415.35e-6, 12) - expect(row1["time_elapsed_processing_secs"]).toBeCloseTo(43.9e-3, 9) - expect(row1["time_elapsed_scanning_total_secs"]).toBeCloseTo(230.61e-3, 9) - expect(row1["time_elapsed_scanning_until_data_secs"]).toBeCloseTo(230.6e-3, 9) + expect(cell(row1, "bloom_filter_eval_time_secs")).toBeCloseTo(12e-9, 18) + expect(cell(row1, "metadata_load_time_secs")).toBeCloseTo(288.01e-6, 12) + expect(cell(row1, "page_index_eval_time_secs")).toBeCloseTo(12e-9, 18) + expect(cell(row1, "row_pushdown_eval_time_secs")).toBeCloseTo(12e-9, 18) + expect(cell(row1, "statistics_eval_time_secs")).toBeCloseTo(12e-9, 18) + expect(cell(row1, "time_elapsed_opening_secs")).toBeCloseTo(415.35e-6, 12) + expect(cell(row1, "time_elapsed_processing_secs")).toBeCloseTo(43.9e-3, 9) + expect(cell(row1, "time_elapsed_scanning_total_secs")).toBeCloseTo(230.61e-3, 9) + expect(cell(row1, "time_elapsed_scanning_until_data_secs")).toBeCloseTo(230.6e-3, 9) // N/A → null - expect(row1["scan_efficiency_ratio"]).toBeNull() + expect(cell(row1, "scan_efficiency_ratio")).toBeNull() + }) + + it("emits columns in first-appearance order across rows", () => { + const result = planToTable(parsePlan(REAL_PLAN)) + // Root-only columns come first, then DataSourceExec-only columns appear + // when they're first introduced by row 1. + expect(result.columns.slice(0, 7)).toEqual([ + "node", + "depth", + "fetch", + "output_rows", + "elapsed_compute_secs", + "output_bytes", + "output_batches" + ]) + // `limit` is the first row-1-only property, immediately after the shared + // root metric columns. + const limitIdx = result.columns.indexOf("limit") + expect(limitIdx).toBeGreaterThan(6) + // No duplicates. + expect(result.columns.length).toBe(new Set(result.columns).size) + // Every row's keys are a subset of `columns`. + for (const row of result.rows) { + for (const key of Object.keys(row)) { + expect(result.columns).toContain(key) + } + } }) it("parses 'P% (out/in)' selectivity metrics as plain percent floats", () => { - const nodes = parsePlan("Foo: x=1, metrics=[selectivity=12.5% (1/8)]\n") - const table = planToTable(nodes) - expect(table[0]!["selectivity"]).toBe(12.5) + const result = planToTable(parsePlan("Foo: x=1, metrics=[selectivity=12.5% (1/8)]\n")) + expect(cell(result.rows[0]!, "selectivity")).toBe(12.5) }) it("falls back to the raw string for an unparseable metric", () => { - const nodes = parsePlan("Foo: x=1, metrics=[shape=triangle]\n") - const table = planToTable(nodes) - expect(table[0]!["shape"]).toBe("triangle") + const result = planToTable(parsePlan("Foo: x=1, metrics=[shape=triangle]\n")) + expect(cell(result.rows[0]!, "shape")).toBe("triangle") }) }) From 9f64482245ff53354a3c97ad0372a65096770468 Mon Sep 17 00:00:00 2001 From: Chris Whited Date: Fri, 15 May 2026 12:03:02 -1000 Subject: [PATCH 3/3] feat(explain query): validate explain query does not already have explain prepended --- packages/amp/src/arrow-flight/service.ts | 6 ++-- packages/amp/src/internal/explain/parser.ts | 18 ++++++++++ .../amp/test/internal/explain/parser.test.ts | 34 ++++++++++++++++++- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/packages/amp/src/arrow-flight/service.ts b/packages/amp/src/arrow-flight/service.ts index 5de9336..fc74766 100644 --- a/packages/amp/src/arrow-flight/service.ts +++ b/packages/amp/src/arrow-flight/service.ts @@ -23,7 +23,7 @@ import { MessageHeaderType, parseSchema } from "../internal/arrow-flight-ipc/Schema.ts" -import { ExplainResultRow, parsePlan, planToTable } from "../internal/explain/parser.ts" +import { ExplainResultRow, parsePlan, planToTable, prefixExplain } from "../internal/explain/parser.ts" import { FlightDescriptor_DescriptorType, FlightDescriptorSchema, FlightService } from "../protobuf/Flight_pb.ts" import { CommandStatementQuerySchema } from "../protobuf/FlightSql_pb.ts" import { @@ -272,8 +272,8 @@ const make = Effect.gen(function*() { sql: string, options?: { readonly analyze?: boolean | undefined } ) { - const prefix = options?.analyze ? "EXPLAIN ANALYZE " : "EXPLAIN " - const head = yield* streamQuery(prefix + sql, { + const prepared = prefixExplain(sql, options?.analyze ?? false) + const head = yield* streamQuery(prepared, { schema: ExplainResultRow }).pipe(Stream.runHead) const planText = head.pipe( diff --git a/packages/amp/src/internal/explain/parser.ts b/packages/amp/src/internal/explain/parser.ts index 3d00567..6e5cde1 100644 --- a/packages/amp/src/internal/explain/parser.ts +++ b/packages/amp/src/internal/explain/parser.ts @@ -47,6 +47,24 @@ const MULTIPLIER_UNITS: Record = { TB: 1e12 } +// ============================================================================= +// SQL Prefix +// ============================================================================= + +const LEADING_EXPLAIN_RE = /^\s*EXPLAIN\b/i + +/** + * Prepend `EXPLAIN` (or `EXPLAIN ANALYZE` when `analyze` is true) to the + * given SQL, unless the SQL already starts with `EXPLAIN` (in any case), + * in which case it is returned unchanged. This avoids producing invalid + * statements like `EXPLAIN EXPLAIN ANALYZE SELECT ...` when callers paste + * a query that was already prefixed. + */ +export const prefixExplain = (sql: string, analyze: boolean): string => { + if (LEADING_EXPLAIN_RE.test(sql)) return sql + return (analyze ? "EXPLAIN ANALYZE " : "EXPLAIN ") + sql +} + // ============================================================================= // Plan Parsing // ============================================================================= diff --git a/packages/amp/test/internal/explain/parser.test.ts b/packages/amp/test/internal/explain/parser.test.ts index fb220d9..27a5896 100644 --- a/packages/amp/test/internal/explain/parser.test.ts +++ b/packages/amp/test/internal/explain/parser.test.ts @@ -1,5 +1,5 @@ import type { ExplainRow, PlanNode } from "@edgeandnode/amp/arrow-flight" -import { parsePlan, planToTable } from "@edgeandnode/amp/internal/explain/parser" +import { parsePlan, planToTable, prefixExplain } from "@edgeandnode/amp/internal/explain/parser" import { describe, expect, it } from "vitest" const cell = (row: ExplainRow, key: string): unknown => (row as Record)[key] @@ -180,3 +180,35 @@ describe("planToTable", () => { expect(cell(result.rows[0]!, "shape")).toBe("triangle") }) }) + +describe("prefixExplain", () => { + it("prepends EXPLAIN when the SQL has no leading EXPLAIN", () => { + expect(prefixExplain("SELECT 1", false)).toBe("EXPLAIN SELECT 1") + }) + + it("prepends EXPLAIN ANALYZE when analyze is true", () => { + expect(prefixExplain("SELECT 1", true)).toBe("EXPLAIN ANALYZE SELECT 1") + }) + + it("returns the SQL unchanged if it already starts with EXPLAIN", () => { + expect(prefixExplain("EXPLAIN SELECT 1", false)).toBe("EXPLAIN SELECT 1") + // The user's own EXPLAIN wins, even when analyze is requested — we don't + // try to upgrade EXPLAIN to EXPLAIN ANALYZE silently. + expect(prefixExplain("EXPLAIN SELECT 1", true)).toBe("EXPLAIN SELECT 1") + }) + + it("returns the SQL unchanged if it already starts with EXPLAIN ANALYZE", () => { + expect(prefixExplain("EXPLAIN ANALYZE SELECT 1", true)).toBe("EXPLAIN ANALYZE SELECT 1") + expect(prefixExplain("EXPLAIN ANALYZE SELECT 1", false)).toBe("EXPLAIN ANALYZE SELECT 1") + }) + + it("matches case-insensitively and tolerates leading whitespace", () => { + expect(prefixExplain("explain analyze select 1", false)).toBe("explain analyze select 1") + expect(prefixExplain(" ExPlAiN SELECT 1", true)).toBe(" ExPlAiN SELECT 1") + }) + + it("does not match identifiers that merely start with the letters EXPLAIN", () => { + // No word boundary after EXPLAIN — should be treated as a normal query. + expect(prefixExplain("EXPLAINER FROM foo", false)).toBe("EXPLAIN EXPLAINER FROM foo") + }) +})