feat: Runtime dynamic filter pushdown for native hash joins#4810
Open
schenksj wants to merge 5 commits into
Open
feat: Runtime dynamic filter pushdown for native hash joins#4810schenksj wants to merge 5 commits into
schenksj wants to merge 5 commits into
Conversation
Wires DataFusion's join dynamic filter machinery into Comet: when spark.comet.exec.join.dynamicFilter.enabled is set, eligible native hash joins attach a DynamicFilterPhysicalExpr (populated by the build phase with min/max bounds plus InList or hash-table-lookup membership) and the probe child is wrapped in a new CometDynamicFilterExec that applies the same filter to probe batches before the hash probe. The wrapper passes batches through untouched while the filter is still the constant-true placeholder, so correctness never depends on population; a selectivity guard disables evaluation on streams where the filter prunes few rows, and a dynamic_filter_rows_pruned metric records effectiveness. Eligibility mirrors DataFusion's own gate (JoinType::on_lr_is_preserved probe side) and excludes null-aware anti joins. Closes apache#4807 Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_019dAmX1gT713ChZjYtKKb7p
Registers the probe-side CometDynamicFilterExec in SparkPlan::new_with_additional so dynamic_filter_rows_pruned and elapsed-compute metrics roll up to the join node in the Spark UI. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_019dAmX1gT713ChZjYtKKb7p
- Mirror DataFusion's full allow_join_dynamic_filter_pushdown gate in attach_join_dynamic_filter (session flag + preserve_file_partitions with Partitioned mode), with a test; DataFusion re-checks the same gate at execute time, so this also avoids installing wrappers that can never engage. - Time filter evaluation under a dedicated dynamic_filter_eval_time metric instead of the baseline elapsed_compute, so the metrics merge into the join node no longer inflates its reported compute time. - Return the installed wrapper from attach_join_dynamic_filter instead of re-finding it (removes find_dynamic_filter_wrapper and the duplicated projection-descent traversal). - Deduplicate the attach/register logic in the planner behind a single apply_join_dynamic_filter helper used by both HashJoin branches. - Fix the config doc to list only Spark-reachable join types (right semi is never mapped by the serde). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_019dAmX1gT713ChZjYtKKb7p
Contributor
Author
Benchmark result |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #4807.
Follow-up issues for IO-level pushdown into kernel scans: #4808 (Iceberg), #4809 (Delta).
Rationale for this change
DataFusion 54 ships runtime dynamic filtering for hash joins (
HashJoinExec+DynamicFilterPhysicalExpr+SharedBuildAccumulator), but the consumer-side wiring lives in DataFusion's physical optimizer, which Comet does not run — so Comet never benefits. Build-side runtime filtering is one of the largest TPC-DS levers in comparable engines (VeloxHashProbe::pushdownDynamicFilters; Databricks DFP). This PR wires it up on Comet's side, scan-agnostically, so all probe-side scans (ParquetDataSourceExec, JVM-fedScanExec, Iceberg, and future Delta kernel scans) inherit row-level filtering.What changes are included in this PR?
CometDynamicFilterExecoperator (native/core/src/execution/operators/dynamic_filter.rs): evaluates the join's sharedDynamicFilterPhysicalExpragainst probe-side batches before the hash probe, withtrueplaceholder (correctness never depends on population),dynamic_filter_rows_pruned+ baseline elapsed-compute/output-rows.attach_join_dynamic_filterhelper: rewires an eligibleHashJoinExec(including theProjectionExec-over-join shape produced byswap_inputs) so the join and the new probe-side wrapper share one filter, via the publicHashJoinExecBuilder+with_dynamic_filter_exprAPIs.JoinType::on_lr_is_preserved().1— Inner, LeftOuter, LeftSemi/RightSemi, LeftAnti, LeftMark); null-aware anti joins are excluded.HashJoin.dynamic_filter_enabled, and a new configspark.comet.exec.join.dynamicFilter.enabled(defaultfalse, experimental).Timing note: each Comet native execution corresponds to a single Spark partition, so the build accumulator completes after that task's build and the filter is populated before probe batches flow. Broadcast joins get a broadcast-wide filter; shuffled hash joins get a per-partition filter. If DataFusion declines to populate (session flag off, etc.), the placeholder stays
trueand the wrapper is a no-op.How are these changes tested?
native/core/src/execution/operators/dynamic_filter.rs): pass-through pre-population, filtering afterupdate(), constant-false (empty build) pruning, metric counting; end-to-endHashJoinExecexecution asserting identical results with and without the filter attached anddynamic_filter_rows_pruned > 0(proving the build phase populates the manually-attached filter); attach-helper gating for ineligible join types (Right/Full/RightAnti). Full native suite passes (128 tests), clippy + rustfmt clean.CometJoinSuite: "HashJoin with dynamic filter enabled"): result parity vs. Spark with the config enabled across broadcast + shuffled hash joins, inner/left-outer/semi/anti, a selective build side, an empty build side, and NULL join keys. Verified under the spark-3.5 profile.🤖 Generated with Claude Code
https://claude.ai/code/session_019dAmX1gT713ChZjYtKKb7p