Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ The tables below list every Spark built-in expression with its current status.
| `median` | ✅ | Rewrites to `percentile(col, 0.5)`; falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `min` | ✅ | |
| `min_by` | 🔜 | [#3841](https://github.com/apache/datafusion-comet/issues/3841) |
| `mode` | 🔜 | [#3970](https://github.com/apache/datafusion-comet/issues/3970) |
| `mode` | | `mode(col)` only; Spark breaks ties non-deterministically, so Comet returns the smallest tied value and falls back by default, opt-in via allowIncompatible ([#3970](https://github.com/apache/datafusion-comet/issues/3970)) |
| `percentile` | ✅ | Single literal percentage on numeric input; array of percentages and a frequency argument fall back to Spark. Falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `percentile_cont` | ✅ | Spark 4.0+ `WITHIN GROUP (ORDER BY ...)`; ascending only, `DESC` falls back to Spark. Falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `percentile_disc` | 🔜 | Percentile aggregate |
Expand Down
10 changes: 8 additions & 2 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ use datafusion_comet_proto::{
use datafusion_comet_spark_expr::{
jvm_udf::JvmScalarUdfExpr, ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation,
Covariance, CreateNamedStruct, DecimalRescaleCheckOverflow, GetArrayStructFields,
GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal,
ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
GetStructField, IfExpr, ListExtract, Mode, NormalizeNaNAndZero, SparkCastOptions, Stddev,
SumDecimal, ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
};
use itertools::Itertools;
use jni::objects::{Global, JObject};
Expand Down Expand Up @@ -2643,6 +2643,12 @@ impl PhysicalPlanner {
let func = AggregateUDF::new_from_impl(SparkCollectSet::new());
Self::create_aggr_func_expr("collect_set", schema, vec![child], func)
}
AggExprStruct::Mode(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
let func = AggregateUDF::new_from_impl(Mode::new(datatype));
Self::create_aggr_func_expr("mode", schema, vec![child], func)
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ message AggExpr {
BloomFilterAgg bloomFilterAgg = 16;
CollectSet collectSet = 17;
Percentile percentile = 18;
Mode mode = 19;
}

// Optional filter expression for SQL FILTER (WHERE ...) clause.
Expand Down Expand Up @@ -275,6 +276,11 @@ message CollectSet {
DataType datatype = 2;
}

message Mode {
Expr child = 1;
DataType datatype = 2;
}

enum EvalMode {
LEGACY = 0;
TRY = 1;
Expand Down
2 changes: 2 additions & 0 deletions native/spark-expr/src/agg_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod avg;
mod avg_decimal;
mod correlation;
mod covariance;
mod mode;
mod stddev;
mod sum_decimal;
mod sum_int;
Expand All @@ -29,6 +30,7 @@ pub use avg::Avg;
pub use avg_decimal::AvgDecimal;
pub use correlation::Correlation;
pub use covariance::Covariance;
pub use mode::Mode;
pub use stddev::Stddev;
pub use sum_decimal::SumDecimal;
pub use sum_int::SumInteger;
Expand Down
Loading
Loading