feat: support max_by and min_by aggregate expressions#4817
Open
andygrove wants to merge 2 commits into
Open
Conversation
Add native support for Spark's max_by(x, y) aggregate, which returns the value of x associated with the maximum value of y. The expression is implemented as a native DataFusion aggregate (MaxMinBy) that compares the ordering column via Arrow's row format. Null orderings are ignored, the value paired with the maximum ordering is returned (and may itself be null), and an all-null-ordering group yields null. Both the value and ordering must be fixed-length types: a variable-length or nested type forces Spark's SortAggregate, which Comet does not run, so those cases fall back to Spark.
Add min_by alongside max_by, both served by the shared native MaxMinBy aggregate. Refactor the Scala serde into a shared CometMaxMinBy base. Add a vectorized GroupsAccumulator that keeps each group's best (value, ordering) pair as Arrow row-format bytes, so grouped max_by/min_by avoids the per-group ScalarValue work of the generic GroupsAccumulatorAdapter. Extend the aggregate microbenchmark and SQL file tests to cover both max_by and min_by.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #3841.
Rationale for this change
max_by(x, y)returns the value ofxassociated with the maximum value ofy, andmin_by(x, y)returns the value associated with the minimum. Both were previously unsupported, so any query using them fell back to Spark. There is nomax_by/min_byaggregate in DataFusion or thedatafusion-sparkcrate, so this adds a native implementation.What changes are included in this PR?
MaxMinBy(native/spark-expr/src/agg_funcs/max_min_by.rs) serving bothmax_byandmin_by(parameterized by anis_maxflag). It compares the ordering column using Arrow's row format, so any orderable fixed-length type is handled with a single Spark-consistent total order (NaN sorts largest, matching Spark). Null orderings are ignored, the value paired with the extremum ordering is returned (and may itself be null), and an all-null-ordering group yields null.GroupsAccumulatorfor grouped aggregation. Each group keeps its best(value, ordering)pair as Arrow row-format bytes, so selecting the extremum for a batch is one row conversion plus per-row byte comparisons. This avoids the per-groupScalarValuework of DataFusion's genericGroupsAccumulatorAdapter. The scalarAccumulatoris retained for the non-grouped path.MaxByandMinByprotobuf messages and wiring inplanner.rs.CometMaxBy/CometMinByserdes sharing aCometMaxMinBy[T]base, registered inQueryPlanSerde.getSupportLevelrestricts both the value and the ordering to fixed-length types: a variable-length or nested type (string, binary, struct) forces Spark'sSortAggregate, which Comet does not accelerate, so those cases fall back to Spark.max_by/min_bygroups inCometAggregateExpressionBenchmark.The 2-argument implementations are identical across Spark 3.4.3, 3.5.8, 4.0.1, and 4.1.1, so no version shim is needed. The 3-argument top-k forms
max_by(x, y, k)/min_by(x, y, k)exist only on Spark master and are out of scope.This implementation was scaffolded with the
implement-comet-expressionproject skill, which also ran theaudit-comet-expressionskill to drive the test-coverage pass.Benchmark
CometAggregateExpressionBenchmarkon an Apple M3 Ultra, 1M rows (best time in ms; lower is better).max_by/min_byrun fully natively and are at parity with Spark on this scan-dominated workload, with a small edge at high group cardinality where the vectorizedGroupsAccumulatormatters most.max_by(int, long)max_by(double, int)max_by(decimal, int)max_by(int, long)max_by(int, long)min_by(int, long)min_by(double, int)min_by(int, long)How are these changes tested?
max_min_by.rs(14 total) covering the scalar accumulator and the grouped accumulator: basic max/min selection, null-ordering handling, null value at the extremum, all-null orderings, NaN ordering, empty groups, multi-group aggregation, filter handling, and partial/final merge equivalence.spark/src/test/resources/sql-tests/expressions/aggregate/max_by.sqlandmin_by.sqlcovering global and grouped aggregation, null handling, literal arguments, negative and boundary orderings, NaN/±Infinity, decimal/date/timestamp/boolean types, multiple aggregates in one query, and mixingmax_bywithmin_by. Each query runs through both Spark and Comet and asserts native execution.