feat: support listagg / string_agg aggregate (Spark 4.0+)#4816
Open
andygrove wants to merge 2 commits into
Open
feat: support listagg / string_agg aggregate (Spark 4.0+)#4816andygrove wants to merge 2 commits into
andygrove wants to merge 2 commits into
Conversation
Adds a `SparkListAgg` UDAF and a `CometListAgg` serde so that Comet can natively execute the simple form of Spark 4.0's `LISTAGG(child, delimiter)` / `string_agg` on `StringType` inputs with a literal delimiter. `WITHIN GROUP (ORDER BY ...)`, `BinaryType` inputs, non-literal delimiters, and non-default collations fall back to Spark. `DISTINCT` falls back because Comet already rejects multi-column distinct aggregates. The native accumulator returns `Utf8` but keeps its intermediate state as `Binary` to match Spark's `TypedImperativeAggregate` buffer schema so the Comet shuffle layer does not insert a `Utf8` → `Binary` cast the merge side cannot read back. Scaffolding produced by the `implement-comet-expression` skill.
Consolidate the reason strings into `private val`s so `getSupportLevel` and `getUnsupportedReasons` reference the same source of truth, and replace the manual `case _: Literal` delimiter check with the standard `.foldable` gate used elsewhere (e.g. `CometPercentile`).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #.
Rationale for this change
Spark 4.0 added the
LISTAGG/STRING_AGGaggregate to concatenate string values in a group. Comet previously fell back to Spark for the entire aggregate. This PR adds native support for the common shape so groups that uselistaggon aStringTypecolumn with a literal delimiter can stay in Comet's native execution path.What changes are included in this PR?
SparkListAggUDAF (native/spark-expr/src/agg_funcs/list_agg.rs) that skips nulls, returnsUtf8, and — critically — keeps its intermediate state asBinaryto match Spark'sTypedImperativeAggregatebuffer schema. EmittingUtf8state would force the Comet shuffle layer to insert aUtf8→Binarycast that the merge side then cannot read back.CometListAggserde inspark/src/main/spark-4.x/and wires it intoQueryPlanSerde.aggrSerdeMapthrough a newsparkVersionSpecificAggregatesshim slot (empty for Spark 3.x, providesListAggon Spark 4.x). This keeps the base map compiling against Spark 3.x whereListAggdoes not exist.ListAggprotobuf message and wires it through the native planner.WITHIN GROUP (ORDER BY ...),BinaryTypeinputs, non-literal delimiters, and non-default collations fall back to Spark.DISTINCTfalls back naturally because Comet's aggregate dispatcher rejects multi-column distinct aggregates.docs/source/contributor-guide/expression-audits/agg_funcs.mdand updates the support status indocs/source/user-guide/latest/expressions.md.Scaffolding produced by the
implement-comet-expressionskill; theaudit-comet-expressionskill drove the fallback and coverage-gap tests.How are these changes tested?
ListAggAccumulatorcovering delimiter handling, null-input skipping, empty groups, and multi-partition state merge (native/spark-expr/src/agg_funcs/list_agg.rs).spark/src/test/resources/sql-tests/expressions/aggregate/listagg.sqlgated to Spark 4.0+ (MinSparkVersion: 4.0), running underConfigMatrix: parquet.enable.dictionary=false,trueto exercise both dictionary-encoded and plain-encoded reads. It covers:string_aggalias, default (empty) delimiter, all-NULLgroup, empty table, global aggregate, multi-char and empty-string delimiters, empty-string values inside a group, multibyte UTF-8 values (composed accents, CJK, emoji).expect_fallbackcases:DISTINCT,WITHIN GROUP (ORDER BY ...)ascending and descending,BinaryTypechild with default and literal binary delimiter../mvnw test -Pspark-4.0 -Dsuites="org.apache.comet.CometSqlFileTestSuite listagg" -Dtest=nonepasses for both dictionary-encoding modes.cd native && cargo clippy --all-targets --workspace -- -D warningspasses.