Disclosure: this issue was drafted with the help of an AI assistant.
What
Wire up runtime dynamic filter pushdown (a.k.a. runtime filtering / sideways information passing) for Comet's native hash joins: once the build side completes, apply a build-key-derived predicate to probe-side batches before the hash probe.
DataFusion 54 already ships the machinery: HashJoinExec supports an attached DynamicFilterPhysicalExpr (a lit(true) placeholder, updated after the build with min/max bounds plus InList or hash-table-lookup membership via SharedBuildAccumulator), and with_dynamic_filter_expr is public. Comet does not benefit today because the consumer-side wiring lives in DataFusion's physical optimizer (the FilterPushdown rule), and Comet constructs its physical plan directly without running optimizer rules.
This class of optimization is one of the largest TPC-DS levers in comparable engines (Velox HashProbe::pushdownDynamicFilters; Databricks DFP reports up to 8x on selective star joins).
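The placeholder-then-update lifecycle described above can be modelled in a few lines of std-only Rust. This is a sketch under stated assumptions, not DataFusion's API: FilterState, DynamicKeyFilter, update_from_build and may_match are hypothetical names, keys are simplified to non-null i64, and the real DynamicFilterPhysicalExpr / SharedBuildAccumulator types operate on physical expressions and Arrow arrays rather than on a HashSet.

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

/// Hypothetical model of a dynamic join-key filter's state (not DataFusion's types).
#[derive(Debug)]
enum FilterState {
    /// Build side still running: acts like lit(true) and keeps every row.
    Placeholder,
    /// Build side produced no rows: acts like lit(false) and prunes every row.
    Empty,
    /// Build side finished: min/max bounds plus exact key membership.
    Populated { min: i64, max: i64, keys: HashSet<i64> },
}

/// Cloning shares the same Arc, so the join and the probe-side wrapper see one state.
#[derive(Clone)]
struct DynamicKeyFilter {
    state: Arc<RwLock<FilterState>>,
}

impl DynamicKeyFilter {
    fn new() -> Self {
        Self { state: Arc::new(RwLock::new(FilterState::Placeholder)) }
    }

    /// Called once the build side completes (the accumulator's role in this sketch).
    fn update_from_build(&self, build_keys: &[i64]) {
        let new_state = match (build_keys.iter().min(), build_keys.iter().max()) {
            (Some(&min), Some(&max)) => FilterState::Populated {
                min,
                max,
                keys: build_keys.iter().copied().collect(),
            },
            _ => FilterState::Empty,
        };
        *self.state.write().unwrap() = new_state;
    }

    fn is_placeholder(&self) -> bool {
        matches!(*self.state.read().unwrap(), FilterState::Placeholder)
    }

    /// Returns false only if `key` cannot match any build-side row.
    fn may_match(&self, key: i64) -> bool {
        match &*self.state.read().unwrap() {
            FilterState::Placeholder => true,
            FilterState::Empty => false,
            // Cheap range check first, then the exact membership test.
            FilterState::Populated { min, max, keys } => {
                key >= *min && key <= *max && keys.contains(&key)
            }
        }
    }
}

fn main() {
    let filter = DynamicKeyFilter::new();
    let probe_view = filter.clone();
    assert!(probe_view.may_match(42)); // placeholder keeps everything

    filter.update_from_build(&[3, 7, 10]);
    assert!(probe_view.may_match(7));
    assert!(!probe_view.may_match(5)); // inside [3, 10] but not a build key
    assert!(!probe_view.may_match(42)); // outside the min/max bounds
}
```

An empty build side moves the state to Empty, which is the "empty-build scalar-false" case: every probe row can be pruned.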
Proposed design (generic, scan-agnostic phase)
In the planner's OpStruct::HashJoin arm, when eligible:
- Create DynamicFilterPhysicalExpr::new(probe_keys, lit(true)) from the probe-side keys of the final (post-swap_inputs) join's on(). These are always valid against the probe child schema, so no column remapping is needed.
- Attach it to the join via with_dynamic_filter_expr (rebuilding through HashJoinExecBuilder).
- Wrap the join's probe child in a new CometDynamicFilterExec operator evaluating the same Arc:
  - pass-through fast path while the filter is still the scalar true placeholder (correctness never depends on population),
  - filter_record_batch once populated,
  - per-stream selectivity guard (auto-disable when the observed selectivity, i.e. the fraction of rows kept, exceeds ~0.95 after 64k rows),
  - metrics: dynamic_filter_rows_pruned, engaged flag.
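The per-batch behaviour proposed for CometDynamicFilterExec can be sketched with std only. Assumptions: batches are modelled as Vec<i64> of probe keys, a None in the shared slot stands for the lit(true) placeholder, "64k rows" is read as 65,536, NULL keys are not modelled, and DynamicFilterStream, SharedFilter and the guard constants are hypothetical names. A real operator would work on RecordBatch and call filter_record_batch.

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

/// Hypothetical shared filter: None = lit(true) placeholder, Some(keys) = populated.
type SharedFilter = Arc<RwLock<Option<HashSet<i64>>>>;

/// Assumed guard parameters, read from the proposal ("~0.95 after 64k rows").
const GUARD_MIN_ROWS: u64 = 64 * 1024;
const GUARD_MAX_SELECTIVITY: f64 = 0.95;

/// Per-stream state of a hypothetical CometDynamicFilterExec.
struct DynamicFilterStream {
    filter: SharedFilter,
    rows_seen: u64,   // rows evaluated while the filter was populated
    rows_kept: u64,   // of those, rows that passed
    disabled: bool,   // selectivity guard tripped for this stream
    engaged: bool,    // metric: filter applied to at least one batch
    rows_pruned: u64, // metric: dynamic_filter_rows_pruned
}

impl DynamicFilterStream {
    fn new(filter: SharedFilter) -> Self {
        Self { filter, rows_seen: 0, rows_kept: 0, disabled: false, engaged: false, rows_pruned: 0 }
    }

    /// Filter one probe batch (modelled as the batch's join keys).
    fn process(&mut self, batch: Vec<i64>) -> Vec<i64> {
        if self.disabled {
            return batch; // guard tripped: stop paying for the filter
        }
        let guard = self.filter.read().unwrap();
        let keys = match guard.as_ref() {
            // Fast path: still the lit(true) placeholder, so pass the batch through.
            None => return batch,
            Some(keys) => keys,
        };
        let before = batch.len() as u64;
        let kept: Vec<i64> = batch.into_iter().filter(|k| keys.contains(k)).collect();
        let after = kept.len() as u64;

        self.engaged = true;
        self.rows_pruned += before - after;
        self.rows_seen += before;
        self.rows_kept += after;
        if self.rows_seen >= GUARD_MIN_ROWS
            && self.rows_kept as f64 / self.rows_seen as f64 > GUARD_MAX_SELECTIVITY
        {
            // The filter keeps almost every row; disable it for the rest of the stream.
            self.disabled = true;
        }
        kept
    }
}

fn main() {
    let filter: SharedFilter = Arc::new(RwLock::new(None));
    let mut stream = DynamicFilterStream::new(Arc::clone(&filter));

    // Before the build completes: pass-through, filter not engaged.
    assert_eq!(stream.process(vec![1, 2, 3]), vec![1, 2, 3]);
    assert!(!stream.engaged);

    // The build side finishes and publishes its keys.
    *filter.write().unwrap() = Some(HashSet::from([2, 3]));
    assert_eq!(stream.process(vec![1, 2, 3, 4]), vec![2, 3]);
    assert_eq!(stream.rows_pruned, 2);
}
```

In this sketch the guard counts only rows evaluated after population, so a long pass-through prefix does not dilute the observed selectivity.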
Eligibility gate (mirrors DataFusion's own): join_type.on_lr_is_preserved().1 is true (Inner, LeftOuter, LeftSemi, RightSemi, LeftAnti, LeftMark), the join is not a null-aware anti join, and a new opt-in config, spark.comet.exec.join.dynamicFilter.enabled (default false initially, carried in the HashJoin proto message), is enabled.
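The gate reduces to a pure predicate. In this sketch JoinType is a hypothetical local enum (not DataFusion's JoinType), probe_side_filterable hard-codes the list from the gate rather than calling on_lr_is_preserved(), the right input is assumed to be the probe side (DataFusion's HashJoinExec convention), and the two booleans stand in for the null-aware flag and the config value carried in the proto message.

```rust
/// Hypothetical local mirror of join types; names follow the list in this issue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinType {
    Inner,
    LeftOuter,
    RightOuter,
    FullOuter,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
    LeftMark,
}

/// True when probe (right) rows that cannot match may be dropped before the probe.
fn probe_side_filterable(join_type: JoinType) -> bool {
    matches!(
        join_type,
        JoinType::Inner
            | JoinType::LeftOuter
            | JoinType::LeftSemi
            | JoinType::RightSemi
            | JoinType::LeftAnti
            | JoinType::LeftMark
    )
}

/// All three conditions must hold before the planner attaches a dynamic filter.
fn dynamic_filter_eligible(join_type: JoinType, null_aware_anti: bool, config_enabled: bool) -> bool {
    config_enabled && !null_aware_anti && probe_side_filterable(join_type)
}

fn main() {
    assert!(dynamic_filter_eligible(JoinType::Inner, false, true));
    // Off by default: nothing is attached unless the config is enabled.
    assert!(!dynamic_filter_eligible(JoinType::Inner, false, false));
    // RightOuter keeps unmatched probe rows in its output, so they cannot be dropped early.
    assert!(!dynamic_filter_eligible(JoinType::RightOuter, false, true));
    // Null-aware anti joins are excluded even though LeftAnti is otherwise eligible.
    assert!(!dynamic_filter_eligible(JoinType::LeftAnti, true, true));
}
```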
Timing works in Comet's partition model: each native execution corresponds to one Spark partition, so the build accumulator completes after that task's build and the filter is populated before probe batches flow. BroadcastHashJoin (star joins) gets a broadcast-wide filter; ShuffledHashJoin gets a per-partition-tight filter.
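The difference in filter scope between the two join strategies can be shown with a small std-only example. It assumes a toy modulo partitioner in place of Spark's Murmur3-based hash partitioning and models each task's filter as the set of build keys that task sees; partition_of, broadcast_filter and shuffled_filter are hypothetical helpers.

```rust
use std::collections::HashSet;

/// Toy stand-in for Spark's hash partitioner (an assumption for illustration only).
fn partition_of(key: i64, num_partitions: i64) -> i64 {
    key.rem_euclid(num_partitions)
}

/// BroadcastHashJoin: every task builds from the full broadcast relation.
fn broadcast_filter(build_keys: &[i64]) -> HashSet<i64> {
    build_keys.iter().copied().collect()
}

/// ShuffledHashJoin: task `p` only builds from the rows shuffled to partition `p`.
fn shuffled_filter(build_keys: &[i64], p: i64, n: i64) -> HashSet<i64> {
    build_keys.iter().copied().filter(|&k| partition_of(k, n) == p).collect()
}

fn main() {
    let build = [1, 2, 3, 4, 5, 6];
    let n = 2;
    let bhj = broadcast_filter(&build);
    let shj_p0 = shuffled_filter(&build, 0, n);

    // Partition 0 only holds the even keys: a tighter set than the broadcast one.
    assert_eq!(shj_p0, HashSet::from([2, 4, 6]));
    assert!(shj_p0.is_subset(&bhj));

    // Probe rows routed to partition 0 only ever meet partition 0's build keys,
    // so the tighter filter drops nothing the join could have matched.
    let probe = [2, 3, 4, 8, 10];
    for k in probe.iter().copied().filter(|&k| partition_of(k, n) == 0) {
        assert_eq!(shj_p0.contains(&k), bhj.contains(&k));
    }
}
```

Because build and probe rows are co-partitioned under ShuffledHashJoin, the smaller per-partition key set can prune more per task without changing the join result.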
What this phase does NOT do
- No IO-level pruning inside scans (row-group/page skipping). Follow-up issues cover IcebergScanExec and the Delta contrib scan (both kernel-read operators, not ParquetSource); IO-level integration with the native_datafusion ParquetSource is likewise a candidate follow-up.
- No deep insertion below projections/filters on the probe side (keys would need remapping); the wrapper wraps the join's immediate probe child.
Alternatives considered
- Running DataFusion's FilterPushdown physical optimizer rule over the built plan: this would get ParquetSource IO-level pruning for free, but the rule can restructure the plan (absorbing FilterExecs), breaking Comet's mapping from native operators to Spark plan nodes for metrics.
- Stock FilterExec with the dynamic filter as its predicate: no pre-population fast path and no selectivity guard.
Test plan
Rust unit tests for the operator (pass-through, post-update filtering, empty-build scalar-false, guard) and planner gating (join types, swap branch); Scala join suite parity runs (BHJ + SHJ; inner/left/semi/anti; empty build; NULL keys) with the config enabled, asserting dynamic_filter_rows_pruned > 0 on a selective join.