feat: support approx_percentile / percentile_approx aggregate#4801
Open
andygrove wants to merge 12 commits into
Open
feat: support approx_percentile / percentile_approx aggregate#4801andygrove wants to merge 12 commits into
andygrove wants to merge 12 commits into
Conversation
Wrap the QuantileSummaries (Greenwald-Khanna) sketch in a DataFusion AggregateUDFImpl and per-row Accumulator for Spark's approx_percentile / percentile_approx. State is carried as a single Binary ScalarValue using the Comet-internal little-endian digest format. Supports byte, short, int, long, float, and double inputs, casting the query result back to the original input type. groups_accumulator_supported is false; this runs as a row accumulator per group.
Add a SQL file test that runs approx_percentile scalar, array, group-by, accuracy, doubles, floats, null, and empty-input forms through both Spark and Comet natively, then update the compatibility docs to reflect that approx_percentile now runs natively for the six supported numeric types.
…ounting Add heap_size() to QuantileSummaries so the accumulator's size() reflects actual sampled/head buffer memory instead of only the struct footprint. Document the intentional i64-vs-toInt delta deviation from Spark. Extend the SQL file test with byte and short input coverage, a non-default accuracy case, and fix a comment describing range().id as int.
- Use SingleRowListArrayBuilder for the array output branch. - Inline the trivial output_element_type wrapper. - Fast path in update_batch when the batch has no nulls. - Honor the compressed flag in QuantileSummaries::compress to avoid redundant recompression, matching Spark's compress-once semantics. - Move the peer digest into an empty accumulator instead of cloning. - Pre-size the merged sampled vector. - Keep the QuantileSummaries GK helper private (mirror welford).
ObjectHashAggregate-based aggregates (e.g. approx_percentile) only run natively when Comet shuffle is enabled, which requires the Comet shuffle manager. Set it on the SparkConf at startup so all benchmarks using this base measure native execution instead of falling back to Spark.
…t and Spark in distinct rewrite A distinct-aggregate rewrite separates a non-distinct aggregate's Partial from its Final by intermediate PartialMerge stages. The existing mixed- execution guards (apache#1389) only covered a direct Partial -> Final pair, so an aggregate with an incompatible intermediate buffer (e.g. percentile_approx) could run part of the chain in Comet and part in Spark, handing a Comet- encoded buffer to a Spark aggregate (or vice versa) and crashing. - CometExecRule: walk findPartialAggInPlan through intermediate PartialMerge stages to tag the bottom Partial so the whole chain falls back to Spark. - CometBaseAggregate.doConvert: generalize the sparkFinalMode guard so a Comet PartialMerge that merges buffers (not just a Final) requires a Comet producer below it, covering the Spark-Partial -> Comet-Merge direction. Fixes the ObjectHashAggregateSuite "[typed, with distinct]" crash. See apache#4813.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
There is no dedicated tracking issue for the feature itself. Prior to this PR the compatibility guide explicitly listed
approx_percentile/percentile_approxas not planned; this PR implements them and removes that note.This PR also fixes #4813, a pre-existing mixed-execution bug that
approx_percentilesurfaced (see below).Rationale for this change
approx_percentile/percentile_approxis a common Spark aggregate that had no Comet support. It was previously considered out of scope on the assumption that its approximate results could not be made bit-identical to Spark, because Spark uses the Greenwald-Khanna (GK)QuantileSummariesalgorithm while DataFusion's built-inapprox_percentile_contuses t-digest, which produces different values.This PR takes the faithful-port approach instead: it reimplements Spark's GK
QuantileSummariesin Rust. Because Comet feeds values to the accumulator in the same intra-partition order as Spark and the GK arithmetic/traversal is reproduced exactly, results are bit-identical to Spark for deterministic plans, and otherwise stay within Spark's own accuracy guarantee (Spark's own result already varies across partitionings for this aggregate). The expression is therefore markedCompatible, with no need forspark.comet.expr.allowIncompatible.What changes are included in this PR?
Native, all-native (partial and final) implementation wired through the standard aggregate layers:
native/spark-expr/src/agg_funcs/quantile_summaries.rs): a bit-for-bit port of Spark'sQuantileSummaries(insert with the 50000-value head-buffer flush, backward compress traversal that preserves the minimum, the non-commutative merge with asymmetric delta adjustment and tie-break, and the ceil-rank query with the ascending multi-percentile sweep). Serialization uses a Comet-internal little-endian layout.native/spark-expr/src/agg_funcs/approx_percentile.rs): wraps one summary per group, carries the digest across partial/final as a singleBinarystate value, and casts each result back to the input type (exact, since GK returns an actual inserted value).native/proto/src/proto/expr.proto,native/core/src/execution/planner.rs): a newApproxPercentilemessage and planner arm.spark/src/main/scala/org/apache/comet/serde/aggregates.scala,QueryPlanSerde.scala):CometApproxPercentile, markedCompatiblefor byte/short/int/long/float/double, supporting the scalar, array-of-percentiles, and explicit-accuracy signatures. A single percentage maps to a scalar result; an array maps to an array result.approx_percentileto the aggregate function tables.Scope: numeric input types byte/short/int/long/float/double. Decimal, date, timestamp, TimestampNTZ, and interval inputs are left for a follow-on PR.
Mixed-execution fix (#4813).
approx_percentilesurfaced a pre-existing bug: in a distinct-aggregate rewrite the non-distinct aggregate'sPartialis separated from itsFinalby intermediatePartialMergestages, and the existing#1389guards only covered a directPartial → Finalpair. So an aggregate with an incompatible intermediate buffer could run part of the chain in Comet and part in Spark, handing a Comet-encoded buffer to a Spark aggregate (or vice versa) — a hard crash forpercentile_approx/collect_set, and a silent-wrong-result risk for others. Fixed with two dual guards:CometExecRule:findPartialAggInPlannow walks through the intermediatePartialMergestages to tag the bottomPartial, so the whole chain falls back to Spark.CometBaseAggregate.doConvert: thesparkFinalModeguard is generalized so a CometPartialMergethat merges buffers (not just aFinal) requires a Comet producer below it, covering the Spark-Partial → Comet-Merge direction.How are these changes tested?
Rust unit tests on
QuantileSummaries(query returns actual inserted values, results within the relative-error bound, multi-percentile matches single, merge within bound, serialization round-trip) and on the accumulator (scalar/array output, empty input yields null, split-then-merge matches single-shot).End-to-end SQL file tests (
spark/src/test/resources/sql-tests/expressions/aggregate/approx_percentile.sql) covering scalar, array of percentiles, non-default accuracy, group-by, and byte/short/int/long/float/double inputs, plus nulls-ignored and empty-input-yields-null. These run each query in Spark with and without Comet and diff the results, and assert native execution (a fallback fails the test), so they are the authoritative Spark-compatibility check. Passing on the Spark 3.5 and 4.1 profiles.Mixed-execution regression tests for Distinct-aggregate rewrite can split an incompatible-buffer aggregate across Comet and Spark, causing crash or wrong results #4813:
CometExecRuleSuiteasserts the whole distinct-rewrite chain falls back to Spark when either the partial or the final is forced non-native (via thecomet.exec.*HashAggregate.enableddebug configs), andCometAggregateSuiterunspercentile_approx+count(DISTINCT ...)across the partial/final split matrix and checks results match Spark.Benchmark results
Added
approx_percentilecases toCometAggregateExpressionBenchmark. Becauseapprox_percentileis aTypedImperativeAggregate(planned asObjectHashAggregate), which only runs natively when Comet shuffle is enabled,the shared
CometBenchmarkBasenow sets the Comet shuffle manager at startup sothe benchmark measures native execution.
Local run, single node (
local[1]), 1,048,576 rows,GROUP BY~1000 groupsunless noted. Best time, lower is better.
approx_percentile(int, 0.5)approx_percentile(long, 0.5)approx_percentile(double, 0.5)approx_percentile(double, 0.9)approx_percentile(double, array(0.25,0.5,0.75))approx_percentile(double, 0.5, 100)approx_percentile(double, 0.5)(no group by)approx_percentile(double, 0.5)(high cardinality)