feat: support approx_count_distinct aggregate expression #4819
Open
andygrove wants to merge 1 commit into
Add native support for Spark's approx_count_distinct, a faithful port of Spark's HyperLogLogPlusPlus / HyperLogLogPlusPlusHelper. Each non-null input is hashed with Comet's Spark-compatible XxHash64 (seed 42, floats normalized first), and the HyperLogLog++ registers are stored in Spark's exact packed-Long buffer layout (10 six-bit registers per word). The cardinality is estimated with the same linear-counting and bias-correction tables Spark uses, so results are bit-identical to Spark and the partial-aggregation state matches Spark's aggBufferSchema. Includes a vectorized GroupsAccumulator, SQL file tests comparing against Spark across a range of cardinalities, native unit tests, benchmark coverage, and documentation updates.
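The packed register layout described above can be sketched as follows. This is an illustrative sketch only, not the code in this PR: the function and constant names are hypothetical, and the index/rank derivation follows my reading of Spark's HyperLogLogPlusPlusHelper (top p bits of the hash select the register; the rank is the number of leading zeros of the remaining bits plus one), which should be checked against the Spark source.

```rust
// Hypothetical names; layout: 10 six-bit registers per 64-bit word.
const REGISTER_SIZE: u32 = 6;
const REGISTERS_PER_WORD: usize = 10;
const REGISTER_MASK: u64 = (1 << REGISTER_SIZE) - 1;

/// Read the six-bit register at `idx` from the packed words.
fn get_register(words: &[i64], idx: usize) -> u64 {
    let word = words[idx / REGISTERS_PER_WORD] as u64;
    let shift = REGISTER_SIZE * (idx % REGISTERS_PER_WORD) as u32;
    (word >> shift) & REGISTER_MASK
}

/// Store `value` in register `idx` only if it exceeds the current value.
fn update_register(words: &mut [i64], idx: usize, value: u64) {
    let word_offset = idx / REGISTERS_PER_WORD;
    let shift = REGISTER_SIZE * (idx % REGISTERS_PER_WORD) as u32;
    let word = words[word_offset] as u64;
    let current = (word >> shift) & REGISTER_MASK;
    if value > current {
        let mask = REGISTER_MASK << shift;
        words[word_offset] = ((word & !mask) | (value << shift)) as i64;
    }
}

/// Fold one 64-bit hash into the sketch at precision `p` (assumed 4 <= p < 64).
fn add_hash(words: &mut [i64], p: u32, hash: u64) {
    // The top p bits pick the register.
    let idx = (hash >> (64 - p)) as usize;
    // The padding bit bounds the leading-zero count so the rank fits in six bits.
    let w_padding = 1u64 << (p - 1);
    let rank = ((hash << p) | w_padding).leading_zeros() as u64 + 1;
    update_register(words, idx, rank);
}
```

Because each register only ever grows, merging two partial states under this layout reduces to a register-wise maximum over the same words.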
Which issue does this PR close?
Part of #4098.
Rationale for this change
approx_count_distinct (Spark's HyperLogLogPlusPlus) was unsupported, so any query using it fell back to Spark. Neither DataFusion nor the datafusion-spark crate provides a Spark-compatible HyperLogLog++, and DataFusion's own approx_distinct uses a different hash and algorithm, so its estimates would not match Spark. This PR ports Spark's HyperLogLogPlusPlus exactly so the result is bit-identical.
Note: Spark's approx_count_distinct uses the legacy HyperLogLogPlusPlus, which is a different algorithm from the Apache DataSketches hll_sketch_agg family. They are not interchangeable, so this does not reuse the DataSketches sketch.
What changes are included in this PR?
- HllPlusPlus aggregate (native/spark-expr/src/agg_funcs/hll_plus_plus.rs) porting HyperLogLogPlusPlusHelper:
  - Hashes each non-null input with xxhash64 (seed 42), normalizing floats (-0.0 -> 0.0, canonical NaN) first, exactly as Spark's NormalizeNaNAndZero does before hashing.
  - Stores the registers in Spark's packed Long buffer layout (10 six-bit registers per 64-bit word), so the partial-aggregation state matches Spark's aggBufferSchema (numWords Long columns).
  - Estimates cardinality with the same linear-counting and bias-correction tables Spark uses (hll_plus_plus_const.rs, generated from the Spark source).
  - Provides an Accumulator and a vectorized GroupsAccumulator.
- HllPlusPlus protobuf message (child + precision) and wiring in planner.rs.
- CometApproxCountDistinct serde, which computes the precision p from relativeSD with Spark's exact formula and passes it through the protobuf. It restricts inputs to the atomic types that Comet's xxhash64 hashes identically to Spark; other types fall back.
- Benchmark coverage in CometAggregateExpressionBenchmark.
The 2-argument HyperLogLogPlusPlus implementation (algorithm and tables) is identical across Spark 3.4.3, 3.5.8, 4.0.1, and 4.1.1, so no version shim is needed.
This implementation was scaffolded with the implement-comet-expression project skill.
Benchmark
CometAggregateExpressionBenchmark on an Apple M3 Ultra, 1M rows (best time in ms; lower is better). approx_count_distinct runs fully natively and is at parity with Spark on this scan/hash-dominated workload, with a small edge at high group cardinality where the vectorized GroupsAccumulator helps.
[Results table: rows approx_count_distinct(int), approx_count_distinct(string), approx_count_distinct(int), approx_count_distinct(int); timing columns not preserved]
How are these changes tested?
- A SQL file test, spark/src/test/resources/sql-tests/expressions/aggregate/approx_count_distinct.sql, that runs each query through both Spark and Comet and asserts equal results and native execution. It covers a range of cardinalities from a few up to 50000 distinct values (exercising linear counting, bias correction, and plain HLL), the relativeSD argument, grouped and global aggregation, all supported input types, NULL handling, the empty-table (returns 0) case, and float -0.0/NaN normalization. Passing the equality assertions confirms the port is bit-identical to Spark on these inputs.
- Native unit tests in hll_plus_plus.rs covering exact small-cardinality counts, NULL handling, string inputs, precision derivation, merge equivalence, grouped-vs-scalar agreement, and large-cardinality accuracy.
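For readers unfamiliar with the precision derivation and float normalization exercised by these tests, here is a minimal sketch. The function names are hypothetical and this is not the PR's code. The formula p = ceil(2 * log2(1.106 / relativeSD)) and the word count m / 10 + 1 (with m = 2^p) are written from my understanding of Spark's HyperLogLogPlusPlusHelper and should be verified against the Spark source.

```rust
/// Precision p derived from the requested relative standard deviation.
/// Assumed formula: p = ceil(2 * log2(1.106 / relative_sd)).
fn precision_from_relative_sd(relative_sd: f64) -> u32 {
    (2.0 * (1.106 / relative_sd).ln() / 2.0_f64.ln()).ceil() as u32
}

/// Number of 64-bit words needed for m = 2^p registers at 10 registers per word.
fn num_words(p: u32) -> usize {
    (1usize << p) / 10 + 1
}

/// Normalize a double before hashing: -0.0 becomes 0.0 and every NaN
/// becomes the single canonical quiet NaN bit pattern.
fn normalize_f64(v: f64) -> f64 {
    if v.is_nan() {
        f64::from_bits(0x7ff8_0000_0000_0000)
    } else if v == 0.0 {
        0.0
    } else {
        v
    }
}
```

Under these assumptions, a relativeSD of 0.05 gives p = 9, so the aggregation buffer holds 52 Long words.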