feat: support kurtosis aggregate #4818
Open
andygrove wants to merge 1 commit into
Adds a Comet-owned `Kurtosis` UDAF and a `CometKurtosis` serde so that Spark's excess-kurtosis aggregate runs natively. The native accumulator stores its state as five `Float64` values, `[n, avg, m2, m3, m4]`, mirroring the buffer of Spark's `CentralMomentAgg` (momentOrder = 4), and the update/merge kernels are direct ports of Spark's `updateExpressionsDef` / `mergeExpressions` expressions. `nullOnDivideByZero` is threaded through so that both settings of `spark.sql.legacy.statisticalAggregate` behave the same as in Spark (NULL by default, NaN in legacy mode, when `m2 == 0`). Window use falls back to Spark today because the window path doesn't wire the Comet aggregate for kurtosis; this is captured as an `expect_fallback` case in the SQL test. Scaffolding was produced by the `implement-comet-expression` skill; the audit was performed by the `audit-comet-expression` skill.
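For readers unfamiliar with the recurrence, here is a minimal sketch of a per-row update over `[n, avg, m2, m3, m4]` and the final evaluation with the `nullOnDivideByZero` switch. It is not the code in this PR: `MomentState`, `update`, `evaluate`, and `kurtosis_of` are hypothetical names, `Option<f64>` stands in for a nullable SQL result, and the formulas are written from the standard online central-moment update, so the real kernel may differ in detail.

```rust
/// Hypothetical stand-in for the accumulator state `[n, avg, m2, m3, m4]`.
#[derive(Debug, Clone, Copy, Default)]
struct MomentState {
    n: f64,
    avg: f64,
    m2: f64,
    m3: f64,
    m4: f64,
}

impl MomentState {
    /// Folds one non-null input value into the running central moments.
    fn update(&mut self, x: f64) {
        self.n += 1.0;
        let delta = x - self.avg; // distance from the old mean
        let delta_n = delta / self.n; // step taken by the mean
        self.avg += delta_n;
        self.m2 += delta * (delta - delta_n);
        let delta2 = delta * delta;
        let delta_n2 = delta_n * delta_n;
        // m3 uses the already-updated m2; m4 uses the already-updated m2 and m3.
        self.m3 += -3.0 * delta_n * self.m2 + delta * (delta2 - delta_n2);
        self.m4 += -4.0 * delta_n * self.m3 - 6.0 * delta_n2 * self.m2
            + delta * (delta * delta2 - delta_n * delta_n2);
    }

    /// Excess kurtosis. `None` models SQL NULL.
    fn evaluate(&self, null_on_divide_by_zero: bool) -> Option<f64> {
        if self.n == 0.0 {
            return None; // empty group
        }
        if self.m2 == 0.0 {
            // Zero variance: NULL by default, NaN in legacy mode.
            return if null_on_divide_by_zero { None } else { Some(f64::NAN) };
        }
        Some(self.n * self.m4 / (self.m2 * self.m2) - 3.0)
    }
}

/// Convenience wrapper for a single batch of values (hypothetical helper).
fn kurtosis_of(values: &[f64], null_on_divide_by_zero: bool) -> Option<f64> {
    let mut state = MomentState::default();
    for &v in values {
        state.update(v);
    }
    state.evaluate(null_on_divide_by_zero)
}
```

Assuming the inputs of Spark's two documented `kurtosis` examples are `(-10, -20, 100, 1000)` and `(1, 10, 100, 10, 1)`, `kurtosis_of` should land within floating-point tolerance of `-0.7014368047529627` and `0.19432323191699075` respectively. A single-value group yields `None` with `null_on_divide_by_zero = true` and `NaN` with `false`.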
Which issue does this PR close?
Closes #.
Rationale for this change
`kurtosis` is a standard SQL statistical aggregate and one of the last remaining `CentralMomentAgg` siblings that Comet didn't run natively (variance and stddev are already supported). Adding it lets queries using kurtosis stay in Comet's native path instead of falling back to Spark for the whole aggregate.

What changes are included in this PR?

- Adds a `Kurtosis` UDAF (`native/spark-expr/src/agg_funcs/kurtosis.rs`) with a per-row `KurtosisAccumulator`. Intermediate state is `[n, avg, m2, m3, m4]` `Float64`, mirroring the wire format of Spark's `CentralMomentAgg` (momentOrder = 4) so a Spark-produced Partial and a Comet-produced Final can share bytes without conversion. Update and merge kernels are direct ports of Spark's `updateExpressionsDef` and `mergeExpressions` (Meng 2015 recurrence).
- Threads `nullOnDivideByZero` through the proto so `spark.sql.legacy.statisticalAggregate` behaves the same on both engines: the default returns `NULL` when `m2 == 0`, legacy mode returns `NaN`.
- Adds a `Kurtosis` protobuf message and wires it through the native planner.
- Adds `CometKurtosis` in `spark/src/main/scala/org/apache/comet/serde/aggregates.scala` and registers it in `QueryPlanSerde.aggrSerdeMap`. `supportsMixedPartialFinal` is left at `false` to match the conservative policy already used by `Variance` and `Stddev` in the same file.
- Updates `docs/source/contributor-guide/expression-audits/agg_funcs.md`; flips the support status in `docs/source/user-guide/latest/expressions.md` from planned to supported.
- Window use (`kurtosis(x) OVER (...)`) still falls back because the Comet window path doesn't wire kurtosis today. Captured as an `expect_fallback` case rather than left as an implicit gap.

Scaffolding produced by the `implement-comet-expression` skill; the `audit-comet-expression` skill drove the audit and produced the extra fallback and coverage-gap tests.

How are these changes tested?

- Unit tests for `KurtosisAccumulator` covering empty group, single-value divide-by-zero in both `nullOnDivideByZero` modes, both of Spark's own `ExpressionDescription` examples (`-0.7014368047529627` and `0.19432323191699075`), and a two-partition state merge that reproduces the single-batch result.
- `spark/src/test/resources/sql-tests/expressions/aggregate/kurtosis.sql` running under `ConfigMatrix: parquet.enable.dictionary=false,true`. Covers Spark's documented examples, GROUP BY, global aggregate, empty table, integer/decimal/float/bigint inputs, literal argument, `FILTER (WHERE ...)`, mixed with other aggregates, and `expect_fallback` cases for `skewness` and window use of `kurtosis`. Numerical-stress cases (`NaN`, `Infinity`, `-Infinity`, `1e15` magnitudes) use `spark_answer_only` mode.
- `kurtosis_legacy.sql` gated with `Config: spark.sql.legacy.statisticalAggregate=true` exercises the `NaN` path for single-value and all-equal groups.
- `./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite kurtosis" -Dtest=none` passes under both the default (Spark 3.5) and `-Pspark-4.0` profiles.
- `cd native && cargo clippy --all-targets --workspace -- -D warnings` passes.
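The two-partition merge check mentioned above can be illustrated with a small sketch. This is an assumption-laden stand-in rather than the PR's kernel: `merge_moments` is a hypothetical helper operating on plain `[f64; 5]` arrays laid out as `[n, avg, m2, m3, m4]`, and the formulas are the usual pairwise-merge identities for central moments up to order four.

```rust
/// Hypothetical helper: merges two partial states laid out as
/// `[n, avg, m2, m3, m4]` into the state of the combined group.
fn merge_moments(left: [f64; 5], right: [f64; 5]) -> [f64; 5] {
    let [n1, avg1, m2_1, m3_1, m4_1] = left;
    let [n2, avg2, m2_2, m3_2, m4_2] = right;

    let n = n1 + n2;
    let delta = avg2 - avg1;
    // Guard the empty-plus-empty case so no 0/0 NaN leaks into the state.
    let delta_n = if n == 0.0 { 0.0 } else { delta / n };

    let avg = avg1 + delta_n * n2;
    let m2 = m2_1 + m2_2 + delta * delta_n * n1 * n2;
    let m3 = m3_1
        + m3_2
        + delta_n * delta_n * delta * n1 * n2 * (n1 - n2)
        + 3.0 * delta_n * (n1 * m2_2 - n2 * m2_1);
    let m4 = m4_1
        + m4_2
        + delta_n * delta_n * delta_n * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2)
        + 6.0 * delta_n * delta_n * (n1 * n1 * m2_2 + n2 * n2 * m2_1)
        + 4.0 * delta_n * (n1 * m3_2 - n2 * m3_1);

    [n, avg, m2, m3, m4]
}
```

As a worked check, the values `{1, 3}` have state `[2, 2, 2, 0, 2]` and the single value `{8}` has state `[1, 8, 0, 0, 0]`. Merging them gives `[3, 4, 26, 36, 338]`, which is the state obtained by summing powers of the deviations `-3, -1, 4` from the combined mean `4` directly.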