
feat: support PivotFirst aggregate for the optimized PIVOT fast path #4798

Open
andygrove wants to merge 2 commits into apache:main from andygrove:feat/pivot-first

Conversation

@andygrove

Member

Which issue does this PR close?

Closes #.

Rationale for this change

Spark's optimized PIVOT fast path (PivotTransformer) emits an internal PivotFirst aggregate for the second phase whenever every aggregate output type is in PivotFirst.supportsDataType, in place of the standard filtered-aggregate rewrite. The gate is data-type only; there is no minimum number of pivot values. PivotFirst was previously unsupported in Comet, so any query that hit the fast path fell back to Spark on the second-phase HashAggregate. This change wires the aggregate end to end so the whole pivot plan runs natively.

What changes are included in this PR?

Scaffolded with the implement-comet-expression skill.

  • Protobuf: new PivotFirst { pivot_column, value_column, pivot_values[], value_datatype } message under AggExpr.
  • Scala serde: CometPivotFirst in spark/src/main/scala/org/apache/comet/serde/aggregates.scala, registered in aggrSerdeMap. Support is gated to the same value types Spark's PivotFirst.supportsDataType accepts (Boolean, Byte, Short, Int, Long, Float, Double, Decimal); getUnsupportedReasons() and shared private val reason strings keep the Compatibility Guide and EXPLAIN output aligned.
  • Native aggregate: SparkPivotFirst UDAF and PivotFirstAccumulator in native/spark-expr/src/agg_funcs/pivot_first.rs. State layout is one scalar column per pivot slot (matching Spark's aggBufferAttributes) so the shuffle exchange schema between Partial and Final stays consistent; evaluate() reassembles the slots into a ListArray matching PivotFirst.dataType = ArrayType(valueDataType).
  • Planner wire-up: native/core/src/execution/planner.rs decodes the serialized pivot-value Literal expressions back into ScalarValues and constructs the UDAF.
  • Docs: entry in docs/source/user-guide/latest/expressions.md and audit notes in docs/source/contributor-guide/expression-audits/agg_funcs.md (Spark 3.4.3 / 3.5.8 / 4.0.1 / 4.1.1 all behave identically for our purposes; a defensive findPivotIndex wrapper only appears in Spark master and does not affect Comet because we look up via HashMap<ScalarValue, usize> which handles ScalarValue::Null safely).
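For reviewers who want the accumulator behaviour at a glance, here is a minimal sketch of the slot-per-pivot-value idea described above. It is not the code in pivot_first.rs: `PivotFirstSketch` and `build_index` are hypothetical names, pivot keys are simplified to `Option<String>` (with `None` standing in for `ScalarValue::Null`), and values are fixed to `i64` rather than Arrow arrays.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical, simplified stand-in for the native accumulator.
/// Pivot keys are `Option<String>` (None models a SQL NULL pivot value)
/// and aggregated values are plain `i64`.
struct PivotFirstSketch {
    /// Plan-time lookup from pivot value to output slot, shared by all groups.
    index: Arc<HashMap<Option<String>, usize>>,
    /// One slot per pivot value; `None` means the slot is still empty (null).
    slots: Vec<Option<i64>>,
}

impl PivotFirstSketch {
    fn new(index: Arc<HashMap<Option<String>, usize>>) -> Self {
        // Slot count is derived from the shared map.
        let slots = vec![None; index.len()];
        Self { index, slots }
    }

    /// Write `value` into the slot matching `pivot`.
    /// Rows with an unmatched pivot value or a null value are ignored.
    fn update(&mut self, pivot: Option<&str>, value: Option<i64>) {
        let key = pivot.map(str::to_owned);
        if let (Some(&i), Some(v)) = (self.index.get(&key), value) {
            self.slots[i] = Some(v);
        }
    }

    /// Reassemble the slots into one array-like result, nulls included.
    fn evaluate(&self) -> Vec<Option<i64>> {
        self.slots.clone()
    }
}

/// Build the shared pivot-value -> slot-index map once, at plan time.
fn build_index(pivot_values: &[Option<&str>]) -> Arc<HashMap<Option<String>, usize>> {
    Arc::new(
        pivot_values
            .iter()
            .enumerate()
            .map(|(i, p)| (p.map(str::to_owned), i))
            .collect(),
    )
}
```

With pivot values `["a", "b", NULL]`, feeding the rows `("b", 7)`, `("z", 1)`, `("a", NULL)` and `(NULL, 3)` leaves slot 0 empty, puts 7 in slot 1 and 3 in slot 2. Because the key type models null directly, a null pivot value is an ordinary map lookup rather than a special case.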

How are these changes tested?

  • 6 Rust unit tests in pivot_first.rs cover: write by matching pivot index, ignore unmatched pivot values, ignore null value column, evaluate() produces a ListArray with typed nulls in empty slots, state() returns one scalar per slot, and merge_batch combines partial states across the shuffle.
  • spark/src/test/resources/sql-tests/expressions/aggregate/PivotFirst.sql runs 14 PIVOT queries covering every supported value type (Boolean, Byte, Short, Int, Long, Float, Double, Decimal), unmatched pivot values, groups with no matching rows, null in the value column, null in the pivot column (Spark's "should not throw NPE" case), non-string pivot columns (Int, Date), and multiple aggregates in one pivot.
  • CometAggregateSuite."PivotFirst runs natively on the optimized pivot fast path" inspects the physical plan and asserts a CometHashAggregateExec actually wraps a PivotFirst, so a regression that pushes the aggregate back to Spark fails loudly.

andygrove added 2 commits July 2, 2026 14:13
Wire the Spark-internal PivotFirst aggregate through Comet so that
PIVOT queries which trigger Spark's two-phase fast path (12+ pivot
values with an aggregate whose output type is in
PivotFirst.supportsDataType) run natively instead of falling back.

Scaffolded with the implement-comet-expression skill.

- Delegate value-type gate to Spark's `PivotFirst.supportsDataType` instead
  of maintaining a parallel list in the serde.
- Wrap the plan-time pivot map/vector in `Arc` so per-group accumulator
  init bumps a refcount instead of deep-cloning.
- Drop the redundant `pivot_values` field from the accumulator (only the
  map and `slots.len()` are read at runtime); the accumulator now derives
  its slot count from the shared map.
- Use the shared `format_state_name` helper for state field names.
- Delegate `evaluate()` to `state()` so the two paths share their slot
  materialization.
- Walk each state column backwards in `merge_batch` and stop at the first
  non-null so we build at most one `ScalarValue` per slot per merge.
- Drop the "pad every IN clause to 12 values" workaround from the SQL
  tests and the CometAggregateSuite test - the fast-path gate is
  supported-data-type only, no minimum count.
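
The backward walk in `merge_batch` can be illustrated with a small sketch. It assumes a simplified layout that is not the PR's actual Arrow-based code: `merge_slots` is a hypothetical helper, `state_columns[slot][row]` holds the partial value of one slot for one incoming partial state, and values are plain `i64`.

```rust
/// Hypothetical stand-in for the merge step.
/// `state_columns[slot][row]` is the partial value of `slot` in incoming
/// partial state `row`; `None` models a null.
fn merge_slots(slots: &mut [Option<i64>], state_columns: &[Vec<Option<i64>>]) {
    for (slot, column) in slots.iter_mut().zip(state_columns) {
        // Scan from the last row and stop at the first non-null, so at most
        // one value is materialized per slot per merge.
        if let Some(v) = column.iter().rev().find_map(|v| *v) {
            *slot = Some(v);
        }
        // An all-null column leaves the existing slot value untouched.
    }
}
```

Scanning in reverse and stopping early gives the same result as a forward pass that overwrites on every non-null (the last non-null wins), but it touches fewer rows when the later rows are populated.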
@andygrove andygrove added this to the 1.0.0 milestone Jul 3, 2026
@andygrove andygrove marked this pull request as ready for review July 3, 2026 22:16
