7 changes: 7 additions & 0 deletions docs/source/contributor-guide/expression-audits/agg_funcs.md
@@ -21,6 +21,13 @@

> Audit notes for expressions in this category that have been audited. Absence of an entry means the expression has not been audited yet, not that it is unsupported. See the user guide [Spark Expression Support] for current support status.

## approx_count_distinct

- Spark 3.4.3 (audited 2026-07-03): registered as `expression[HyperLogLogPlusPlus]("approx_count_distinct")`, an `ImperativeAggregate` that hashes each non-null input with `XxHash64` (seed 42, floats normalized via `NormalizeNaNAndZero`) and keeps a HyperLogLog++ register buffer of `numWords` `Long`s (10 six-bit registers per word). The cardinality is estimated with linear counting for small inputs and bias-corrected HLL otherwise. Comet ports `HyperLogLogPlusPlusHelper` exactly, including the bias-correction tables, reuses Comet's Spark-compatible `xxhash64` for hashing, and stores the register buffer in the same packed-`Long` layout as Spark, so results are bit-identical to Spark and the partial-aggregation state matches Spark's `aggBufferSchema`. `relativeSD` (default 0.05) sets the precision `p`.
- Spark 3.5.8 (audited 2026-07-03): algorithm and tables identical to 3.4.3.
- Spark 4.0.1 (audited 2026-07-03): `HyperLogLogPlusPlusHelper` moved to `catalyst.util` and `XxHash64Function.hash` gained collation parameters, but for the default `UTF8_BINARY` collation and non-string types the hash value is unchanged, so results match 3.4.3.
- Spark 4.1.1 (audited 2026-07-03): identical to 4.0.1.
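
The packed register layout described in the notes above can be illustrated with a small sketch. This is not the code from this PR: the function names are hypothetical, and the formula for `p` and the index/rank split are assumptions based on how Spark's `HyperLogLogPlusPlusHelper` is commonly described. Only the six-bit register width, the ten registers per word, and the default `relativeSD` of 0.05 come from the audit notes.

```rust
// Illustrative sketch only; names are hypothetical, not Comet's API.
// From the audit note: six-bit registers, ten packed into each 64-bit word.
const REGISTER_SIZE: u32 = 6;
const REGISTERS_PER_WORD: usize = 10;
const REGISTER_MASK: u64 = (1 << REGISTER_SIZE) - 1;

/// Assumed formula: number of addressing bits for a target relative SD.
fn precision_for(relative_sd: f64) -> u32 {
    (2.0 * (1.106 / relative_sd).ln() / 2f64.ln()).ceil() as u32
}

/// Assumed sizing: words needed to hold 2^p packed registers.
fn num_words(p: u32) -> usize {
    (1usize << p) / REGISTERS_PER_WORD + 1
}

/// Read register `idx` out of the packed buffer.
fn get_register(words: &[u64], idx: usize) -> u64 {
    let shift = REGISTER_SIZE * (idx % REGISTERS_PER_WORD) as u32;
    (words[idx / REGISTERS_PER_WORD] >> shift) & REGISTER_MASK
}

/// Store `rank` in register `idx` if it exceeds the current value.
fn update_register(words: &mut [u64], idx: usize, rank: u64) {
    let word = idx / REGISTERS_PER_WORD;
    let shift = REGISTER_SIZE * (idx % REGISTERS_PER_WORD) as u32;
    if rank > get_register(words, idx) {
        let mask = REGISTER_MASK << shift;
        words[word] = (words[word] & !mask) | (rank << shift);
    }
}

/// Assumed split of a 64-bit hash: the top `p` bits select the register,
/// the rank is one plus the leading zeros of the remaining bits. The
/// padding bit caps the rank so it always fits in six bits.
fn index_and_rank(hash: u64, p: u32) -> (usize, u64) {
    let idx = (hash >> (64 - p)) as usize;
    let padded = (hash << p) | (1u64 << (p - 1));
    (idx, padded.leading_zeros() as u64 + 1)
}
```

Under these assumptions the default `relativeSD` of 0.05 gives `p = 9`, so 512 registers fit in 52 words. A caller would allocate `vec![0u64; num_words(p)]`, then for each hashed input call `index_and_rank` followed by `update_register`. A bit-identical port would also need the same hash function and bias-correction tables as Spark, which this sketch leaves out.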

## any

- Spark 3.4.3 (audited 2026-05-26): registered as a SQL alias of `BoolOr`, which extends `RuntimeReplaceableAggregate` with `replacement = Max(child)`. Catalyst rewrites `any(x)` to `max(x)` before Comet sees the plan, so `any` is served by `CometMax` on a `BooleanType` column.
4 changes: 2 additions & 2 deletions docs/source/user-guide/latest/expressions.md
@@ -60,7 +60,7 @@ expressions. The following function families are **not currently planned** for n

The file-metadata functions `input_file_name`, `input_file_block_start`, and `input_file_block_length` depend on scan-internal per-row file information rather than the expression layer; their support status is covered in the [scan compatibility guide](compatibility/scans.md).

Note that `approx_count_distinct`, `median`, and `mode` are planned: they are mainstream (`median` and `mode` are exact aggregates). `approx_percentile` / `percentile_approx` are not currently planned because their approximate results cannot be made bit-identical to Spark.
Note that `median` and `mode` are planned: they are mainstream exact aggregates. `approx_percentile` / `percentile_approx` are not currently planned because their approximate results cannot be made bit-identical to Spark. (`approx_count_distinct` is supported: Comet ports Spark's `HyperLogLogPlusPlus` exactly, so its result is bit-identical.)

The tables below list every Spark built-in expression with its current status.

@@ -70,7 +70,7 @@ The tables below list every Spark built-in expression with its current status.
| --- | --- | --- |
| `any` | ✅ | |
| `any_value` | ✅ | |
| `approx_count_distinct` | 🔜 | tracking [#4098](https://github.com/apache/datafusion-comet/issues/4098) |
| `approx_count_distinct` | | |
| `array_agg` | 🔜 | Array aggregate (related to `collect_list`, [#2524](https://github.com/apache/datafusion-comet/issues/2524)) |
| `avg` | ✅ | Interval types fall back |
| `bit_and` | ✅ | |
9 changes: 7 additions & 2 deletions native/core/src/execution/planner.rs
@@ -130,8 +130,8 @@ use datafusion_comet_proto::{
use datafusion_comet_spark_expr::{
    jvm_udf::JvmScalarUdfExpr, ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation,
    Covariance, CreateNamedStruct, DecimalRescaleCheckOverflow, GetArrayStructFields,
    GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal,
    ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
    GetStructField, HllPlusPlus, IfExpr, ListExtract, NormalizeNaNAndZero, SparkCastOptions,
    Stddev, SumDecimal, ToJson, UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
};
use itertools::Itertools;
use jni::objects::{Global, JObject};
@@ -2653,6 +2653,11 @@ impl PhysicalPlanner {
                let func = AggregateUDF::new_from_impl(SparkCollectSet::new());
                Self::create_aggr_func_expr("collect_set", schema, vec![child], func)
            }
            AggExprStruct::Hllpp(expr) => {
                let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?;
                let func = AggregateUDF::new_from_impl(HllPlusPlus::new(expr.precision));
                Self::create_aggr_func_expr("approx_count_distinct", schema, vec![child], func)
            }
        }
    }

8 changes: 8 additions & 0 deletions native/proto/src/proto/expr.proto
@@ -146,6 +146,7 @@ message AggExpr {
    BloomFilterAgg bloomFilterAgg = 16;
    CollectSet collectSet = 17;
    Percentile percentile = 18;
    HllPlusPlus hllpp = 19;
  }

  // Optional filter expression for SQL FILTER (WHERE ...) clause.
@@ -277,6 +278,13 @@ message CollectSet {
  DataType datatype = 2;
}

// approx_count_distinct (Spark's HyperLogLogPlusPlus)
message HllPlusPlus {
  Expr child = 1;
  // Number of addressing bits, computed from relativeSD to match Spark.
  int32 precision = 2;
}

enum EvalMode {
  LEGACY = 0;
  TRY = 1;