feat: support approx_count_distinct aggregate expression #4819

Open
andygrove wants to merge 1 commit into apache:main from andygrove:support-approx-count-distinct

andygrove commented Jul 3, 2026

Member

Which issue does this PR close?

Part of #4098.

Rationale for this change

approx_count_distinct (Spark's HyperLogLogPlusPlus) was unsupported, so any query using it fell back to Spark. Neither DataFusion nor the datafusion-spark crate provides a Spark-compatible HyperLogLog++, and DataFusion's own approx_distinct uses a different hash and algorithm, so its estimates would not match Spark. This PR ports Spark's HyperLogLogPlusPlus exactly so the result is bit-identical.

Note: Spark's approx_count_distinct uses the legacy HyperLogLogPlusPlus, which is a different algorithm from the Apache DataSketches hll_sketch_agg family. They are not interchangeable, so this does not reuse the DataSketches sketch.

What changes are included in this PR?

  • A native HllPlusPlus aggregate (native/spark-expr/src/agg_funcs/hll_plus_plus.rs) porting HyperLogLogPlusPlusHelper:
    • Hashes each non-null input with Comet's existing Spark-compatible xxhash64 (seed 42), normalizing floats (-0.0 -> 0.0, canonical NaN) first, exactly as Spark's NormalizeNaNAndZero does before hashing.
    • Stores registers in Spark's identical packed-Long buffer layout (10 six-bit registers per 64-bit word), so the partial-aggregation state matches Spark's aggBufferSchema (numWords Long columns).
    • Estimates cardinality with linear counting for small inputs and bias-corrected HLL otherwise, using the bias-correction tables ported verbatim from Spark (hll_plus_plus_const.rs, generated from the Spark source).
    • Provides both a scalar Accumulator and a vectorized GroupsAccumulator.
  • HllPlusPlus protobuf message (child + precision) and wiring in planner.rs.
  • CometApproxCountDistinct serde, which computes the precision p from relativeSD with Spark's exact formula and passes it through the protobuf. Restricts inputs to the atomic types Comet's xxhash64 hashes identically to Spark; other types fall back.
  • Benchmark coverage in CometAggregateExpressionBenchmark.
  • Documentation updates (expression support status, expression audit notes).
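
The precision derivation and packed register layout described above can be sketched roughly as follows. This is a minimal illustration, not the code in hll_plus_plus.rs: the function and constant names are hypothetical, and the formulas `p = ceil(2 * log2(1.106 / relativeSD))` and `numWords = m / 10 + 1` are assumptions based on a reading of Spark's HyperLogLogPlusPlusHelper. The input `hash` stands in for the xxhash64 (seed 42) output.

```rust
// Layout constants: 10 six-bit registers packed into each 64-bit word.
const REGISTER_SIZE: u32 = 6;
const REGISTERS_PER_WORD: usize = 10;
const REGISTER_MASK: u64 = (1 << REGISTER_SIZE) - 1;

/// Precision p from the requested relative standard deviation.
/// Assumed formula: p = ceil(2 * log2(1.106 / relative_sd)).
fn precision_from_relative_sd(relative_sd: f64) -> u32 {
    (2.0 * (1.106 / relative_sd).ln() / 2.0_f64.ln()).ceil() as u32
}

/// Number of 64-bit words used to hold m = 2^p registers
/// (assumed to be m / 10 + 1, i.e. one Long column per word).
fn num_words(p: u32) -> usize {
    (1usize << p) / REGISTERS_PER_WORD + 1
}

/// Read the six-bit register at index `idx` from the packed buffer.
fn get_register(words: &[i64], idx: usize) -> u64 {
    let word = words[idx / REGISTERS_PER_WORD] as u64;
    let shift = REGISTER_SIZE * (idx % REGISTERS_PER_WORD) as u32;
    (word >> shift) & REGISTER_MASK
}

/// Fold one 64-bit hash into the packed registers.
fn update(words: &mut [i64], p: u32, hash: u64) {
    // The top p bits of the hash select the register.
    let idx = (hash >> (64 - p)) as usize;
    // The padding bit bounds the leading-zero count of the remaining bits.
    let w_padding = 1u64 << (p - 1);
    let rank = ((hash << p) | w_padding).leading_zeros() as u64 + 1;

    let word_offset = idx / REGISTERS_PER_WORD;
    let shift = REGISTER_SIZE * (idx % REGISTERS_PER_WORD) as u32;
    let mask = REGISTER_MASK << shift;
    let word = words[word_offset] as u64;
    // Keep the maximum rank seen for this register.
    if rank > (word & mask) >> shift {
        words[word_offset] = ((word & !mask) | (rank << shift)) as i64;
    }
}
```

Under these assumed formulas, a relativeSD of 0.05 gives p = 9, so 512 registers are stored in 52 words. Only 60 of the 64 bits in each word carry register data; the top four bits stay zero.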

The 2-argument HyperLogLogPlusPlus implementation (algorithm and tables) is identical across Spark 3.4.3, 3.5.8, 4.0.1, and 4.1.1, so no version shim is needed.

This implementation was scaffolded with the implement-comet-expression project skill.

Benchmark

CometAggregateExpressionBenchmark on an Apple M3 Ultra, 1M rows (best time in ms; lower is better). approx_count_distinct runs fully natively and is at parity with Spark on this scan/hash-dominated workload, with a small edge at high group cardinality where the vectorized GroupsAccumulator helps.

| Case | Grouping | Spark (ms) | Comet (ms) |
|---|---|---|---|
| approx_count_distinct(int) | grp (1k groups) | 124 | 124 |
| approx_count_distinct(string) | grp | 151 | 151 |
| approx_count_distinct(int) | global | 52 | 52 |
| approx_count_distinct(int) | high card (100k groups) | 472 | 460 |

How are these changes tested?

  • A SQL file test, spark/src/test/resources/sql-tests/expressions/aggregate/approx_count_distinct.sql, that runs each query through both Spark and Comet and asserts equal results and native execution. It covers cardinalities from a few values up to 50000 distinct (exercising linear counting, bias correction, and plain HLL), the relativeSD argument, grouped and global aggregation, all supported input types, NULL handling, the empty-table case (returns 0), and float -0.0/NaN normalization. Because results are compared for equality, passing tests confirm that the port matches Spark bit-for-bit on every covered case.
  • Rust unit tests in hll_plus_plus.rs covering exact small-cardinality counts, NULL handling, string inputs, precision derivation, merge equivalence, grouped-vs-scalar agreement, and large-cardinality accuracy.
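
Two of the properties exercised above, float normalization and merge equivalence, can be illustrated with a small sketch. The names are hypothetical and do not come from hll_plus_plus.rs. It assumes the canonical NaN is the bit pattern of Java's Double.NaN and that two partial states merge by taking the maximum of each six-bit register.

```rust
/// Bit pattern of the canonical quiet NaN (assumed to match Java's Double.NaN).
const CANONICAL_NAN_BITS: u64 = 0x7ff8_0000_0000_0000;

/// Normalize a double before hashing: -0.0 becomes 0.0 and every NaN
/// collapses to a single canonical bit pattern.
fn normalize_f64(v: f64) -> f64 {
    if v.is_nan() {
        f64::from_bits(CANONICAL_NAN_BITS)
    } else if v == 0.0 {
        0.0 // -0.0 == 0.0 is true, so this also maps -0.0 to +0.0
    } else {
        v
    }
}

const REGISTER_SIZE: u32 = 6;
const REGISTERS_PER_WORD: u32 = 10;
const REGISTER_MASK: u64 = 0x3f;

/// Merge `other` into `acc` by taking the per-register maximum
/// within each packed 64-bit word.
fn merge_words(acc: &mut [i64], other: &[i64]) {
    for (a, &b) in acc.iter_mut().zip(other) {
        let (x, y) = (*a as u64, b as u64);
        let mut merged = 0u64;
        for i in 0..REGISTERS_PER_WORD {
            let shift = i * REGISTER_SIZE;
            let r = ((x >> shift) & REGISTER_MASK).max((y >> shift) & REGISTER_MASK);
            merged |= r << shift;
        }
        *a = merged as i64;
    }
}
```

Because a register-wise maximum is commutative, associative, and idempotent, the merged state does not depend on how rows were split across partitions, which is the property a merge-equivalence test relies on.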

Add native support for Spark's approx_count_distinct, a faithful port of
Spark's HyperLogLogPlusPlus / HyperLogLogPlusPlusHelper.

Each non-null input is hashed with Comet's Spark-compatible XxHash64
(seed 42, floats normalized first), and the HyperLogLog++ registers are
stored in Spark's exact packed-Long buffer layout (10 six-bit registers
per word). The cardinality is estimated with the same linear-counting and
bias-correction tables Spark uses, so results are bit-identical to Spark
and the partial-aggregation state matches Spark's aggBufferSchema.

Includes a vectorized GroupsAccumulator, SQL file tests comparing against
Spark across a range of cardinalities, native unit tests, benchmark
coverage, and documentation updates.