Disclosure: this issue was drafted with the help of an AI assistant.
Follow-up to #4807 (runtime dynamic filter pushdown for native hash joins).
The generic phase of #4807 applies the join's DynamicFilterPhysicalExpr as a row-level filter above the probe-side scan, which CometIcebergNativeScan / IcebergScanExec inherits with no format-specific work. This issue covers the IO-level upgrade so the filter also prunes row groups / pages inside iceberg-rust instead of only dropping rows post-decode.
Why it is not free
IcebergScanExec is not a DataSourceExec/ParquetSource — it executes pre-planned FileScanTasks through iceberg-rust's ArrowReaderBuilder (native/core/src/execution/operators/iceberg_scan.rs), with predicates carried as iceberg::expr::Predicate bound at plan time. There is no runtime filter-pushdown surface to hand a DynamicFilterPhysicalExpr to.
Proposed shape
Deferred reader construction: move the ArrowReaderBuilder/task-stream construction from execute() into the stream's first poll (the same futures::stream::once(async ...) pattern the file already uses for delete-file-size prefetch), so the populated dynamic filter can be snapshotted after the join build completes (build resolves before the probe stream is polled).
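The deferral can be illustrated without async machinery. The sketch below is a synchronous, std-only analogue of the futures::stream::once(async ...) pattern: the scan is constructed (the stand-in for execute()) while the shared filter is still empty, and the filter is read only on the first next() call, after the build side has published its bounds. DeferredScan and FilterSnapshot are hypothetical names, not Comet, DataFusion, or iceberg-rust types, and the bounds check on in-memory rows merely stands in for handing a predicate to the reader.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the dynamic filter: the join's build side
/// fills in the bounds once it has consumed all build rows.
#[derive(Clone, Debug, PartialEq)]
struct FilterSnapshot {
    min: Option<i64>,
    max: Option<i64>,
}

/// Hypothetical probe-side scan whose "reader" is built lazily on the
/// first pull instead of at construction time.
struct DeferredScan {
    filter: Arc<RwLock<FilterSnapshot>>,
    source: Option<Vec<i64>>,
    // Built on the first `next()`; holds the filter snapshot taken then.
    reader: Option<(FilterSnapshot, std::vec::IntoIter<i64>)>,
}

impl DeferredScan {
    fn new(filter: Arc<RwLock<FilterSnapshot>>, rows: Vec<i64>) -> Self {
        // Nothing is read from `filter` here: at "execute()" time it is still empty.
        DeferredScan { filter, source: Some(rows), reader: None }
    }
}

impl Iterator for DeferredScan {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        if self.reader.is_none() {
            // First poll: snapshot the (now populated) filter and build the reader.
            let snapshot = self.filter.read().unwrap().clone();
            let rows = self.source.take().unwrap_or_default();
            self.reader = Some((snapshot, rows.into_iter()));
        }
        let (snapshot, rows) = self.reader.as_mut().unwrap();
        // Apply the bounds captured at first poll.
        rows.find(|v| {
            snapshot.min.map_or(true, |m| *v >= m) && snapshot.max.map_or(true, |m| *v <= m)
        })
    }
}

fn main() {
    let filter = Arc::new(RwLock::new(FilterSnapshot { min: None, max: None }));
    // "execute()": the scan is constructed before the build side has finished.
    let scan = DeferredScan::new(Arc::clone(&filter), vec![1, 5, 10, 42, 7]);
    // The build side completes and publishes its bounds.
    *filter.write().unwrap() = FilterSnapshot { min: Some(5), max: Some(10) };
    // The first poll happens only now, so the reader sees the populated bounds.
    let kept: Vec<i64> = scan.collect();
    println!("{:?}", kept);
}
```

Had the snapshot been taken in new(), it would have captured the empty filter and nothing would be pruned; moving it into the first poll is what makes the build-side bounds visible.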
Expression conversion: convert the convertible conjuncts of the snapshotted filter (col >= min AND col <= max bounds always; InList -> is_in when the build side was small) into an iceberg::expr::Predicate, bind it, and attach it to each FileScanTask. The HashTableLookupExpr membership strategy (large build sides) has no iceberg equivalent; in that case keep only the bounds (the row-level wrapper from #4807 still applies the full expression above the scan).
Literal conversion can reuse the existing protobuf->iceberg::spec::Literal code (planner.rs:3504+).
Respect transforms/schema evolution when binding; bail out (keep row-level only) on anything not cleanly convertible.
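The conversion rules can be sketched with simplified stand-in types. Conjunct models the pieces a populated dynamic filter may carry and IoPredicate models a bound iceberg::expr::Predicate; both are hypothetical and are not the real DataFusion or iceberg-rust API. A column missing from the schema list stands in for "not cleanly convertible" (for example, a transform or schema-evolution mismatch), and the column name customer_id is illustrative only.

```rust
/// Hypothetical stand-ins for the conjuncts of a populated dynamic filter.
#[derive(Debug, Clone)]
enum Conjunct {
    /// `col >= min AND col <= max`: always present once the build side is done.
    Bounds { column: String, min: i64, max: i64 },
    /// `col IN (...)`: only produced when the build side was small.
    InList { column: String, values: Vec<i64> },
    /// Hash-table membership probe (large build sides): no iceberg equivalent.
    HashTableLookup,
}

/// Hypothetical stand-in for a bound iceberg predicate.
#[derive(Debug, Clone, PartialEq)]
enum IoPredicate {
    GreaterThanOrEq(String, i64),
    LessThanOrEq(String, i64),
    IsIn(String, Vec<i64>),
}

/// Converts the conjuncts that convert cleanly and silently drops the rest.
fn to_io_predicates(conjuncts: &[Conjunct], schema: &[&str]) -> Vec<IoPredicate> {
    let mut out = Vec::new();
    for c in conjuncts {
        match c {
            Conjunct::Bounds { column, min, max } if schema.contains(&column.as_str()) => {
                out.push(IoPredicate::GreaterThanOrEq(column.clone(), *min));
                out.push(IoPredicate::LessThanOrEq(column.clone(), *max));
            }
            Conjunct::InList { column, values } if schema.contains(&column.as_str()) => {
                out.push(IoPredicate::IsIn(column.clone(), values.clone()));
            }
            // HashTableLookup, or a column that does not bind cleanly:
            // keep it row-level only.
            _ => {}
        }
    }
    out
}

fn main() {
    let schema = ["customer_id", "order_date"];
    let small_build = vec![
        Conjunct::Bounds { column: "customer_id".into(), min: 10, max: 20 },
        Conjunct::InList { column: "customer_id".into(), values: vec![10, 15, 20] },
    ];
    let large_build = vec![
        Conjunct::Bounds { column: "customer_id".into(), min: 10, max: 20 },
        Conjunct::HashTableLookup,
    ];
    println!("{:?}", to_io_predicates(&small_build, &schema));
    println!("{:?}", to_io_predicates(&large_build, &schema));
}
```

Dropping a conjunct from an AND can only widen the IO-level predicate, so pruning stays sound: any rows the dropped conjunct would have removed are still removed by the row-level wrapper above the scan.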
Scope: row-group/page/row-selection pruning inside iceberg-rust only. File-level pruning is out of scope — the file list is resolved driver-side (Spark/Iceberg runtime filtering already covers coarse file pruning for partition-derived columns).
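Row-group pruning from the bounds reduces to an interval-overlap test against per-row-group min/max statistics. The sketch below assumes a hypothetical RowGroupStats struct holding one column's min/max; it is not iceberg-rust's internal representation. Page-level pruning would apply the same test to per-page statistics.

```rust
/// Hypothetical per-row-group statistics for one column.
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// A row group can be skipped when its [min, max] range does not overlap the
/// build-side bounds [lo, hi]: no row in it can satisfy `col >= lo AND col <= hi`.
fn can_skip(stats: RowGroupStats, lo: i64, hi: i64) -> bool {
    stats.max < lo || stats.min > hi
}

/// Returns the indices of the row groups that still have to be read.
fn row_groups_to_read(groups: &[RowGroupStats], lo: i64, hi: i64) -> Vec<usize> {
    groups
        .iter()
        .enumerate()
        .filter(|(_, g)| !can_skip(**g, lo, hi))
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    // Values clustered by row group (e.g. data sorted on the join key).
    let clustered = [
        RowGroupStats { min: 0, max: 99 },
        RowGroupStats { min: 100, max: 199 },
        RowGroupStats { min: 200, max: 299 },
    ];
    // Unclustered data: every row group spans the whole value range.
    let interleaved = [RowGroupStats { min: 0, max: 299 }; 3];
    println!("{:?}", row_groups_to_read(&clustered, 120, 150));
    println!("{:?}", row_groups_to_read(&interleaved, 120, 150));
}
```

The two calls read the same bounds but skip different amounts: with clustered values only row group 1 is read, while with interleaved values all three must be read even though the row-level filter would still discard most of their rows. Row-group skipping therefore depends on how the probe-side values are laid out, not only on the filter's selectivity.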
Decision input
The dynamic_filter_rows_pruned / selectivity metrics added in #4807 indicate whether this upgrade is worth scheduling: high observed selectivity on Iceberg star joins implies row-group skipping would have triggered.