9 changes: 9 additions & 0 deletions docs/source/contributor-guide/expression-audits/agg_funcs.md
@@ -39,6 +39,15 @@
- Spark 3.5.8 (2026-05-26)
- Spark 4.0.1 (2026-05-26)

## kurtosis

- Spark 3.4.3 (audited 2026-07-03): `Kurtosis(child, nullOnDivideByZero)` extends `CentralMomentAgg` with `momentOrder = 4`. Excess kurtosis (Fisher), formula `n * m4 / (m2 * m2) - 3.0`; empty group → `NULL`; `m2 == 0` → `NULL` when `nullOnDivideByZero=true` (default when `spark.sql.legacy.statisticalAggregate=false`) else `NaN`. Any numeric input is cast to `Double` by `ImplicitCastInputTypes`.
- Spark 3.5.8 (audited 2026-07-03): identical to 3.4.3.
- Spark 4.0.1 (audited 2026-07-03): identical to 3.4.3. No collation involvement.
- Spark 4.1.1 (audited 2026-07-03): identical to 3.4.3.
- `CometKurtosis` maps the aggregate to a Comet-owned `Kurtosis` UDAF whose intermediate state (`[n, avg, m2, m3, m4]` Float64) mirrors Spark's `CentralMomentAgg` buffer for `momentOrder = 4`, so Partial output produced by either engine has the same wire format. The Rust update/merge kernels are a direct port of Spark's `updateExpressionsDef` and `mergeExpressions`. `supportsMixedPartialFinal` is left at the default `false`, matching the conservative policy the other `CentralMomentAgg` serdes (`Variance`, `Stddev`) already use in the same file.
- Window use (`kurtosis(x) OVER (...)`) currently falls back to Spark because the window path does not yet wire up the Comet aggregate for kurtosis.
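
The buffer layout and result rules in the notes above can be illustrated with a small standalone sketch. It is not the `Kurtosis` UDAF from this change: `MomentState` and its methods are hypothetical names, and the update and merge steps use the textbook one-pass central-moment formulas rather than a line-by-line port of Spark's expressions. Only the `[n, avg, m2, m3, m4]` layout, the `n * m4 / (m2 * m2) - 3.0` formula, and the `NULL`/`NaN` rules are taken from the audit notes.

```rust
/// Hypothetical running state laid out like the `[n, avg, m2, m3, m4]` buffer.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct MomentState {
    pub n: f64,
    pub avg: f64,
    pub m2: f64,
    pub m3: f64,
    pub m4: f64,
}

impl MomentState {
    /// Fold one value into the state (textbook one-pass update).
    pub fn update(&mut self, x: f64) {
        let n1 = self.n;
        let n = n1 + 1.0;
        let delta = x - self.avg;
        let delta_n = delta / n;
        let delta_n2 = delta_n * delta_n;
        let term1 = delta * delta_n * n1;
        // m4 and m3 are updated first because they read the old m3 and m2.
        self.m4 += term1 * delta_n2 * (n * n - 3.0 * n + 3.0) + 6.0 * delta_n2 * self.m2
            - 4.0 * delta_n * self.m3;
        self.m3 += term1 * delta_n * (n - 2.0) - 3.0 * delta_n * self.m2;
        self.m2 += term1;
        self.avg += delta_n;
        self.n = n;
    }

    /// Combine two partial states (pairwise merge of central moments).
    pub fn merge(&mut self, other: &MomentState) {
        if other.n == 0.0 {
            return;
        }
        if self.n == 0.0 {
            *self = *other;
            return;
        }
        let (n1, n2) = (self.n, other.n);
        let n = n1 + n2;
        let delta = other.avg - self.avg;
        let delta_n = delta / n;
        let m2 = self.m2 + other.m2 + delta * delta_n * n1 * n2;
        let m3 = self.m3 + other.m3
            + delta_n * delta_n * delta * n1 * n2 * (n1 - n2)
            + 3.0 * delta_n * (n1 * other.m2 - n2 * self.m2);
        let m4 = self.m4 + other.m4
            + delta_n * delta_n * delta_n * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2)
            + 6.0 * delta_n * delta_n * (n1 * n1 * other.m2 + n2 * n2 * self.m2)
            + 4.0 * delta_n * (n1 * other.m3 - n2 * self.m3);
        *self = MomentState { n, avg: self.avg + delta_n * n2, m2, m3, m4 };
    }

    /// Excess kurtosis; `None` stands in for SQL `NULL`.
    pub fn evaluate(&self, null_on_divide_by_zero: bool) -> Option<f64> {
        if self.n == 0.0 {
            None // empty group
        } else if self.m2 == 0.0 {
            // Zero variance: NULL or NaN depending on the flag.
            if null_on_divide_by_zero { None } else { Some(f64::NAN) }
        } else {
            Some(self.n * self.m4 / (self.m2 * self.m2) - 3.0)
        }
    }
}
```

In this sketch, folding all values into one state and merging two partial states should agree up to floating-point rounding, which is the property a Partial/Final split relies on.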

## median

- Spark 3.4.3 (audited 2026-06-24): `Median(child)` is a `RuntimeReplaceableAggregate` with `replacement = Percentile(child, Literal(0.5))`. Catalyst rewrites `median(x)` to `percentile(x, 0.5)` before Comet sees the plan, so it is served by `CometPercentile`.
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/expressions.md
@@ -90,7 +90,7 @@ The tables below list every Spark built-in expression with its current status.
| `first_value` | ✅ | |
| `grouping` | 🔜 | Grouping indicator for ROLLUP/CUBE/GROUPING SETS |
| `grouping_id` | 🔜 | Grouping indicator for ROLLUP/CUBE/GROUPING SETS |
-| `kurtosis` | 🔜 | tracking [#4098](https://github.com/apache/datafusion-comet/issues/4098) |
+| `kurtosis` | ✅ | Excess kurtosis (Fisher definition). |
| `last` | ✅ | |
| `last_value` | ✅ | |
| `listagg` | 🔜 | String aggregation |
12 changes: 10 additions & 2 deletions native/core/src/execution/planner.rs
@@ -130,8 +130,8 @@ use datafusion_comet_proto::{
use datafusion_comet_spark_expr::{
    jvm_udf::JvmScalarUdfExpr, ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation,
    Covariance, CreateNamedStruct, DecimalRescaleCheckOverflow, GetArrayStructFields,
-    GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal,
-    ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
+    GetStructField, IfExpr, Kurtosis, ListExtract, NormalizeNaNAndZero, SparkCastOptions, Stddev,
+    SumDecimal, ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
};
use itertools::Itertools;
use jni::objects::{Global, JObject};
@@ -2653,6 +2653,14 @@ impl PhysicalPlanner {
                let func = AggregateUDF::new_from_impl(SparkCollectSet::new());
                Self::create_aggr_func_expr("collect_set", schema, vec![child], func)
            }
            AggExprStruct::Kurtosis(expr) => {
                let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?;
                let func = AggregateUDF::new_from_impl(Kurtosis::new(
                    "kurtosis",
                    expr.null_on_divide_by_zero,
                ));
                Self::create_aggr_func_expr("kurtosis", schema, vec![child], func)
            }
        }
    }

10 changes: 10 additions & 0 deletions native/proto/src/proto/expr.proto
@@ -146,6 +146,7 @@ message AggExpr {
    BloomFilterAgg bloomFilterAgg = 16;
    CollectSet collectSet = 17;
    Percentile percentile = 18;
    Kurtosis kurtosis = 19;
  }

  // Optional filter expression for SQL FILTER (WHERE ...) clause.
@@ -240,6 +241,15 @@ message Stddev {
  StatisticsType stats_type = 4;
}

// Excess kurtosis (Fisher definition: normal distribution -> 0). The
// intermediate buffer is `[n, avg, m2, m3, m4]` (all Float64), matching the
// wire format of Spark's `CentralMomentAgg` DeclarativeAggregate so the state
// stays compatible with mixed Partial/Final execution with Spark.
message Kurtosis {
  Expr child = 1;
  bool null_on_divide_by_zero = 2;
}

message Correlation {
  Expr child1 = 1;
  Expr child2 = 2;