14 changes: 14 additions & 0 deletions docs/source/contributor-guide/expression-audits/agg_funcs.md
@@ -39,13 +39,27 @@
- Spark 3.5.8 (2026-05-26)
- Spark 4.0.1 (2026-05-26)

## max_by

- Spark 3.4.3 (2026-07-03): `MaxBy` is a 2-argument `DeclarativeAggregate` registered as `expression[MaxBy]("max_by")`. Buffer is `(valueWithExtremumOrdering, extremumOrdering)`; null orderings are ignored, the value paired with the maximum ordering is returned (and may itself be null), and an all-null-ordering group yields null. Comet implements a native `max_by` aggregate. Only fixed-length value and ordering types are accelerated: a variable-length or nested type (string, binary, struct) forces Spark's `SortAggregate`, which Comet does not support, so those cases fall back to Spark. `max_by` is non-deterministic when several rows tie on the maximum ordering, matching Spark's documented behavior.
- Spark 3.5.8 (2026-07-03): aggregate logic identical to 3.4.3.
- Spark 4.0.1 (2026-07-03): aggregate logic identical to 3.4.3; only the `@ExpressionDescription` example and note text differ.
- Spark 4.1.1 (2026-07-03): aggregate logic identical to 3.4.3. The 3-argument top-k form `max_by(x, y, k)` (via `MaxByBuilder` / `MaxMinByK`) is only present on Spark master, not in any released 3.4 through 4.1 version, so Comet handles only the 2-argument form.
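
The `max_by` null rules described above can be sketched in plain Rust. This is only an illustration of the semantics, not Comet's native aggregate: the `MaxByBuffer` type and the `max_by` helper are hypothetical names, and `i64` stands in for any fixed-length value and ordering type.

```rust
/// Hypothetical buffer mirroring `(valueWithExtremumOrdering, extremumOrdering)`.
#[derive(Debug, Default, Clone, PartialEq)]
struct MaxByBuffer {
    /// Value paired with the current maximum ordering; may itself be null.
    value: Option<i64>,
    /// Current maximum ordering; `None` until a non-null ordering is seen.
    ordering: Option<i64>,
}

impl MaxByBuffer {
    fn update(&mut self, value: Option<i64>, ordering: Option<i64>) {
        // Rows whose ordering is null are ignored entirely.
        if let Some(o) = ordering {
            // Strict `>` means the first row wins a tie in this sketch.
            // Spark makes no such promise, so callers should not rely on it.
            let replace = match self.ordering {
                None => true,
                Some(current) => o > current,
            };
            if replace {
                self.ordering = Some(o);
                self.value = value;
            }
        }
    }
}

/// Folds `(value, ordering)` rows and returns the value paired with the
/// maximum ordering, or `None` if every ordering was null.
fn max_by(rows: &[(Option<i64>, Option<i64>)]) -> Option<i64> {
    let mut buffer = MaxByBuffer::default();
    for &(value, ordering) in rows {
        buffer.update(value, ordering);
    }
    buffer.value
}
```

For example, `max_by(&[(Some(10), Some(1)), (None, Some(5))])` returns `None` because the row with the largest ordering carries a null value, while a group whose orderings are all null also returns `None`.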

## median

- Spark 3.4.3 (audited 2026-06-24): `Median(child)` is a `RuntimeReplaceableAggregate` with `replacement = Percentile(child, Literal(0.5))`. Catalyst rewrites `median(x)` to `percentile(x, 0.5)` before Comet sees the plan, so it is served by `CometPercentile`.
- Spark 3.5.8 (audited 2026-06-24): identical to 3.4.3.
- Spark 4.0.1 (audited 2026-06-24): `replacement` becomes `lazy val`; semantics unchanged.
- Spark 4.1.1 (audited 2026-06-24): identical to 4.0.1.

## min_by

- Spark 3.4.3 (2026-07-03): `MinBy` shares the abstract `MaxMinBy` `DeclarativeAggregate` with `MaxBy`, differing only in the comparison direction (`least` / `<` instead of `greatest` / `>`). Registered as `expression[MinBy]("min_by")`. Null orderings are ignored, the value paired with the minimum ordering is returned (and may itself be null), and an all-null-ordering group yields null. Comet serves it through the same native `MaxMinBy` aggregate as `max_by`, with the same fixed-length value and ordering restriction (variable-length or nested types fall back to Spark). Non-deterministic on ties, matching Spark.
- Spark 3.5.8 (2026-07-03): aggregate logic identical to 3.4.3.
- Spark 4.0.1 (2026-07-03): aggregate logic identical to 3.4.3; only the `@ExpressionDescription` example and note text differ.
- Spark 4.1.1 (2026-07-03): aggregate logic identical to 3.4.3. The 3-argument top-k form `min_by(x, y, k)` (via `MinByBuilder` / `MaxMinByK`) is only present on Spark master, so Comet handles only the 2-argument form.
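
One way to picture a single aggregate serving both functions is to parameterize the comparison direction. The sketch below is an assumption about the general shape, not the actual `MaxMinBy` code: `Direction` and `extremum_by` are illustrative names, and the ordering is fixed to `i64` for brevity.

```rust
/// Hypothetical switch between the two comparison directions.
#[derive(Clone, Copy)]
enum Direction {
    Max,
    Min,
}

impl Direction {
    /// True when `candidate` should replace `current` as the extremum.
    fn replaces(self, candidate: i64, current: i64) -> bool {
        match self {
            Direction::Max => candidate > current,
            Direction::Min => candidate < current,
        }
    }
}

/// Returns the value paired with the extremum ordering. `V: Copy` stands in
/// for the fixed-length value restriction mentioned above.
fn extremum_by<V: Copy>(dir: Direction, rows: &[(Option<V>, Option<i64>)]) -> Option<V> {
    // Best (value, ordering) pair seen so far; `None` until a non-null ordering appears.
    let mut best: Option<(Option<V>, i64)> = None;
    for &(value, ordering) in rows {
        // Null orderings are skipped for both directions.
        if let Some(o) = ordering {
            let take = match best {
                None => true,
                Some((_, current)) => dir.replaces(o, current),
            };
            if take {
                best = Some((value, o));
            }
        }
    }
    // The winning value may itself be null, which flattens to `None` here.
    best.and_then(|(value, _)| value)
}
```

With rows `[(Some(1.5), Some(3)), (Some(2.5), Some(1)), (Some(9.0), None)]`, `Direction::Max` selects `1.5` and `Direction::Min` selects `2.5`; the third row never participates because its ordering is null.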

## percentile

- Spark 3.4.3 (audited 2026-06-24): `Percentile(child, percentageExpression, frequencyExpression, ..., reverse)` over `PercentileBase`. Exact percentile using `index = p * (n - 1)` linear interpolation, NULL inputs skipped, empty/all-null group returns NULL. `CometPercentile` maps the single-literal-percentage, default-frequency, numeric-input, ascending form to DataFusion's `percentile_cont` (same interpolation). Array-of-percentages, a non-default frequency argument, descending order, and interval inputs fall back to Spark.
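
The `index = p * (n - 1)` interpolation can be illustrated with a small standalone function. This is a sketch of the formula only, assuming `f64` inputs and a percentage in `[0, 1]`; the `percentile` function below is hypothetical and is neither Spark's nor DataFusion's implementation.

```rust
/// Exact percentile with linear interpolation at `index = p * (n - 1)`.
/// NULL inputs (`None`) are skipped; an empty or all-null group yields `None`.
fn percentile(values: &[Option<f64>], p: f64) -> Option<f64> {
    assert!((0.0..=1.0).contains(&p), "percentage must be in [0, 1]");

    // Drop NULLs, then sort ascending.
    let mut xs: Vec<f64> = values.iter().flatten().copied().collect();
    if xs.is_empty() {
        return None;
    }
    xs.sort_by(|a, b| a.total_cmp(b));

    // Fractional position in the sorted data.
    let index = p * (xs.len() - 1) as f64;
    let lower = index.floor() as usize;
    let upper = index.ceil() as usize;
    if lower == upper {
        // The position lands exactly on an element.
        return Some(xs[lower]);
    }
    // Weight the two neighbours by their distance from the position.
    Some((upper as f64 - index) * xs[lower] + (index - lower as f64) * xs[upper])
}
```

For the inputs `1.0, NULL, 4.0, 2.0, 3.0` and `p = 0.5`, the four non-null values give `index = 1.5`, so the result is halfway between `2.0` and `3.0`, that is `2.5`. This is also the value `median` would produce, since it is rewritten to `percentile(x, 0.5)`.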
4 changes: 2 additions & 2 deletions docs/source/user-guide/latest/expressions.md
@@ -95,11 +95,11 @@ The tables below list every Spark built-in expression with its current status.
| `last_value` | ✅ | |
| `listagg` | 🔜 | String aggregation |
| `max` | ✅ | |
-| `max_by` | 🔜 | [#3841](https://github.com/apache/datafusion-comet/issues/3841) |
+| `max_by` | ✅ | Value and ordering must be fixed-length types |
| `mean` | ✅ | |
| `median` | ✅ | Rewrites to `percentile(col, 0.5)`; falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `min` | ✅ | |
-| `min_by` | 🔜 | [#3841](https://github.com/apache/datafusion-comet/issues/3841) |
+| `min_by` | ✅ | Value and ordering must be fixed-length types |
| `mode` | 🔜 | [#3970](https://github.com/apache/datafusion-comet/issues/3970) |
| `percentile` | ✅ | Single literal percentage on numeric input; array of percentages and a frequency argument fall back to Spark. Falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `percentile_cont` | ✅ | Spark 4.0+ `WITHIN GROUP (ORDER BY ...)`; ascending only, `DESC` falls back to Spark. Falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
18 changes: 16 additions & 2 deletions native/core/src/execution/planner.rs
@@ -130,8 +130,8 @@ use datafusion_comet_proto::{
use datafusion_comet_spark_expr::{
     jvm_udf::JvmScalarUdfExpr, ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation,
     Covariance, CreateNamedStruct, DecimalRescaleCheckOverflow, GetArrayStructFields,
-    GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal,
-    ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
+    GetStructField, IfExpr, ListExtract, MaxMinBy, NormalizeNaNAndZero, SparkCastOptions, Stddev,
+    SumDecimal, ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
};
use itertools::Itertools;
use jni::objects::{Global, JObject};
@@ -2653,6 +2653,20 @@ impl PhysicalPlanner {
                let func = AggregateUDF::new_from_impl(SparkCollectSet::new());
                Self::create_aggr_func_expr("collect_set", schema, vec![child], func)
            }
            AggExprStruct::MaxBy(expr) => {
                let value = self.create_expr(expr.value.as_ref().unwrap(), Arc::clone(&schema))?;
                let ordering =
                    self.create_expr(expr.ordering.as_ref().unwrap(), Arc::clone(&schema))?;
                let func = AggregateUDF::new_from_impl(MaxMinBy::new_max_by());
                Self::create_aggr_func_expr("max_by", schema, vec![value, ordering], func)
            }
            AggExprStruct::MinBy(expr) => {
                let value = self.create_expr(expr.value.as_ref().unwrap(), Arc::clone(&schema))?;
                let ordering =
                    self.create_expr(expr.ordering.as_ref().unwrap(), Arc::clone(&schema))?;
                let func = AggregateUDF::new_from_impl(MaxMinBy::new_min_by());
                Self::create_aggr_func_expr("min_by", schema, vec![value, ordering], func)
            }
        }
    }
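
Both new arms call `.unwrap()` on the optional `value` and `ordering` fields, so a plan that omits either one would panic. If that is a concern, one alternative is to turn the missing field into an error. The sketch below only shows the pattern: `Expr`, `MaxBy`, and `operands` are hypothetical stand-ins, not the generated proto types or any existing Comet helper, and it assumes the message fields are represented as `Option<Box<_>>`.

```rust
/// Hypothetical stand-in for a proto expression node.
#[derive(Debug, Clone, PartialEq)]
struct Expr(String);

/// Hypothetical stand-in for the `MaxBy` message with optional fields.
#[derive(Debug, Default)]
struct MaxBy {
    value: Option<Box<Expr>>,
    ordering: Option<Box<Expr>>,
}

/// Returns both operands, or an error naming the missing field
/// instead of panicking on `unwrap()`.
fn operands(expr: &MaxBy) -> Result<(&Expr, &Expr), String> {
    let value = expr
        .value
        .as_deref()
        .ok_or_else(|| "max_by: missing `value` expression".to_string())?;
    let ordering = expr
        .ordering
        .as_deref()
        .ok_or_else(|| "max_by: missing `ordering` expression".to_string())?;
    Ok((value, ordering))
}
```

Since the two arms differ only in the constructor and the function name, a helper of this shape could also be shared by both.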

16 changes: 16 additions & 0 deletions native/proto/src/proto/expr.proto
@@ -146,6 +146,8 @@ message AggExpr {
    BloomFilterAgg bloomFilterAgg = 16;
    CollectSet collectSet = 17;
    Percentile percentile = 18;
    MaxBy maxBy = 19;
    MinBy minBy = 20;
  }

  // Optional filter expression for SQL FILTER (WHERE ...) clause.
@@ -277,6 +279,20 @@ message CollectSet {
  DataType datatype = 2;
}

message MaxBy {
  // The value returned by the aggregate (associated with the maximum ordering).
  Expr value = 1;
  // The ordering expression whose maximum selects the returned value.
  Expr ordering = 2;
}

message MinBy {
  // The value returned by the aggregate (associated with the minimum ordering).
  Expr value = 1;
  // The ordering expression whose minimum selects the returned value.
  Expr ordering = 2;
}

enum EvalMode {
  LEGACY = 0;
  TRY = 1;