Skip to content

feat: support approx_percentile / percentile_approx aggregate#4801

Open
andygrove wants to merge 12 commits into
apache:mainfrom
andygrove:feat/approx-percentile
Open

feat: support approx_percentile / percentile_approx aggregate#4801
andygrove wants to merge 12 commits into
apache:mainfrom
andygrove:feat/approx-percentile

Conversation

@andygrove

@andygrove andygrove commented Jul 2, 2026

Copy link
Copy Markdown
Member

Which issue does this PR close?

There is no dedicated tracking issue for the feature itself. Prior to this PR the compatibility guide explicitly listed approx_percentile / percentile_approx as not planned; this PR implements them and removes that note.

This PR also fixes #4813, a pre-existing mixed-execution bug that approx_percentile surfaced (see below).

Rationale for this change

approx_percentile / percentile_approx is a common Spark aggregate that had no Comet support. It was previously considered out of scope on the assumption that its approximate results could not be made bit-identical to Spark, because Spark uses the Greenwald-Khanna (GK) QuantileSummaries algorithm while DataFusion's built-in approx_percentile_cont uses t-digest, which produces different values.

This PR takes the faithful-port approach instead: it reimplements Spark's GK QuantileSummaries in Rust. Because Comet feeds values to the accumulator in the same intra-partition order as Spark and the GK arithmetic/traversal is reproduced exactly, results are bit-identical to Spark for deterministic plans, and otherwise stay within Spark's own accuracy guarantee (Spark's own result already varies across partitionings for this aggregate). The expression is therefore marked Compatible, with no need for spark.comet.expr.allowIncompatible.

What changes are included in this PR?

Native, all-native (partial and final) implementation wired through the standard aggregate layers:

  • Native GK core (native/spark-expr/src/agg_funcs/quantile_summaries.rs): a bit-for-bit port of Spark's QuantileSummaries (insert with the 50000-value head-buffer flush, backward compress traversal that preserves the minimum, the non-commutative merge with asymmetric delta adjustment and tie-break, and the ceil-rank query with the ascending multi-percentile sweep). Serialization uses a Comet-internal little-endian layout.
  • DataFusion UDAF + accumulator (native/spark-expr/src/agg_funcs/approx_percentile.rs): wraps one summary per group, carries the digest across partial/final as a single Binary state value, and casts each result back to the input type (exact, since GK returns an actual inserted value).
  • Protobuf + planner (native/proto/src/proto/expr.proto, native/core/src/execution/planner.rs): a new ApproxPercentile message and planner arm.
  • Scala serde (spark/src/main/scala/org/apache/comet/serde/aggregates.scala, QueryPlanSerde.scala): CometApproxPercentile, marked Compatible for byte/short/int/long/float/double, supporting the scalar, array-of-percentiles, and explicit-accuracy signatures. A single percentage maps to a scalar result; an array maps to an array result.
  • Docs: removes the "not planned" note and adds approx_percentile to the aggregate function tables.

Scope: numeric input types byte/short/int/long/float/double. Decimal, date, timestamp, TimestampNTZ, and interval inputs are left for a follow-on PR.

Mixed-execution fix (#4813). approx_percentile surfaced a pre-existing bug: in a distinct-aggregate rewrite the non-distinct aggregate's Partial is separated from its Final by intermediate PartialMerge stages, and the existing #1389 guards only covered a direct Partial → Final pair. So an aggregate with an incompatible intermediate buffer could run part of the chain in Comet and part in Spark, handing a Comet-encoded buffer to a Spark aggregate (or vice versa) — a hard crash for percentile_approx / collect_set, and a silent-wrong-result risk for others. Fixed with two dual guards:

  • CometExecRule: findPartialAggInPlan now walks through the intermediate PartialMerge stages to tag the bottom Partial, so the whole chain falls back to Spark.
  • CometBaseAggregate.doConvert: the sparkFinalMode guard is generalized so a Comet PartialMerge that merges buffers (not just a Final) requires a Comet producer below it, covering the Spark-Partial → Comet-Merge direction.

How are these changes tested?

  • Rust unit tests on QuantileSummaries (query returns actual inserted values, results within the relative-error bound, multi-percentile matches single, merge within bound, serialization round-trip) and on the accumulator (scalar/array output, empty input yields null, split-then-merge matches single-shot).

  • End-to-end SQL file tests (spark/src/test/resources/sql-tests/expressions/aggregate/approx_percentile.sql) covering scalar, array of percentiles, non-default accuracy, group-by, and byte/short/int/long/float/double inputs, plus nulls-ignored and empty-input-yields-null. These run each query in Spark with and without Comet and diff the results, and assert native execution (a fallback fails the test), so they are the authoritative Spark-compatibility check. Passing on the Spark 3.5 and 4.1 profiles.

  • Mixed-execution regression tests for Distinct-aggregate rewrite can split an incompatible-buffer aggregate across Comet and Spark, causing crash or wrong results #4813: CometExecRuleSuite asserts the whole distinct-rewrite chain falls back to Spark when either the partial or the final is forced non-native (via the comet.exec.*HashAggregate.enabled debug configs), and CometAggregateSuite runs percentile_approx + count(DISTINCT ...) across the partial/final split matrix and checks results match Spark.

Benchmark results

Added approx_percentile cases to CometAggregateExpressionBenchmark. Because
approx_percentile is a TypedImperativeAggregate (planned as
ObjectHashAggregate), which only runs natively when Comet shuffle is enabled,
the shared CometBenchmarkBase now sets the Comet shuffle manager at startup so
the benchmark measures native execution.

Local run, single node (local[1]), 1,048,576 rows, GROUP BY ~1000 groups
unless noted. Best time, lower is better.

Case Spark (ms) Comet (ms) Speedup
approx_percentile(int, 0.5) 347 80 4.4x
approx_percentile(long, 0.5) 400 96 4.2x
approx_percentile(double, 0.5) 332 73 4.6x
approx_percentile(double, 0.9) 384 76 5.1x
approx_percentile(double, array(0.25,0.5,0.75)) 354 74 4.8x
approx_percentile(double, 0.5, 100) 316 60 5.2x
approx_percentile(double, 0.5) (no group by) 239 31 7.7x
approx_percentile(double, 0.5) (high cardinality) 470 305 1.5x

andygrove added 8 commits July 2, 2026 14:36
Wrap the QuantileSummaries (Greenwald-Khanna) sketch in a DataFusion
AggregateUDFImpl and per-row Accumulator for Spark's approx_percentile /
percentile_approx. State is carried as a single Binary ScalarValue using
the Comet-internal little-endian digest format. Supports byte, short,
int, long, float, and double inputs, casting the query result back to
the original input type. groups_accumulator_supported is false; this
runs as a row accumulator per group.
Add a SQL file test that runs approx_percentile scalar, array, group-by,
accuracy, doubles, floats, null, and empty-input forms through both Spark
and Comet natively, then update the compatibility docs to reflect that
approx_percentile now runs natively for the six supported numeric types.
…ounting

Add heap_size() to QuantileSummaries so the accumulator's size() reflects
actual sampled/head buffer memory instead of only the struct footprint.
Document the intentional i64-vs-toInt delta deviation from Spark. Extend
the SQL file test with byte and short input coverage, a non-default
accuracy case, and fix a comment describing range().id as int.
- Use SingleRowListArrayBuilder for the array output branch.
- Inline the trivial output_element_type wrapper.
- Fast path in update_batch when the batch has no nulls.
- Honor the compressed flag in QuantileSummaries::compress to avoid
  redundant recompression, matching Spark's compress-once semantics.
- Move the peer digest into an empty accumulator instead of cloning.
- Pre-size the merged sampled vector.
- Keep the QuantileSummaries GK helper private (mirror welford).
@andygrove andygrove marked this pull request as ready for review July 2, 2026 21:40
andygrove added 2 commits July 2, 2026 16:24
ObjectHashAggregate-based aggregates (e.g. approx_percentile) only run
natively when Comet shuffle is enabled, which requires the Comet shuffle
manager. Set it on the SparkConf at startup so all benchmarks using this
base measure native execution instead of falling back to Spark.
…t and Spark in distinct rewrite

A distinct-aggregate rewrite separates a non-distinct aggregate's Partial
from its Final by intermediate PartialMerge stages. The existing mixed-
execution guards (apache#1389) only covered a direct Partial -> Final pair, so an
aggregate with an incompatible intermediate buffer (e.g. percentile_approx)
could run part of the chain in Comet and part in Spark, handing a Comet-
encoded buffer to a Spark aggregate (or vice versa) and crashing.

- CometExecRule: walk findPartialAggInPlan through intermediate PartialMerge
  stages to tag the bottom Partial so the whole chain falls back to Spark.
- CometBaseAggregate.doConvert: generalize the sparkFinalMode guard so a Comet
  PartialMerge that merges buffers (not just a Final) requires a Comet producer
  below it, covering the Spark-Partial -> Comet-Merge direction.

Fixes the ObjectHashAggregateSuite "[typed, with distinct]" crash. See apache#4813.
@andygrove andygrove added this to the 1.0.0 milestone Jul 3, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Distinct-aggregate rewrite can split an incompatible-buffer aggregate across Comet and Spark, causing crash or wrong results

1 participant