Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/source/contributor-guide/expression-audits/agg_funcs.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,12 @@
- Spark 4.1.1 (audited 2026-06-24): identical to 4.0.1.
- `CometPercentile` reports `Incompatible` for the otherwise-supported form because DataFusion's `percentile_cont` quantizes the interpolation weight to 6 decimal places (`INTERPOLATION_PRECISION = 1e6`), so a deeply-interpolated value can differ from Spark by up to roughly `(upper - lower) * 1e-6`. The native path is opt-in via `spark.comet.expression.Percentile.allowIncompatible=true` ([#4719](https://github.com/apache/datafusion-comet/issues/4719)).

## pivot_first

- Spark 3.4.3 (audited 2026-07-02): `PivotFirst(pivotColumn, valueColumn, pivotColumnValues)` is an internal `ImperativeAggregate` emitted only by the optimized-pivot fast path in `Analyzer.ResolvePivot`. It buckets `valueColumn` into an array of length `pivotColumnValues.size` indexed by matching `pivotColumn`. Null value columns are ignored, unmatched pivot values are dropped. Value types are gated by `PivotFirst.supportsDataType` (Boolean, Byte, Short, Int, Long, Float, Double, Decimal).
- Spark 3.5.8 (audited 2026-07-02): swaps `StructType.fromAttributes` for `DataTypeUtils.fromAttributes`; no behavior change.
- Spark 4.0.1 (audited 2026-07-02): identical to 3.5.8.
- Spark 4.1.1 (audited 2026-07-02): identical to 4.0.1. (Spark `master` adds a defensive `findPivotIndex` wrapper that returns `-1` for null keys on the non-`AtomicType` `TreeMap` path; not present in any released version Comet builds against, and Comet's HashMap-backed lookup handles `ScalarValue::Null` safely on all types.)
- `CometPivotFirst` (in `spark/src/main/scala/org/apache/comet/serde/aggregates.scala`) forwards the aggregate to the native `SparkPivotFirst` UDAF (`native/spark-expr/src/agg_funcs/pivot_first.rs`) when the value type is in the supported set. State layout matches Spark's `aggBufferAttributes` (one scalar column per pivot slot) so the shuffle schema between Partial and Final stays consistent. `evaluate()` reassembles the slots into a `ListArray` matching `PivotFirst.dataType = ArrayType(valueDataType)`.

[Spark Expression Support]: ../../user-guide/latest/expressions.md
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The tables below list every Spark built-in expression with its current status.
| `percentile` | ✅ | Single literal percentage on numeric input; array of percentages and a frequency argument fall back to Spark. Falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `percentile_cont` | ✅ | Spark 4.0+ `WITHIN GROUP (ORDER BY ...)`; ascending only, `DESC` falls back to Spark. Falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `percentile_disc` | 🔜 | Percentile aggregate |
| `pivot_first` | ✅ | Internal aggregate for the optimized `PIVOT` fast path. Value type must be in `PivotFirst.supportsDataType` (Boolean, Byte, Short, Int, Long, Float, Double, Decimal); other types keep Spark's standard filtered-aggregate path. |
| `regr_avgx` | ✅ | Native: Spark rewrites to `Average` (tests in [#4551](https://github.com/apache/datafusion-comet/issues/4551)) |
| `regr_avgy` | ✅ | Native: Spark rewrites to `Average` (tests in [#4551](https://github.com/apache/datafusion-comet/issues/4551)) |
| `regr_count` | ✅ | Native: Spark rewrites to `Count` (tests in [#4551](https://github.com/apache/datafusion-comet/issues/4551)) |
Expand Down
26 changes: 25 additions & 1 deletion native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ use datafusion::{
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SparkArraysZipFunc,
SparkBloomFilterVersion, SumInteger, ToCsv,
SparkBloomFilterVersion, SparkPivotFirst, SumInteger, ToCsv,
};
use datafusion_spark::function::aggregate::collect::SparkCollectSet;
use iceberg::expr::Bind;
Expand Down Expand Up @@ -2653,6 +2653,30 @@ impl PhysicalPlanner {
let func = AggregateUDF::new_from_impl(SparkCollectSet::new());
Self::create_aggr_func_expr("collect_set", schema, vec![child], func)
}
AggExprStruct::PivotFirst(expr) => {
let pivot_col =
self.create_expr(expr.pivot_column.as_ref().unwrap(), Arc::clone(&schema))?;
let value_col =
self.create_expr(expr.value_column.as_ref().unwrap(), Arc::clone(&schema))?;
let value_type = to_arrow_datatype(expr.value_datatype.as_ref().unwrap());
// Reconstruct pivot values as ScalarValues from the serialized Literal exprs.
// The Scala serde builds them as Literal(v, pivot_column.dataType), so extracting
// via Literal downcast gives us the exact ScalarValue the update path compares
// against per input row.
let mut pivot_values = Vec::with_capacity(expr.pivot_values.len());
for lit_expr in &expr.pivot_values {
let physical = self.create_expr(lit_expr, Arc::clone(&schema))?;
let literal = physical.downcast_ref::<Literal>().ok_or_else(|| {
GeneralError(
"PivotFirst pivot_values must be Literal expressions".to_string(),
)
})?;
pivot_values.push(literal.value().clone());
}
let func =
AggregateUDF::new_from_impl(SparkPivotFirst::new(value_type, pivot_values));
Self::create_aggr_func_expr("pivot_first", schema, vec![pivot_col, value_col], func)
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ message AggExpr {
BloomFilterAgg bloomFilterAgg = 16;
CollectSet collectSet = 17;
Percentile percentile = 18;
PivotFirst pivotFirst = 19;
}

// Optional filter expression for SQL FILTER (WHERE ...) clause.
Expand Down Expand Up @@ -276,6 +277,21 @@ message CollectSet {
DataType datatype = 2;
}

// Optimized Pivot second-phase aggregate. Given a `pivot_column` and a
// pre-computed list of `pivot_values`, buckets the incoming `value_column`
// into an array whose length is `pivot_values.size()`. Rows whose pivot
// value is not in `pivot_values` are ignored; if multiple rows in the same
// group map to the same bucket, the last non-null value wins (matches
// Spark's `PivotFirst.update`/`merge`). Output type is
// ArrayType(value_datatype). The pivot_values are serialized as Literal
// Exprs so the value bytes and data type reach native code faithfully.
message PivotFirst {
Expr pivot_column = 1;
Expr value_column = 2;
repeated Expr pivot_values = 3;
DataType value_datatype = 4;
}

enum EvalMode {
LEGACY = 0;
TRY = 1;
Expand Down
2 changes: 2 additions & 0 deletions native/spark-expr/src/agg_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod avg;
mod avg_decimal;
mod correlation;
mod covariance;
mod pivot_first;
mod stddev;
mod sum_decimal;
mod sum_int;
Expand All @@ -29,6 +30,7 @@ pub use avg::Avg;
pub use avg_decimal::AvgDecimal;
pub use correlation::Correlation;
pub use covariance::Covariance;
pub use pivot_first::SparkPivotFirst;
pub use stddev::Stddev;
pub use sum_decimal::SumDecimal;
pub use sum_int::SumInteger;
Expand Down
Loading
Loading