Skip to content

feat: support max_by and min_by aggregate expressions#4817

Open
andygrove wants to merge 2 commits into
apache:mainfrom
andygrove:support-max-by-expression
Open

feat: support max_by and min_by aggregate expressions#4817
andygrove wants to merge 2 commits into
apache:mainfrom
andygrove:support-max-by-expression

Conversation

@andygrove

@andygrove andygrove commented Jul 3, 2026

Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #3841.

Rationale for this change

max_by(x, y) returns the value of x associated with the maximum value of y, and min_by(x, y) returns the value associated with the minimum. Both were previously unsupported, so any query using them fell back to Spark. There is no max_by/min_by aggregate in DataFusion or the datafusion-spark crate, so this adds a native implementation.

What changes are included in this PR?

  • A native DataFusion aggregate MaxMinBy (native/spark-expr/src/agg_funcs/max_min_by.rs) serving both max_by and min_by (parameterized by an is_max flag). It compares the ordering column using Arrow's row format, so any orderable fixed-length type is handled with a single Spark-consistent total order (NaN sorts largest, matching Spark). Null orderings are ignored, the value paired with the extremum ordering is returned (and may itself be null), and an all-null-ordering group yields null.
  • A vectorized GroupsAccumulator for grouped aggregation. Each group keeps its best (value, ordering) pair as Arrow row-format bytes, so selecting the extremum for a batch is one row conversion plus per-row byte comparisons. This avoids the per-group ScalarValue work of DataFusion's generic GroupsAccumulatorAdapter. The scalar Accumulator is retained for the non-grouped path.
  • MaxBy and MinBy protobuf messages and wiring in planner.rs.
  • CometMaxBy / CometMinBy serdes sharing a CometMaxMinBy[T] base, registered in QueryPlanSerde. getSupportLevel restricts both the value and the ordering to fixed-length types: a variable-length or nested type (string, binary, struct) forces Spark's SortAggregate, which Comet does not accelerate, so those cases fall back to Spark.
  • max_by / min_by groups in CometAggregateExpressionBenchmark.
  • Documentation updates (expression support status, expression audit notes).

The 2-argument implementations are identical across Spark 3.4.3, 3.5.8, 4.0.1, and 4.1.1, so no version shim is needed. The 3-argument top-k forms max_by(x, y, k) / min_by(x, y, k) exist only on Spark master and are out of scope.

This implementation was scaffolded with the implement-comet-expression project skill, which also ran the audit-comet-expression skill to drive the test-coverage pass.

Benchmark

CometAggregateExpressionBenchmark on an Apple M3 Ultra, 1M rows (best time in ms; lower is better). max_by/min_by run fully natively and are at parity with Spark on this scan-dominated workload, with a small edge at high group cardinality where the vectorized GroupsAccumulator matters most.

Case grouping Spark Comet
max_by(int, long) grp (1k groups) 78 78
max_by(double, int) grp 80 80
max_by(decimal, int) grp 87 87
max_by(int, long) high card (100k groups) 199 193
max_by(int, long) global (no group by) 24 24
min_by(int, long) grp 80 79
min_by(double, int) grp 80 79
min_by(int, long) high card (100k groups) 194 194

How are these changes tested?

  • Rust unit tests in max_min_by.rs (14 total) covering the scalar accumulator and the grouped accumulator: basic max/min selection, null-ordering handling, null value at the extremum, all-null orderings, NaN ordering, empty groups, multi-group aggregation, filter handling, and partial/final merge equivalence.
  • SQL file tests spark/src/test/resources/sql-tests/expressions/aggregate/max_by.sql and min_by.sql covering global and grouped aggregation, null handling, literal arguments, negative and boundary orderings, NaN/±Infinity, decimal/date/timestamp/boolean types, multiple aggregates in one query, and mixing max_by with min_by. Each query runs through both Spark and Comet and asserts native execution.

Add native support for Spark's max_by(x, y) aggregate, which returns the
value of x associated with the maximum value of y.

The expression is implemented as a native DataFusion aggregate (MaxMinBy)
that compares the ordering column via Arrow's row format. Null orderings
are ignored, the value paired with the maximum ordering is returned (and
may itself be null), and an all-null-ordering group yields null.

Both the value and ordering must be fixed-length types: a variable-length
or nested type forces Spark's SortAggregate, which Comet does not run, so
those cases fall back to Spark.
@andygrove andygrove added this to the 1.0.0 milestone Jul 3, 2026
Add min_by alongside max_by, both served by the shared native MaxMinBy
aggregate. Refactor the Scala serde into a shared CometMaxMinBy base.

Add a vectorized GroupsAccumulator that keeps each group's best
(value, ordering) pair as Arrow row-format bytes, so grouped max_by/min_by
avoids the per-group ScalarValue work of the generic
GroupsAccumulatorAdapter.

Extend the aggregate microbenchmark and SQL file tests to cover both
max_by and min_by.
@andygrove andygrove changed the title feat: support max_by aggregate expression feat: support max_by and min_by aggregate expressions Jul 3, 2026
@andygrove andygrove marked this pull request as ready for review July 3, 2026 17:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add native Comet support for max_by and min_by aggregates (HashAggregate + SortAggregate)

1 participant