feat: Runtime dynamic filter pushdown for native hash joins #4810

Open
schenksj wants to merge 5 commits into apache:main from schenksj:feat/join-dynamic-filter
@schenksj schenksj commented Jul 3, 2026

Contributor

Disclosure: this issue was drafted with the help of an AI assistant.

Which issue does this PR close?

Closes #4807.

Follow-up issues for IO-level pushdown into kernel scans: #4808 (Iceberg), #4809 (Delta).

Rationale for this change

DataFusion 54 ships runtime dynamic filtering for hash joins (HashJoinExec + DynamicFilterPhysicalExpr + SharedBuildAccumulator), but the consumer-side wiring lives in DataFusion's physical optimizer, which Comet does not run — so Comet never benefits. Build-side runtime filtering is one of the largest TPC-DS levers in comparable engines (Velox HashProbe::pushdownDynamicFilters; Databricks DFP). This PR wires it up on Comet's side, scan-agnostically, so all probe-side scans (Parquet DataSourceExec, JVM-fed ScanExec, Iceberg, and future Delta kernel scans) inherit row-level filtering.

What changes are included in this PR?

  • New CometDynamicFilterExec operator (native/core/src/execution/operators/dynamic_filter.rs): evaluates the join's shared DynamicFilterPhysicalExpr against probe-side batches before the hash probe, with
    • a pass-through fast path while the filter is the constant-true placeholder (correctness never depends on population),
    • a selectivity guard that permanently disables evaluation on a partition stream when the filter keeps > 95% of rows after 64K rows have been observed,
    • metrics: dynamic_filter_rows_pruned + baseline elapsed-compute/output-rows.
  • attach_join_dynamic_filter helper: rewires an eligible HashJoinExec (including the ProjectionExec-over-join shape produced by swap_inputs) so the join and the new probe-side wrapper share one filter, via the public HashJoinExecBuilder + with_dynamic_filter_expr APIs.
  • Eligibility gate mirrors DataFusion's own: probe side must be preserved under the ON clause (JoinType::on_lr_is_preserved().1 — Inner, LeftOuter, LeftSemi/RightSemi, LeftAnti, LeftMark); null-aware anti joins are excluded.
  • Planner integration in both HashJoin branches (build-left and swap), proto field HashJoin.dynamic_filter_enabled, and a new config spark.comet.exec.join.dynamicFilter.enabled (default false, experimental).
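
A minimal sketch of the selectivity guard described above, written against plain counters rather than Arrow batches. `SelectivityGuard` and its fields are hypothetical names, not the types in this PR. The thresholds follow the description (95% kept, 64K rows observed); the point at which the real operator evaluates the ratio is an assumption here.

```rust
/// Hypothetical per-stream guard; names and structure are illustrative only.
#[derive(Debug)]
struct SelectivityGuard {
    min_rows_observed: u64, // rows to observe before judging the filter
    max_keep_ratio: f64,    // disable when more than this fraction survives
    rows_in: u64,
    rows_kept: u64,
    disabled: bool,
}

impl SelectivityGuard {
    fn new(min_rows_observed: u64, max_keep_ratio: f64) -> Self {
        Self {
            min_rows_observed,
            max_keep_ratio,
            rows_in: 0,
            rows_kept: 0,
            disabled: false,
        }
    }

    /// Whether the filter should still be evaluated on the next batch.
    fn is_active(&self) -> bool {
        !self.disabled
    }

    /// Record one filtered batch. Assumption: the ratio is checked on every
    /// batch once enough rows have been seen, and disabling is permanent.
    fn record(&mut self, rows_in: u64, rows_kept: u64) {
        if self.disabled {
            return;
        }
        self.rows_in += rows_in;
        self.rows_kept += rows_kept;
        if self.rows_in >= self.min_rows_observed {
            let keep_ratio = self.rows_kept as f64 / self.rows_in as f64;
            if keep_ratio > self.max_keep_ratio {
                self.disabled = true;
            }
        }
    }

    /// Rows removed while the guard was active (the pruned-rows metric).
    fn rows_pruned(&self) -> u64 {
        self.rows_in - self.rows_kept
    }
}
```

With `SelectivityGuard::new(65_536, 0.95)`, a stream whose filter keeps every row stays active for the first 64K rows and is then switched off, while a stream that keeps about 10% of rows stays active indefinitely.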

Timing note: each Comet native execution corresponds to a single Spark partition, so the build accumulator completes after that task's build and the filter is populated before probe batches flow. Broadcast joins get a broadcast-wide filter; shuffled hash joins get a per-partition filter. If DataFusion declines to populate (session flag off, etc.), the placeholder stays true and the wrapper is a no-op.
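
The placeholder behaviour can be illustrated with a small stand-in model. `ProbeFilter` and `filter_probe_batch` are hypothetical and operate on plain `i64` keys. They are not DataFusion's `DynamicFilterPhysicalExpr` or the operator in this PR, and the sketch ignores NULL keys and multi-column keys.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the shared join filter.
enum ProbeFilter {
    /// Constant-true placeholder: the build side has published nothing yet.
    Placeholder,
    /// Populated by the build side: key bounds plus exact membership.
    Populated { min: i64, max: i64, keys: HashSet<i64> },
}

impl ProbeFilter {
    /// Build a populated filter from the build-side join keys.
    fn from_build_keys(build_keys: &[i64]) -> Self {
        let keys: HashSet<i64> = build_keys.iter().copied().collect();
        // With an empty build side min > max, so every probe key is rejected.
        let min = keys.iter().copied().min().unwrap_or(i64::MAX);
        let max = keys.iter().copied().max().unwrap_or(i64::MIN);
        ProbeFilter::Populated { min, max, keys }
    }

    /// True when a probe row with this key may still find a match.
    fn keep(&self, key: i64) -> bool {
        match self {
            ProbeFilter::Placeholder => true,
            ProbeFilter::Populated { min, max, keys } => {
                // Cheap bounds check first, then exact membership.
                key >= *min && key <= *max && keys.contains(&key)
            }
        }
    }
}

/// Returns the surviving probe keys and the number of rows pruned.
fn filter_probe_batch(filter: &ProbeFilter, batch: Vec<i64>) -> (Vec<i64>, usize) {
    if matches!(filter, ProbeFilter::Placeholder) {
        // Fast path: hand the batch back untouched.
        return (batch, 0);
    }
    let before = batch.len();
    let kept: Vec<i64> = batch.into_iter().filter(|k| filter.keep(*k)).collect();
    let pruned = before - kept.len();
    (kept, pruned)
}
```

In this model an unpopulated filter returns every batch unchanged, so the result can only differ from the unfiltered plan by rows the join would have dropped anyway.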

How are these changes tested?

  • New Rust unit tests (native/core/src/execution/operators/dynamic_filter.rs): pass-through pre-population, filtering after update(), constant-false (empty build) pruning, metric counting; end-to-end HashJoinExec execution asserting identical results with and without the filter attached and dynamic_filter_rows_pruned > 0 (proving the build phase populates the manually-attached filter); attach-helper gating for ineligible join types (Right/Full/RightAnti). Full native suite passes (128 tests), clippy + rustfmt clean.
  • New Scala test (CometJoinSuite: "HashJoin with dynamic filter enabled"): result parity vs. Spark with the config enabled across broadcast + shuffled hash joins, inner/left-outer/semi/anti, a selective build side, an empty build side, and NULL join keys. Verified under the spark-3.5 profile.
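
For reviewers, the join-type gating exercised by the Rust tests can be summarised in a self-contained sketch. `JoinKind` and `can_attach_dynamic_filter` are hypothetical names. The eligible and ineligible sets are copied from the lists in this description rather than from DataFusion's `JoinType::on_lr_is_preserved`, and the session-flag checks are left out.

```rust
/// Hypothetical mirror of the join types named in this PR description.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum JoinKind {
    Inner,
    LeftOuter,
    Right,
    Full,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
    LeftMark,
}

/// True when the probe side is preserved under the ON clause, i.e. pruning
/// probe rows that cannot match leaves the join result unchanged.
fn probe_side_preserved(kind: JoinKind) -> bool {
    match kind {
        JoinKind::Inner
        | JoinKind::LeftOuter
        | JoinKind::LeftSemi
        | JoinKind::RightSemi
        | JoinKind::LeftAnti
        | JoinKind::LeftMark => true,
        JoinKind::Right | JoinKind::Full | JoinKind::RightAnti => false,
    }
}

/// Gate sketch: an eligible join type, and not a null-aware anti join.
fn can_attach_dynamic_filter(kind: JoinKind, null_aware_anti: bool) -> bool {
    probe_side_preserved(kind) && !null_aware_anti
}
```

Under these assumptions, `can_attach_dynamic_filter(JoinKind::LeftAnti, true)` is rejected while the same join type without null-aware semantics is accepted.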

🤖 Generated with Claude Code

https://claude.ai/code/session_019dAmX1gT713ChZjYtKKb7p

Wires DataFusion's join dynamic filter machinery into Comet: when
spark.comet.exec.join.dynamicFilter.enabled is set, eligible native hash
joins attach a DynamicFilterPhysicalExpr (populated by the build phase
with min/max bounds plus InList or hash-table-lookup membership) and the
probe child is wrapped in a new CometDynamicFilterExec that applies the
same filter to probe batches before the hash probe.

The wrapper passes batches through untouched while the filter is still
the constant-true placeholder, so correctness never depends on
population; a selectivity guard disables evaluation on streams where the
filter prunes few rows, and a dynamic_filter_rows_pruned metric records
effectiveness. Eligibility mirrors DataFusion's own gate
(JoinType::on_lr_is_preserved probe side) and excludes null-aware anti
joins.

Closes apache#4807

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019dAmX1gT713ChZjYtKKb7p
@schenksj schenksj marked this pull request as draft July 3, 2026 13:15
schenksj and others added 2 commits July 3, 2026 11:56
Registers the probe-side CometDynamicFilterExec in
SparkPlan::new_with_additional so dynamic_filter_rows_pruned and
elapsed-compute metrics roll up to the join node in the Spark UI.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019dAmX1gT713ChZjYtKKb7p
- Mirror DataFusion's full allow_join_dynamic_filter_pushdown gate in
  attach_join_dynamic_filter (session flag + preserve_file_partitions
  with Partitioned mode), with a test; DataFusion re-checks the same
  gate at execute time, so this also avoids installing wrappers that
  can never engage.
- Time filter evaluation under a dedicated dynamic_filter_eval_time
  metric instead of the baseline elapsed_compute, so the metrics merge
  into the join node no longer inflates its reported compute time.
- Return the installed wrapper from attach_join_dynamic_filter instead
  of re-finding it (removes find_dynamic_filter_wrapper and the
  duplicated projection-descent traversal).
- Deduplicate the attach/register logic in the planner behind a single
  apply_join_dynamic_filter helper used by both HashJoin branches.
- Fix the config doc to list only Spark-reachable join types (right
  semi is never mapped by the serde).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019dAmX1gT713ChZjYtKKb7p
schenksj commented Jul 3, 2026

Contributor Author

Benchmark result

Running benchmark: sparse selective build (membership pruning)
  Running case: Spark
  Stopped after 10 iterations, 2097 ms
  Running case: Comet (dynamic filter off)
  Stopped after 18 iterations, 2004 ms
  Running case: Comet (dynamic filter on)
  Stopped after 20 iterations, 2077 ms

OpenJDK 64-Bit Server VM 19.0.1+10-21 on Mac OS X 26.5
Apple M1
sparse selective build (membership pruning):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------
Spark                                                  176            210          22         95.2          10.5       1.0X
Comet (dynamic filter off)                              96            111          13        175.3           5.7       1.8X
Comet (dynamic filter on)                               91            104          11        183.9           5.4       1.9X

Running benchmark: clustered selective build (bounds pruning)
  Running case: Spark
  Stopped after 12 iterations, 2221 ms
  Running case: Comet (dynamic filter off)
  Stopped after 22 iterations, 2053 ms
  Running case: Comet (dynamic filter on)
  Stopped after 23 iterations, 2025 ms

OpenJDK 64-Bit Server VM 19.0.1+10-21 on Mac OS X 26.5
Apple M1
clustered selective build (bounds pruning):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------
Spark                                                 131            185          55        128.4           7.8       1.0X
Comet (dynamic filter off)                             83             93           7        201.2           5.0       1.6X
Comet (dynamic filter on)                              78             88           6        214.4           4.7       1.7X

Running benchmark: non-selective build (guard disables filter)
  Running case: Spark
  Stopped after 14 iterations, 2175 ms
  Running case: Comet (dynamic filter off)
  Stopped after 18 iterations, 2070 ms
  Running case: Comet (dynamic filter on)
  Stopped after 18 iterations, 2014 ms

OpenJDK 64-Bit Server VM 19.0.1+10-21 on Mac OS X 26.5
Apple M1
non-selective build (guard disables filter):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------
Spark                                                  140            155          12        119.5           8.4       1.0X
Comet (dynamic filter off)                             100            115          12        168.3           5.9       1.4X
Comet (dynamic filter on)                               98            112          12        171.1           5.8       1.4X

@schenksj schenksj marked this pull request as ready for review July 3, 2026 20:25
Development

Successfully merging this pull request may close these issues.

feat: Runtime dynamic filter pushdown for native hash joins (build side -> probe side)
