From 31e75fe69c1b599424cfbff28b7f78c7c5116358 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 3 Jul 2026 10:10:14 -0600 Subject: [PATCH] feat: support kurtosis aggregate Adds a Comet-owned `Kurtosis` UDAF and `CometKurtosis` serde so that Spark's excess-kurtosis aggregate runs natively. The native accumulator stores `[n, avg, m2, m3, m4]` `Float64` state to mirror Spark's `CentralMomentAgg` (momentOrder = 4) buffer, and the update/merge kernels are a direct port of Spark's `updateExpressionsDef` / `mergeExpressions` expressions. `nullOnDivideByZero` is threaded through so that both branches of `spark.sql.legacy.statisticalAggregate` behave the same as Spark (NULL vs NaN when `m2 == 0`). Window use falls back to Spark today: the window path doesn't wire the Comet aggregate for kurtosis. Captured as an `expect_fallback` case in the SQL test. Scaffolding produced by the `implement-comet-expression` skill; audit performed by the `audit-comet-expression` skill. --- .../expression-audits/agg_funcs.md | 9 + docs/source/user-guide/latest/expressions.md | 2 +- native/core/src/execution/planner.rs | 12 +- native/proto/src/proto/expr.proto | 10 + native/spark-expr/src/agg_funcs/kurtosis.rs | 363 ++++++++++++++++++ native/spark-expr/src/agg_funcs/mod.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 1 + .../org/apache/comet/serde/aggregates.scala | 35 +- .../expressions/aggregate/kurtosis.sql | 273 +++++++++++++ .../expressions/aggregate/kurtosis_legacy.sql | 41 ++ 10 files changed, 744 insertions(+), 4 deletions(-) create mode 100644 native/spark-expr/src/agg_funcs/kurtosis.rs create mode 100644 spark/src/test/resources/sql-tests/expressions/aggregate/kurtosis.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/aggregate/kurtosis_legacy.sql diff --git a/docs/source/contributor-guide/expression-audits/agg_funcs.md b/docs/source/contributor-guide/expression-audits/agg_funcs.md index fb27662b49..f1d163800e 100644 --- 
a/docs/source/contributor-guide/expression-audits/agg_funcs.md +++ b/docs/source/contributor-guide/expression-audits/agg_funcs.md @@ -39,6 +39,15 @@ - Spark 3.5.8 (2026-05-26) - Spark 4.0.1 (2026-05-26) +## kurtosis + +- Spark 3.4.3 (audited 2026-07-03): `Kurtosis(child, nullOnDivideByZero)` extends `CentralMomentAgg` with `momentOrder = 4`. Excess kurtosis (Fisher), formula `n * m4 / (m2 * m2) - 3.0`; empty group → `NULL`; `m2 == 0` → `NULL` when `nullOnDivideByZero=true` (default when `spark.sql.legacy.statisticalAggregate=false`) else `NaN`. Any numeric input is cast to `Double` by `ImplicitCastInputTypes`. +- Spark 3.5.8 (audited 2026-07-03): identical to 3.4.3. +- Spark 4.0.1 (audited 2026-07-03): identical to 3.4.3. No collation involvement. +- Spark 4.1.1 (audited 2026-07-03): identical to 3.4.3. +- `CometKurtosis` maps the aggregate to a Comet-owned `Kurtosis` UDAF whose intermediate state (`[n, avg, m2, m3, m4]` Float64) mirrors Spark's `CentralMomentAgg` buffer for `momentOrder = 4`, so Partial output produced by either engine has the same wire format. The Rust update/merge kernels are a direct port of Spark's `updateExpressionsDef` and `mergeExpressions`. `supportsMixedPartialFinal` is left at the default `false`, matching the conservative policy the other `CentralMomentAgg` serdes (`Variance`, `Stddev`) already use in the same file. +- Window use (`kurtosis(x) OVER (...)`) currently falls back to Spark: the window path doesn't wire the Comet aggregate for kurtosis today. + ## median - Spark 3.4.3 (audited 2026-06-24): `Median(child)` is a `RuntimeReplaceableAggregate` with `replacement = Percentile(child, Literal(0.5))`. Catalyst rewrites `median(x)` to `percentile(x, 0.5)` before Comet sees the plan, so it is served by `CometPercentile`. 
diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index d7c86aad1f..605c00c913 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -90,7 +90,7 @@ The tables below list every Spark built-in expression with its current status. | `first_value` | ✅ | | | `grouping` | 🔜 | Grouping indicator for ROLLUP/CUBE/GROUPING SETS | | `grouping_id` | 🔜 | Grouping indicator for ROLLUP/CUBE/GROUPING SETS | -| `kurtosis` | 🔜 | tracking [#4098](https://github.com/apache/datafusion-comet/issues/4098) | +| `kurtosis` | ✅ | Excess kurtosis (Fisher definition). | | `last` | ✅ | | | `last_value` | ✅ | | | `listagg` | 🔜 | String aggregation | diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 25162332fd..26e7a0b2a9 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -130,8 +130,8 @@ use datafusion_comet_proto::{ use datafusion_comet_spark_expr::{ jvm_udf::JvmScalarUdfExpr, ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct, DecimalRescaleCheckOverflow, GetArrayStructFields, - GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal, - ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp, + GetStructField, IfExpr, Kurtosis, ListExtract, NormalizeNaNAndZero, SparkCastOptions, Stddev, + SumDecimal, ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp, }; use itertools::Itertools; use jni::objects::{Global, JObject}; @@ -2653,6 +2653,14 @@ impl PhysicalPlanner { let func = AggregateUDF::new_from_impl(SparkCollectSet::new()); Self::create_aggr_func_expr("collect_set", schema, vec![child], func) } + AggExprStruct::Kurtosis(expr) => { + let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; + let func = AggregateUDF::new_from_impl(Kurtosis::new( + "kurtosis", 
+ expr.null_on_divide_by_zero, + )); + Self::create_aggr_func_expr("kurtosis", schema, vec![child], func) + } } } diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 32adc16b72..f6d4320f88 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -146,6 +146,7 @@ message AggExpr { BloomFilterAgg bloomFilterAgg = 16; CollectSet collectSet = 17; Percentile percentile = 18; + Kurtosis kurtosis = 19; } // Optional filter expression for SQL FILTER (WHERE ...) clause. @@ -240,6 +241,15 @@ message Stddev { StatisticsType stats_type = 4; } +// Excess kurtosis (Fisher definition: normal distribution -> 0). The native +// intermediate buffer is `[n, avg, m2, m3, m4]` of Float64 to match the +// `CentralMomentAgg` DeclarativeAggregate wire format for mixed +// Partial/Final execution with Spark. +message Kurtosis { + Expr child = 1; + bool null_on_divide_by_zero = 2; +} + message Correlation { Expr child1 = 1; Expr child2 = 2; diff --git a/native/spark-expr/src/agg_funcs/kurtosis.rs b/native/spark-expr/src/agg_funcs/kurtosis.rs new file mode 100644 index 0000000000..c85ab7482c --- /dev/null +++ b/native/spark-expr/src/agg_funcs/kurtosis.rs @@ -0,0 +1,363 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Spark-compatible excess-kurtosis aggregate. +//! +//! Spark's `Kurtosis` is a `CentralMomentAgg` (`DeclarativeAggregate`) whose +//! intermediate buffer is `[n, avg, m2, m3, m4]` of Float64. This accumulator +//! mirrors that buffer exactly, using the same higher-order online update / +//! merge recurrences (Meng 2015) that `CentralMomentAgg` compiles into +//! catalyst expressions. Matching the wire format lets Spark's Partial and +//! Comet's Final (or vice versa) share intermediate state without a cast. +//! +//! Result formula (excess kurtosis, Fisher definition): +//! +//! * `n == 0` -> NULL +//! * `m2 == 0` -> NULL when `null_on_divide_by_zero`, else NaN +//! * otherwise -> `n * m4 / (m2 * m2) - 3.0` + +use std::mem::size_of; +use std::sync::Arc; + +use arrow::array::{ArrayRef, Float64Array}; +use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion::common::{downcast_value, Result, ScalarValue}; +use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion::logical_expr::Volatility::Immutable; +use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature}; +use datafusion::physical_expr::expressions::format_state_name; + +#[derive(Debug, PartialEq, Eq)] +pub struct Kurtosis { + name: String, + signature: Signature, + null_on_divide_by_zero: bool, +} + +impl std::hash::Hash for Kurtosis { + fn hash(&self, state: &mut H) { + self.name.hash(state); + self.signature.hash(state); + self.null_on_divide_by_zero.hash(state); + } +} + +impl Kurtosis { + pub fn new(name: impl Into, null_on_divide_by_zero: bool) -> Self { + Self { + name: name.into(), + signature: Signature::numeric(1, Immutable), + null_on_divide_by_zero, + } + } +} + +impl AggregateUDFImpl for Kurtosis { + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, 
_arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { + Ok(Box::new(KurtosisAccumulator::new( + self.null_on_divide_by_zero, + ))) + } + + // Fields ordered to match Spark's `[n, avg, m2, m3, m4]` buffer so that a + // Spark-produced Partial state can be merged into a Comet-produced Final + // (and vice versa) without a schema conversion. + fn state_fields(&self, _args: StateFieldsArgs) -> Result> { + Ok(vec![ + Arc::new(Field::new( + format_state_name(&self.name, "n"), + DataType::Float64, + true, + )), + Arc::new(Field::new( + format_state_name(&self.name, "avg"), + DataType::Float64, + true, + )), + Arc::new(Field::new( + format_state_name(&self.name, "m2"), + DataType::Float64, + true, + )), + Arc::new(Field::new( + format_state_name(&self.name, "m3"), + DataType::Float64, + true, + )), + Arc::new(Field::new( + format_state_name(&self.name, "m4"), + DataType::Float64, + true, + )), + ]) + } + + fn default_value(&self, _data_type: &DataType) -> Result { + Ok(ScalarValue::Float64(None)) + } +} + +/// Online update for the first four central moments. Direct port of Spark's +/// `CentralMomentAgg.updateExpressionsDef` for `momentOrder = 4`. +#[inline] +fn kurtosis_update( + n: f64, + avg: f64, + m2: f64, + m3: f64, + m4: f64, + value: f64, +) -> (f64, f64, f64, f64, f64) { + let new_n = n + 1.0; + let delta = value - avg; + let delta_n = delta / new_n; + let new_avg = avg + delta_n; + let new_m2 = m2 + delta * (delta - delta_n); + let delta2 = delta * delta; + let delta_n2 = delta_n * delta_n; + let new_m3 = m3 - 3.0 * delta_n * new_m2 + delta * (delta2 - delta_n2); + let new_m4 = m4 - 4.0 * delta_n * new_m3 - 6.0 * delta_n2 * new_m2 + + delta * (delta * delta2 - delta_n * delta_n2); + (new_n, new_avg, new_m2, new_m3, new_m4) +} + +/// Merge two partial states. Direct port of Spark's +/// `CentralMomentAgg.mergeExpressions` for `momentOrder = 4`. 
+#[inline] +#[allow(clippy::too_many_arguments)] +fn kurtosis_merge( + n1: f64, + avg1: f64, + m2_1: f64, + m3_1: f64, + m4_1: f64, + n2: f64, + avg2: f64, + m2_2: f64, + m3_2: f64, + m4_2: f64, +) -> (f64, f64, f64, f64, f64) { + let new_n = n1 + n2; + let delta = avg2 - avg1; + let delta_n = if new_n == 0.0 { 0.0 } else { delta / new_n }; + let new_avg = avg1 + delta_n * n2; + let new_m2 = m2_1 + m2_2 + delta * delta_n * n1 * n2; + let new_m3 = m3_1 + + m3_2 + + delta_n * delta_n * delta * n1 * n2 * (n1 - n2) + + 3.0 * delta_n * (n1 * m2_2 - n2 * m2_1); + let new_m4 = m4_1 + + m4_2 + + delta_n * delta_n * delta_n * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2) + + 6.0 * delta_n * delta_n * (n1 * n1 * m2_2 + n2 * n2 * m2_1) + + 4.0 * delta_n * (n1 * m3_2 - n2 * m3_1); + (new_n, new_avg, new_m2, new_m3, new_m4) +} + +#[derive(Debug)] +pub struct KurtosisAccumulator { + n: f64, + avg: f64, + m2: f64, + m3: f64, + m4: f64, + null_on_divide_by_zero: bool, +} + +impl KurtosisAccumulator { + pub fn new(null_on_divide_by_zero: bool) -> Self { + Self { + n: 0.0, + avg: 0.0, + m2: 0.0, + m3: 0.0, + m4: 0.0, + null_on_divide_by_zero, + } + } +} + +impl Accumulator for KurtosisAccumulator { + fn state(&mut self) -> Result> { + Ok(vec![ + ScalarValue::from(self.n), + ScalarValue::from(self.avg), + ScalarValue::from(self.m2), + ScalarValue::from(self.m3), + ScalarValue::from(self.m4), + ]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let arr = downcast_value!(&values[0], Float64Array).iter().flatten(); + for value in arr { + let (n, avg, m2, m3, m4) = + kurtosis_update(self.n, self.avg, self.m2, self.m3, self.m4, value); + self.n = n; + self.avg = avg; + self.m2 = m2; + self.m3 = m3; + self.m4 = m4; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let ns = downcast_value!(states[0], Float64Array); + let avgs = downcast_value!(states[1], Float64Array); + let m2s = downcast_value!(states[2], Float64Array); + let 
m3s = downcast_value!(states[3], Float64Array); + let m4s = downcast_value!(states[4], Float64Array); + + for i in 0..ns.len() { + let n2 = ns.value(i); + if n2 == 0.0 { + // Empty partial state contributes nothing and would produce + // divide-by-zero garbage in `delta_n`; skip it. + continue; + } + let (n, avg, m2, m3, m4) = kurtosis_merge( + self.n, + self.avg, + self.m2, + self.m3, + self.m4, + n2, + avgs.value(i), + m2s.value(i), + m3s.value(i), + m4s.value(i), + ); + self.n = n; + self.avg = avg; + self.m2 = m2; + self.m3 = m3; + self.m4 = m4; + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::Float64(if self.n == 0.0 { + None + } else if self.m2 == 0.0 { + if self.null_on_divide_by_zero { + None + } else { + Some(f64::NAN) + } + } else { + Some(self.n * self.m4 / (self.m2 * self.m2) - 3.0) + })) + } + + fn size(&self) -> usize { + size_of::() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn eval(values: &[f64], null_on_divide_by_zero: bool) -> Option { + let mut acc = KurtosisAccumulator::new(null_on_divide_by_zero); + let arr: ArrayRef = Arc::new(Float64Array::from(values.to_vec())); + acc.update_batch(&[arr]).unwrap(); + match acc.evaluate().unwrap() { + ScalarValue::Float64(v) => v, + other => panic!("expected Float64, got {other:?}"), + } + } + + #[test] + fn empty_group_returns_null() { + assert_eq!(eval(&[], true), None); + } + + #[test] + fn single_value_returns_divide_by_zero_result() { + // m2 == 0 with a single value => NULL when null_on_divide_by_zero, else NaN. 
+ assert_eq!(eval(&[42.0], true), None); + let nan = eval(&[42.0], false).unwrap(); + assert!(nan.is_nan(), "expected NaN, got {nan}"); + } + + #[test] + fn matches_spark_example() { + // Spark's own example from ExpressionDescription: + // SELECT kurtosis(col) FROM VALUES (-10), (-20), (100), (1000) AS tab(col); + // => -0.7014368047529627 + let got = eval(&[-10.0, -20.0, 100.0, 1000.0], true).unwrap(); + assert!((got - -0.7014368047529627_f64).abs() < 1e-12, "got {got}"); + } + + #[test] + fn matches_spark_second_example() { + // SELECT kurtosis(col) FROM VALUES (1), (10), (100), (10), (1) as tab(col); + // => 0.19432323191699075 + let got = eval(&[1.0, 10.0, 100.0, 10.0, 1.0], true).unwrap(); + assert!((got - 0.19432323191699075_f64).abs() < 1e-12, "got {got}"); + } + + #[test] + fn merge_produces_same_result_as_single_batch() { + // Merging two partitions must reproduce the single-batch result. + let values = [-10.0_f64, -20.0, 100.0, 1000.0]; + let full = eval(&values, true).unwrap(); + + let arr_a: ArrayRef = Arc::new(Float64Array::from(values[..2].to_vec())); + let arr_b: ArrayRef = Arc::new(Float64Array::from(values[2..].to_vec())); + + let mut a = KurtosisAccumulator::new(true); + a.update_batch(&[arr_a]).unwrap(); + let state_a = a.state().unwrap(); + + let mut b = KurtosisAccumulator::new(true); + b.update_batch(&[arr_b]).unwrap(); + + // Represent partition-A state as five single-row Float64 arrays and merge. 
+ let state_arrays: Vec = state_a + .into_iter() + .map(|sv| match sv { + ScalarValue::Float64(v) => { + Arc::new(Float64Array::from(vec![v.unwrap()])) as ArrayRef + } + other => panic!("unexpected state scalar {other:?}"), + }) + .collect(); + b.merge_batch(&state_arrays).unwrap(); + + let merged = match b.evaluate().unwrap() { + ScalarValue::Float64(Some(v)) => v, + other => panic!("expected Float64(Some(_)), got {other:?}"), + }; + assert!((merged - full).abs() < 1e-9, "merged={merged}, full={full}"); + } +} diff --git a/native/spark-expr/src/agg_funcs/mod.rs b/native/spark-expr/src/agg_funcs/mod.rs index 2a0322e46c..71e279ba34 100644 --- a/native/spark-expr/src/agg_funcs/mod.rs +++ b/native/spark-expr/src/agg_funcs/mod.rs @@ -19,6 +19,7 @@ mod avg; mod avg_decimal; mod correlation; mod covariance; +mod kurtosis; mod stddev; mod sum_decimal; mod sum_int; @@ -29,6 +30,7 @@ pub use avg::Avg; pub use avg_decimal::AvgDecimal; pub use correlation::Correlation; pub use covariance::Covariance; +pub use kurtosis::Kurtosis; pub use stddev::Stddev; pub use sum_decimal::SumDecimal; pub use sum_int::SumInteger; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 7146eaec9b..19973f5cf8 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -397,6 +397,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[CovPopulation] -> CometCovPopulation, classOf[CovSample] -> CometCovSample, classOf[First] -> CometFirst, + classOf[Kurtosis] -> CometKurtosis, classOf[Last] -> CometLast, classOf[Max] -> CometMax, classOf[Min] -> CometMin, diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala index 5710232cb4..867faccbf2 100644 --- 
a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala +++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala @@ -22,7 +22,7 @@ package org.apache.comet.serde import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal} -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, CollectSet, Corr, Count, Covariance, CovPopulation, CovSample, First, Last, Max, Min, Percentile, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, CollectSet, Corr, Count, Covariance, CovPopulation, CovSample, First, Kurtosis, Last, Max, Min, Percentile, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ByteType, DecimalType, DoubleType, IntegerType, LongType, NumericType, ShortType, StringType} @@ -823,6 +823,39 @@ object CometCollectSet extends CometAggregateExpressionSerde[CollectSet] { } } +object CometKurtosis extends CometAggregateExpressionSerde[Kurtosis] { + + // Not marked safe for mixed partial/final: follows the same policy as `Variance` / `Stddev`, + // whose complex `[n, avg, m2, ...]` buffer is not certified compatible across engines. The + // native accumulator does mirror Spark's `[n, avg, m2, m3, m4]` wire format, so lifting this + // to `true` should be considered together with the other `CentralMomentAgg` serdes. 
+ + override def convert( + aggExpr: AggregateExpression, + kurtosis: Kurtosis, + inputs: Seq[Attribute], + binding: Boolean, + conf: SQLConf): Option[ExprOuterClass.AggExpr] = { + val child = kurtosis.child + val childExpr = exprToProto(child, inputs, binding) + + if (childExpr.isDefined) { + val builder = ExprOuterClass.Kurtosis.newBuilder() + builder.setChild(childExpr.get) + builder.setNullOnDivideByZero(kurtosis.nullOnDivideByZero) + + Some( + ExprOuterClass.AggExpr + .newBuilder() + .setKurtosis(builder) + .build()) + } else { + withFallbackReason(aggExpr, child) + None + } + } +} + object AggSerde { import org.apache.spark.sql.types._ diff --git a/spark/src/test/resources/sql-tests/expressions/aggregate/kurtosis.sql b/spark/src/test/resources/sql-tests/expressions/aggregate/kurtosis.sql new file mode 100644 index 0000000000..ba8b370a4f --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/aggregate/kurtosis.sql @@ -0,0 +1,273 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+ +-- ConfigMatrix: parquet.enable.dictionary=false,true + +-- ============================================================ +-- Setup +-- ============================================================ + +statement +CREATE TABLE k_dbl(v double, grp string) USING parquet + +statement +INSERT INTO k_dbl VALUES + (-10.0, 'g1'), (-20.0, 'g1'), (100.0, 'g1'), (1000.0, 'g1'), + (1.0, 'g2'), (10.0, 'g2'), (100.0, 'g2'), (10.0, 'g2'), (1.0, 'g2'), + (42.0, 'g3'), + (NULL, 'g4'), (NULL, 'g4'), + (7.0, 'g5'), (7.0, 'g5'), (7.0, 'g5') + +statement +CREATE TABLE k_int(v int, grp string) USING parquet + +statement +INSERT INTO k_int VALUES + (1, 'g1'), (10, 'g1'), (100, 'g1'), (10, 'g1'), (1, 'g1'), + (NULL, 'g2'), (5, 'g2') + +statement +CREATE TABLE k_dec(v decimal(10,2), grp string) USING parquet + +statement +INSERT INTO k_dec VALUES + (1.50, 'g1'), (2.50, 'g1'), (3.50, 'g1'), (4.50, 'g1') + +statement +CREATE TABLE k_empty(v double) USING parquet + +statement +CREATE TABLE k_spark_ex1(v double) USING parquet + +statement +INSERT INTO k_spark_ex1 VALUES (-10.0), (-20.0), (100.0), (1000.0) + +statement +CREATE TABLE k_spark_ex2(v double) USING parquet + +statement +INSERT INTO k_spark_ex2 VALUES (1.0), (10.0), (100.0), (10.0), (1.0) + +statement +CREATE TABLE k_single(v double) USING parquet + +statement +INSERT INTO k_single VALUES (42.0) + +statement +CREATE TABLE k_const(v double) USING parquet + +statement +INSERT INTO k_const VALUES (7.0), (7.0), (7.0) + +statement +CREATE TABLE k_lit(x int) USING parquet + +statement +INSERT INTO k_lit VALUES (1) + +-- ============================================================ +-- Spark's own example: matches -0.7014368047529627. +-- ============================================================ + +query +SELECT kurtosis(v) FROM k_spark_ex1 + +-- Spark's second example: matches 0.19432323191699075. 
+query +SELECT kurtosis(v) FROM k_spark_ex2 + +-- ============================================================ +-- GROUP BY over doubles: covers a "normal" group (g1), a heavier +-- group (g2), a single-value group (g3, m2=0 => NULL by default), +-- an all-NULL group (g4 => NULL), and constants (g5, m2=0). +-- ============================================================ + +query +SELECT grp, kurtosis(v) FROM k_dbl GROUP BY grp ORDER BY grp + +-- ============================================================ +-- Global aggregate (no GROUP BY). +-- ============================================================ + +query +SELECT kurtosis(v) FROM k_dbl + +-- Empty table returns NULL. +query +SELECT kurtosis(v) FROM k_empty + +-- ============================================================ +-- Integer input: promoted to Double by Spark's ImplicitCastInputTypes. +-- ============================================================ + +query +SELECT grp, kurtosis(v) FROM k_int GROUP BY grp ORDER BY grp + +-- ============================================================ +-- Decimal input. +-- ============================================================ + +query +SELECT grp, kurtosis(v) FROM k_dec GROUP BY grp ORDER BY grp + +-- ============================================================ +-- Literal argument (constant folded; still exercises planning). +-- ============================================================ + +query +SELECT kurtosis(1.0) FROM k_lit + +query +SELECT kurtosis(NULL) FROM k_lit + +-- ============================================================ +-- Divide-by-zero cases under default (nullOnDivideByZero=true): +-- single-value and all-equal groups both yield NULL. See +-- kurtosis_legacy.sql for the `spark.sql.legacy.statisticalAggregate=true` +-- variant that returns NaN instead. 
+-- ============================================================ + +query +SELECT kurtosis(v) FROM k_single + +query +SELECT kurtosis(v) FROM k_const + +-- ============================================================ +-- FILTER (WHERE ...) — Partial only carries the filter. +-- ============================================================ + +query +SELECT grp, kurtosis(v) FILTER (WHERE v > 0) FROM k_dbl GROUP BY grp ORDER BY grp + +-- ============================================================ +-- Skewness is Spark's sibling in CentralMomentAgg; we don't +-- implement it here, so it should fall back. (This documents the +-- boundary; if we add skewness later, the expect_fallback +-- becomes a plain query.) +-- ============================================================ + +query expect_fallback(unsupported Spark aggregate function: skewness) +SELECT skewness(v) FROM k_dbl + +-- ============================================================ +-- Additional coverage requested by the audit. +-- ============================================================ + +statement +CREATE TABLE k_float(v float, grp string) USING parquet + +statement +INSERT INTO k_float VALUES + (CAST(1.0 AS FLOAT), 'g1'), (CAST(10.0 AS FLOAT), 'g1'), + (CAST(100.0 AS FLOAT), 'g1'), (CAST(10.0 AS FLOAT), 'g1'), + (CAST(1.0 AS FLOAT), 'g1') + +statement +CREATE TABLE k_long(v bigint, grp string) USING parquet + +statement +INSERT INTO k_long VALUES + (1, 'g1'), (10, 'g1'), (100, 'g1'), (10, 'g1'), (1, 'g1') + +statement +CREATE TABLE k_wnd(k int, part string, v double) USING parquet + +statement +INSERT INTO k_wnd VALUES + (1, 'p1', 1.0), (2, 'p1', 1.0), (3, 'p1', 2.0), (4, 'p1', 2.0), + (5, 'p1', 3.0), (6, 'p1', 3.0), (7, 'p1', 3.0), + (8, 'p2', 1.0), (9, 'p2', 2.0), (10, 'p2', 5.0) + +-- Float input: promoted to Double by Spark's ImplicitCastInputTypes. +query +SELECT grp, kurtosis(v) FROM k_float GROUP BY grp ORDER BY grp + +-- BigInt input. 
+query +SELECT grp, kurtosis(v) FROM k_long GROUP BY grp ORDER BY grp + +-- Mixed with other CentralMomentAgg siblings in one query. +query +SELECT kurtosis(v), avg(v), stddev(v), count(*) FROM k_dbl WHERE v IS NOT NULL + +-- Window use: matches Spark's `DataFrameWindowFunctionsSuite` +-- "skewness and kurtosis functions in window" test. Comet's window +-- path doesn't wire `kurtosis` as a window aggregate today, so this +-- falls back to Spark. +query expect_fallback(is not supported for window function) +SELECT k, + kurtosis(v) OVER (PARTITION BY part ORDER BY k + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +FROM k_wnd ORDER BY k + +-- ============================================================ +-- Numerical stress: NaN/Infinity/-Infinity in a group. Spark +-- propagates these through arithmetic, so the aggregate is +-- expected to produce NaN or NULL rather than a finite value. +-- Use spark_answer_only because our Welford recurrence may produce +-- NaN payloads with a different bit pattern from +-- Spark's DeclarativeAggregate compilation; result-value equality +-- via Spark's own comparison is what the harness needs to see. 
+-- ============================================================ + +statement +CREATE TABLE k_nan(v double) USING parquet + +statement +INSERT INTO k_nan VALUES + (1.0), (2.0), (CAST('NaN' AS DOUBLE)), (3.0) + +statement +CREATE TABLE k_inf(v double) USING parquet + +statement +INSERT INTO k_inf VALUES + (1.0), (2.0), (CAST('Infinity' AS DOUBLE)), (3.0) + +statement +CREATE TABLE k_neg_inf(v double) USING parquet + +statement +INSERT INTO k_neg_inf VALUES + (1.0), (2.0), (CAST('-Infinity' AS DOUBLE)), (3.0) + +query spark_answer_only +SELECT kurtosis(v) FROM k_nan + +query spark_answer_only +SELECT kurtosis(v) FROM k_inf + +query spark_answer_only +SELECT kurtosis(v) FROM k_neg_inf + +-- ============================================================ +-- Large-magnitude inputs: Welford is numerically stable but the +-- final `n * m4 / (m2 * m2) - 3.0` can still differ from Spark's +-- codegen at high magnitudes. Use spark_answer_only. +-- ============================================================ + +statement +CREATE TABLE k_big(v double) USING parquet + +statement +INSERT INTO k_big VALUES + (1.0e15), (2.0e15), (3.0e15), (4.0e15), (5.0e15) + +query spark_answer_only +SELECT kurtosis(v) FROM k_big diff --git a/spark/src/test/resources/sql-tests/expressions/aggregate/kurtosis_legacy.sql b/spark/src/test/resources/sql-tests/expressions/aggregate/kurtosis_legacy.sql new file mode 100644 index 0000000000..28e3424ac4 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/aggregate/kurtosis_legacy.sql @@ -0,0 +1,41 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Legacy statistical-aggregate semantics: divide-by-zero returns NaN +-- instead of NULL. Spark passes `!legacyStatisticalAggregate` as +-- `nullOnDivideByZero` when constructing the aggregate. +-- Config: spark.sql.legacy.statisticalAggregate=true + +statement +CREATE TABLE k_legacy_single(v double) USING parquet + +statement +INSERT INTO k_legacy_single VALUES (42.0) + +statement +CREATE TABLE k_legacy_const(v double) USING parquet + +statement +INSERT INTO k_legacy_const VALUES (7.0), (7.0), (7.0) + +-- Single-value group: m2 == 0. Expect NaN (not NULL). +query +SELECT kurtosis(v) FROM k_legacy_single + +-- All-equal group: same divide-by-zero shape. +query +SELECT kurtosis(v) FROM k_legacy_const