feat: Runtime dynamic filter pushdown for native hash joins (build side -> probe side) #4807

Description

@schenksj

Disclosure: this issue was drafted with the help of an AI assistant.

What

Wire up runtime dynamic filter pushdown (a.k.a. runtime filtering / sideways information passing) for Comet's native hash joins: once the build side completes, apply a build-key-derived predicate to probe-side batches before the hash probe.

DataFusion 54 already ships the machinery: HashJoinExec supports an attached DynamicFilterPhysicalExpr, and with_dynamic_filter_expr is public. The expression starts as a lit(true) placeholder and is updated after the build with min/max bounds plus InList or hash-table-lookup membership via SharedBuildAccumulator. Comet never benefits because the consumer-side wiring lives in DataFusion's physical optimizer (the FilterPushdown rule), and Comet constructs its physical plan directly without running optimizer rules.
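To make the shape of the build-derived predicate concrete, here is a minimal std-only sketch. It is not DataFusion code: `BuildFilter`, `from_build_keys`, and `matches` are hypothetical names, keys are simplified to non-null `i64`, and the empty-build case is assumed to collapse to scalar false as listed in the test plan.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the predicate carried by the dynamic filter.
#[derive(Debug, Clone, PartialEq)]
enum BuildFilter {
    /// Placeholder before the build finishes: keeps every row.
    AlwaysTrue,
    /// Assumed behaviour for an empty build side: no probe row can match.
    AlwaysFalse,
    /// Min/max bounds plus exact membership of the build keys.
    Bounds { min: i64, max: i64, keys: HashSet<i64> },
}

impl BuildFilter {
    /// Derive the filter from the (non-null) build-side join keys.
    fn from_build_keys(keys: &[i64]) -> Self {
        let (Some(&min), Some(&max)) = (keys.iter().min(), keys.iter().max()) else {
            return BuildFilter::AlwaysFalse;
        };
        BuildFilter::Bounds { min, max, keys: keys.iter().copied().collect() }
    }

    /// Cheap range check first, then the membership lookup.
    fn matches(&self, probe_key: i64) -> bool {
        match self {
            BuildFilter::AlwaysTrue => true,
            BuildFilter::AlwaysFalse => false,
            BuildFilter::Bounds { min, max, keys } => {
                (*min..=*max).contains(&probe_key) && keys.contains(&probe_key)
            }
        }
    }
}

fn main() {
    // Before the build completes, the placeholder keeps everything.
    assert!(BuildFilter::AlwaysTrue.matches(42));

    // After the build, only probe keys present on the build side survive.
    let filter = BuildFilter::from_build_keys(&[10, 20, 30]);
    let probe = [5, 10, 15, 20, 99];
    let kept: Vec<i64> = probe.iter().copied().filter(|k| filter.matches(*k)).collect();
    println!("{kept:?}"); // prints [10, 20]
}
```

The bounds reject out-of-range keys without touching the set, which is why a min/max pair is useful even when exact membership is also available.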

This class of optimization is one of the largest TPC-DS levers in comparable engines (Velox HashProbe::pushdownDynamicFilters; Databricks DFP reports up to 8x on selective star joins).

Proposed design (generic, scan-agnostic phase)

In the planner's OpStruct::HashJoin arm, when eligible:

  1. Create DynamicFilterPhysicalExpr::new(probe_keys, lit(true)) from the final (post-swap_inputs) join's on() probe-side keys — always valid against the probe child schema, no column remapping.
  2. Attach it to the join via with_dynamic_filter_expr (rebuild through HashJoinExecBuilder).
  3. Wrap the join's probe child in a new CometDynamicFilterExec operator evaluating the same Arc:
    • pass-through fast path while the filter is the scalar-true placeholder (correctness never depends on population),
    • filter_record_batch once populated,
    • per-stream selectivity guard (auto-disable when the observed fraction of rows kept exceeds ~0.95 after 64k rows),
    • metrics: dynamic_filter_rows_pruned, engaged flag.
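
The wrapper behaviour in step 3 can be sketched without any DataFusion types. Everything below is illustrative: `SharedFilter`, `DynamicFilterStream`, and the two constants are hypothetical, a batch is simplified to a `Vec<i64>` of probe keys, and "selectivity" is taken to mean the fraction of rows the filter keeps. The real operator would evaluate the shared DynamicFilterPhysicalExpr and call filter_record_batch instead.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the shared filter: `None` models the
/// scalar-true placeholder, `Some(keys)` models the populated filter.
type SharedFilter = Arc<RwLock<Option<Vec<i64>>>>;

/// Rows to observe before the guard may trip (assumed: 64k).
const GUARD_MIN_ROWS: usize = 64 * 1024;
/// Kept fraction above which filtering is judged not worthwhile (assumed).
const GUARD_MAX_SELECTIVITY: f64 = 0.95;

/// Per-stream state of the sketched probe-side wrapper.
struct DynamicFilterStream {
    filter: SharedFilter,
    rows_seen: usize,
    rows_kept: usize,
    rows_pruned: usize, // models the dynamic_filter_rows_pruned metric
    engaged: bool,      // models the engaged flag
    disabled: bool,     // set once the selectivity guard trips
}

impl DynamicFilterStream {
    fn new(filter: SharedFilter) -> Self {
        Self { filter, rows_seen: 0, rows_kept: 0, rows_pruned: 0, engaged: false, disabled: false }
    }

    /// Filter one probe batch, or pass it through untouched.
    fn process(&mut self, batch: Vec<i64>) -> Vec<i64> {
        if self.disabled {
            return batch; // guard tripped earlier: stop paying for the filter
        }
        let kept: Vec<i64> = {
            let guard = self.filter.read().unwrap();
            match guard.as_ref() {
                None => return batch, // placeholder: pass-through fast path
                Some(keys) => batch.iter().copied().filter(|k| keys.contains(k)).collect(),
            }
        };
        self.engaged = true;
        self.rows_seen += batch.len();
        self.rows_kept += kept.len();
        self.rows_pruned += batch.len() - kept.len();
        if self.rows_seen >= GUARD_MIN_ROWS {
            let selectivity = self.rows_kept as f64 / self.rows_seen as f64;
            if selectivity > GUARD_MAX_SELECTIVITY {
                self.disabled = true;
            }
        }
        kept
    }
}

fn main() {
    let filter: SharedFilter = Arc::new(RwLock::new(None));
    let mut stream = DynamicFilterStream::new(Arc::clone(&filter));

    // Unpopulated filter: the batch is returned unchanged.
    assert_eq!(stream.process(vec![1, 2, 3]), vec![1, 2, 3]);

    // The join side populates the same Arc after its build completes.
    *filter.write().unwrap() = Some(vec![2]);
    assert_eq!(stream.process(vec![1, 2, 3]), vec![2]);
    println!("pruned={} engaged={}", stream.rows_pruned, stream.engaged);
}
```

Because the wrapper only ever drops rows that cannot match, a filter that is never populated or is disabled by the guard changes performance but not results.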

Eligibility gate (mirrors DataFusion's own): join_type.on_lr_is_preserved().1 (Inner, LeftOuter, LeftSemi, RightSemi, LeftAnti, LeftMark), not null-aware anti join, and a new opt-in config spark.comet.exec.join.dynamicFilter.enabled (default false initially) carried in the HashJoin proto message.
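
As a rough illustration of the gate, the sketch below encodes the three conditions in plain Rust. The `JoinType` enum here is a local, hypothetical one (not DataFusion's), the eligible set is copied from the list above rather than derived from on_lr_is_preserved(), and `GateInput` and `is_eligible` are made-up names.

```rust
/// Local illustrative join-type enum; not DataFusion's `JoinType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinType {
    Inner,
    LeftOuter,
    RightOuter,
    FullOuter,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
    LeftMark,
}

/// Hypothetical bundle of what the planner would know when deciding.
struct GateInput {
    join_type: JoinType,
    null_aware_anti: bool,
    /// Value of spark.comet.exec.join.dynamicFilter.enabled from the proto.
    config_enabled: bool,
}

/// Join types for which the proposal allows probe-side filtering.
fn probe_side_filterable(join_type: JoinType) -> bool {
    matches!(
        join_type,
        JoinType::Inner
            | JoinType::LeftOuter
            | JoinType::LeftSemi
            | JoinType::RightSemi
            | JoinType::LeftAnti
            | JoinType::LeftMark
    )
}

/// All three conditions must hold before a dynamic filter is attached.
fn is_eligible(input: &GateInput) -> bool {
    input.config_enabled && !input.null_aware_anti && probe_side_filterable(input.join_type)
}

fn main() {
    let all = [
        JoinType::Inner,
        JoinType::LeftOuter,
        JoinType::RightOuter,
        JoinType::FullOuter,
        JoinType::LeftSemi,
        JoinType::RightSemi,
        JoinType::LeftAnti,
        JoinType::RightAnti,
        JoinType::LeftMark,
    ];
    for join_type in all {
        let input = GateInput { join_type, null_aware_anti: false, config_enabled: true };
        println!("{join_type:?}: eligible={}", is_eligible(&input));
    }
}
```

Keeping the config check first means the default (disabled) path never inspects the join at all.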

Timing works in Comet's partition model: each native execution corresponds to one Spark partition, so the build accumulator completes after that task's build and the filter is populated before probe batches flow. BroadcastHashJoin (star joins) gets a broadcast-wide filter; ShuffledHashJoin gets a per-partition-tight filter.

What this phase does NOT do

  • No IO-level pruning inside scans (row-group/page skipping) — follow-up issues cover IcebergScanExec and the Delta contrib scan (both kernel-read operators, not ParquetSource); the native_datafusion ParquetSource IO-level integration is likewise a candidate follow-up.
  • No deep insertion below projections/filters on the probe side (keys would need remapping); the wrapper wraps the join's immediate probe child.

Alternatives considered

  • Running DataFusion's FilterPushdown physical optimizer rule over the built plan: would get ParquetSource IO-level for free, but the rule can restructure the plan (absorb FilterExecs), breaking Comet's native-operator-to-Spark-node metrics mapping.
  • Stock FilterExec with the dynamic filter as predicate: no pre-population fast path, no selectivity guard.

Test plan

Rust unit tests cover the operator (pass-through, post-update filtering, empty-build scalar-false, guard) and planner gating (join types, swap branch). Scala join suite parity runs (BHJ + SHJ; inner/left/semi/anti; empty build; NULL keys) execute with the config enabled and assert dynamic_filter_rows_pruned > 0 on a selective join.
