feat: support PivotFirst aggregate for the optimized PIVOT fast path#4798
Open
andygrove wants to merge 2 commits into
Open
feat: support PivotFirst aggregate for the optimized PIVOT fast path#4798andygrove wants to merge 2 commits into
andygrove wants to merge 2 commits into
Conversation
Wire the Spark-internal PivotFirst aggregate through Comet so that PIVOT queries which trigger Spark's two-phase fast path (12+ pivot values with an aggregate whose output type is in PivotFirst.supportsDataType) run natively instead of falling back. Scaffolded with the implement-comet-expression skill.
- Delegate value-type gate to Spark's `PivotFirst.supportsDataType` instead of maintaining a parallel list in the serde. - Wrap the plan-time pivot map/vector in `Arc` so per-group accumulator init bumps a refcount instead of deep-cloning. - Drop the redundant `pivot_values` field from the accumulator (only the map and `slots.len()` are read at runtime); the accumulator now derives its slot count from the shared map. - Use the shared `format_state_name` helper for state field names. - Delegate `evaluate()` to `state()` so the two paths share their slot materialization. - Walk each state column backwards in `merge_batch` and stop at the first non-null so we build at most one `ScalarValue` per slot per merge. - Drop the "pad every IN clause to 12 values" workaround from the SQL tests and the CometAggregateSuite test - the fast-path gate is supported-data-type only, no minimum count.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #.
Rationale for this change
Spark's optimized
PIVOTfast path (PivotTransformer) emits an internalPivotFirstaggregate for the second phase whenever every aggregate output type is inPivotFirst.supportsDataTypeand there are enough pivot values to make the two-phase plan cheaper than the standard filtered-aggregate rewrite.PivotFirstwas previously unsupported in Comet, so any query that hit the fast path fell back to Spark on the second-phaseHashAggregate. This change wires the aggregate end to end so the whole pivot plan runs natively.What changes are included in this PR?
Scaffolded with the
implement-comet-expressionskill.PivotFirst { pivot_column, value_column, pivot_values[], value_datatype }message underAggExpr.CometPivotFirstinspark/src/main/scala/org/apache/comet/serde/aggregates.scala, registered inaggrSerdeMap. Support is gated to the same value types Spark'sPivotFirst.supportsDataTypeaccepts (Boolean,Byte,Short,Int,Long,Float,Double,Decimal);getUnsupportedReasons()and sharedprivate valreason strings keep the Compatibility Guide and EXPLAIN output aligned.SparkPivotFirstUDAF andPivotFirstAccumulatorinnative/spark-expr/src/agg_funcs/pivot_first.rs. State layout is one scalar column per pivot slot (matching Spark'saggBufferAttributes) so the shuffle exchange schema betweenPartialandFinalstays consistent;evaluate()reassembles the slots into aListArraymatchingPivotFirst.dataType = ArrayType(valueDataType).native/core/src/execution/planner.rsdecodes the serialized pivot-valueLiteralexpressions back intoScalarValues and constructs the UDAF.docs/source/user-guide/latest/expressions.mdand audit notes indocs/source/contributor-guide/expression-audits/agg_funcs.md(Spark 3.4.3 / 3.5.8 / 4.0.1 / 4.1.1 all behave identically for our purposes; a defensivefindPivotIndexwrapper only appears in Sparkmasterand does not affect Comet because we look up viaHashMap<ScalarValue, usize>which handlesScalarValue::Nullsafely).How are these changes tested?
pivot_first.rscover: write by matching pivot index, ignore unmatched pivot values, ignore null value column,evaluate()produces aListArraywith typed nulls in empty slots,state()returns one scalar per slot, andmerge_batchcombines partial states across the shuffle.spark/src/test/resources/sql-tests/expressions/aggregate/PivotFirst.sqlruns 14PIVOTqueries covering every supported value type (Boolean, Byte, Short, Int, Long, Float, Double, Decimal), unmatched pivot values, groups with no matching rows, null in the value column, null in the pivot column (Spark's "should not throw NPE" case), non-string pivot columns (Int, Date), and multiple aggregates in one pivot.CometAggregateSuite."PivotFirst runs natively on the optimized pivot fast path"inspects the physical plan and asserts aCometHashAggregateExecactually wraps aPivotFirst, so a regression that pushes the aggregate back to Spark fails loudly.