From f8a5677aa1540a0199e90110e3850ad3bb33fb02 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sun, 24 May 2026 19:39:10 -0500 Subject: [PATCH 01/10] feat(io): add pyiceberg-core scan adapters --- pyiceberg/io/pyiceberg_core.py | 293 +++++++++++++++++++++++++++++ tests/io/test_pyiceberg_core.py | 316 ++++++++++++++++++++++++++++++++ 2 files changed, 609 insertions(+) create mode 100644 pyiceberg/io/pyiceberg_core.py create mode 100644 tests/io/test_pyiceberg_core.py diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py new file mode 100644 index 0000000000..f49bfb48da --- /dev/null +++ b/pyiceberg/io/pyiceberg_core.py @@ -0,0 +1,293 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Internal adapters from PyIceberg models to pyiceberg-core objects.""" + +from __future__ import annotations + +import importlib +from typing import TYPE_CHECKING, Any + +from pyiceberg.expressions import ( + AlwaysFalse, + AlwaysTrue, + And, + BooleanExpression, + BoundPredicate, + BoundTerm, + BoundUnaryPredicate, + EqualTo, + GreaterThan, + GreaterThanOrEqual, + In, + IsNull, + LessThan, + LessThanOrEqual, + LiteralPredicate, + Not, + NotEqualTo, + NotIn, + NotNull, + NotStartsWith, + Or, + Reference, + SetPredicate, + StartsWith, + UnaryPredicate, + UnboundTerm, +) +from pyiceberg.io import FileIO +from pyiceberg.manifest import DataFile +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import FileScanTask +from pyiceberg.table.name_mapping import NameMapping +from pyiceberg.typedef import Record + +if TYPE_CHECKING: + from collections.abc import Iterable + + +def _core_module(name: str) -> Any: + """Import a pyiceberg-core submodule lazily so PyIceberg can run without the Rust wheel.""" + try: + return importlib.import_module(f"pyiceberg_core.{name}") + except ModuleNotFoundError as exc: + if exc.name == f"pyiceberg_core.{name}": + raise NotImplementedError( + "The installed pyiceberg-core wheel does not expose native scan bindings. " + "Install a pyiceberg-core build that includes schema, expression, file_io, and scan modules." + ) from exc + raise ModuleNotFoundError( + 'pyiceberg-core is required for native scan adapters. Install it with `pip install "pyiceberg[pyiceberg-core]"`.' + ) from exc + + +def _model_json(value: Any) -> str: + return value.model_dump_json(by_alias=True, exclude_none=True) + + +def schema_to_pyiceberg_core(schema: Schema) -> Any: + """Convert a PyIceberg Schema to a pyiceberg-core schema object.""" + return _core_module("schema").Schema.from_json(_model_json(schema)) + + +def file_io_to_pyiceberg_core(file_io: FileIO) -> Any: + """Convert a PyIceberg FileIO to a pyiceberg-core FileIO-like object.""" + return _core_module("file_io").FileIO.from_props(dict(file_io.properties)) + + +def _literal_value(value: Any) -> Any: + return getattr(value, "value", value) + + +def _term_name(term: UnboundTerm | BoundTerm) -> str: + if isinstance(term, BoundTerm): + ref = term.ref() + return ref.field.name + if isinstance(term, Reference): + return term.name + + raise NotImplementedError(f"Cannot convert expression term {term!r} to pyiceberg_core") + + +def _term_field_id(term: UnboundTerm | BoundTerm, schema: Schema, case_sensitive: bool) -> int: + if isinstance(term, BoundTerm): + return term.ref().field.field_id + if isinstance(term, Reference): + return schema.find_field(term.name, case_sensitive=case_sensitive).field_id + + raise NotImplementedError(f"Cannot collect field id for expression term {term!r}") + + +def _expression_field_ids(expr: BooleanExpression, schema: Schema, case_sensitive: bool) -> set[int]: + if isinstance(expr, (AlwaysTrue, AlwaysFalse)): + return set() + if isinstance(expr, (And, Or)): + return _expression_field_ids(expr.left, schema, case_sensitive) | _expression_field_ids( + expr.right, schema, case_sensitive + ) + if isinstance(expr, Not): + return _expression_field_ids(expr.child, schema, case_sensitive) + if isinstance(expr, BoundPredicate): + return {_term_field_id(expr.term, schema, case_sensitive)} + if isinstance(expr, (UnaryPredicate, LiteralPredicate, SetPredicate)): + return {_term_field_id(expr.term, schema, case_sensitive)} + + raise NotImplementedError(f"Cannot collect field ids for unsupported PyIceberg expression {expr!r}") + + +def _read_field_ids( + projected_schema: Schema, + residual: BooleanExpression, + schema: Schema, + case_sensitive: bool, + project_field_ids: Iterable[int] | None, +) -> list[int]: + ids = list(project_field_ids) if project_field_ids is not None else list(projected_schema.field_ids) + seen = set(ids) + for field_id in sorted(_expression_field_ids(residual, schema, case_sensitive)): + if field_id not in seen: + ids.append(field_id) + seen.add(field_id) + return ids + + +_UNARY_METHODS: dict[type[BooleanExpression], str] = { + IsNull: "is_null", + NotNull: "is_not_null", +} + +_LITERAL_METHODS: dict[type[BooleanExpression], str] = { + EqualTo: "eq", + NotEqualTo: "ne", + LessThan: "lt", + LessThanOrEqual: "lte", + GreaterThan: "gt", + GreaterThanOrEqual: "gte", + StartsWith: "starts_with", + NotStartsWith: "not_starts_with", +} + +_SET_METHODS: dict[type[BooleanExpression], str] = { + In: "is_in", + NotIn: "is_not_in", +} + + +def expression_to_pyiceberg_core( + expr: BooleanExpression, + schema: Schema | None = None, + case_sensitive: bool = True, +) -> Any: + """Convert a PyIceberg BooleanExpression to a pyiceberg-core expression object.""" + expression = _core_module("expression") + if isinstance(expr, AlwaysTrue): + return expression.Predicate.always_true() + if isinstance(expr, AlwaysFalse): + return expression.Predicate.always_false() + if isinstance(expr, And): + return expression_to_pyiceberg_core(expr.left, schema, case_sensitive).and_( + expression_to_pyiceberg_core(expr.right, schema, case_sensitive) + ) + if isinstance(expr, Or): + return expression_to_pyiceberg_core(expr.left, schema, case_sensitive).or_( + expression_to_pyiceberg_core(expr.right, schema, case_sensitive) + ) + if isinstance(expr, Not): + return expression_to_pyiceberg_core(expr.child, schema, case_sensitive).negate() + + if isinstance(expr, BoundPredicate): + return _bound_predicate_to_pyiceberg_core(expr) + + if isinstance(expr, (UnaryPredicate, LiteralPredicate, SetPredicate)): + if schema is None: + raise NotImplementedError(f"Cannot convert unbound expression {expr!r} without a Schema") + return expression_to_pyiceberg_core(expr.bind(schema, case_sensitive=case_sensitive), schema, case_sensitive) + + raise NotImplementedError(f"Cannot convert unsupported PyIceberg expression {expr!r} to pyiceberg_core") + + +def _bound_predicate_to_pyiceberg_core(expr: BoundPredicate) -> Any: + ref = _core_module("expression").Reference(_term_name(expr.term)) + + if isinstance(expr, BoundUnaryPredicate): + method = _UNARY_METHODS.get(expr.as_unbound) + if method is None: + raise NotImplementedError(f"Cannot convert unsupported unary predicate {expr!r} to pyiceberg_core") + return getattr(ref, method)() + + if isinstance(expr, LiteralPredicate): + raise NotImplementedError(f"Expected a bound literal predicate, got unbound predicate {expr!r}") + + if hasattr(expr, "literal"): + method = _LITERAL_METHODS.get(expr.as_unbound) + if method is None: + raise NotImplementedError(f"Cannot convert unsupported literal predicate {expr!r} to pyiceberg_core") + return getattr(ref, method)(_literal_value(expr.literal)) + + if hasattr(expr, "literals"): + method = _SET_METHODS.get(expr.as_unbound) + if method is None: + raise NotImplementedError(f"Cannot convert unsupported set predicate {expr!r} to pyiceberg_core") + return getattr(ref, method)([_literal_value(lit) for lit in expr.literals]) + + raise NotImplementedError(f"Cannot convert unsupported bound predicate {expr!r} to pyiceberg_core") + + +def _record_to_values(record: Record | None) -> list[Any] | None: + if record is None: + return None + return [record[pos] for pos in range(len(record))] + + +def _file_format_value(data_file: DataFile) -> str: + file_format = data_file.file_format + return getattr(file_format, "value", file_format).lower() + + +def delete_file_to_pyiceberg_core(delete_file: DataFile) -> Any: + """Convert a PyIceberg delete DataFile to a pyiceberg-core DeleteFile.""" + content = int(delete_file.content) + if content == 1: + file_type = "position-deletes" + elif content == 2: + raise NotImplementedError("pyiceberg-core equality delete scan parity is tracked separately") + else: + raise ValueError(f"Expected a delete file, got data file content {delete_file.content!r}") + + return _core_module("scan").DeleteFile( + delete_file.file_path, + delete_file.file_size_in_bytes, + file_type, + partition_spec_id=delete_file.spec_id, + equality_ids=delete_file.equality_ids, + ) + + +def file_scan_task_to_pyiceberg_core( + task: FileScanTask, + schema: Schema, + projected_schema: Schema | None = None, + partition_spec: PartitionSpec | None = None, + name_mapping: NameMapping | None = None, + case_sensitive: bool = True, + project_field_ids: Iterable[int] | None = None, +) -> Any: + """Convert a PyIceberg FileScanTask to a pyiceberg-core file scan task object.""" + projected = projected_schema or schema + field_ids = _read_field_ids(projected, task.residual, schema, case_sensitive, project_field_ids) + file_size_in_bytes = task.file.file_size_in_bytes + partition_data = _record_to_values(task.file.partition) + if partition_data and partition_spec is None: + raise ValueError("partition_spec is required when converting a partitioned FileScanTask") + + return _core_module("scan").FileScanTask( + schema=schema_to_pyiceberg_core(schema), + data_file_path=task.file.file_path, + file_size_in_bytes=file_size_in_bytes, + project_field_ids=field_ids, + start=0, + length=file_size_in_bytes, + record_count=task.file.record_count, + data_file_format=_file_format_value(task.file), + predicate=expression_to_pyiceberg_core(task.residual, schema, case_sensitive), + deletes=[delete_file_to_pyiceberg_core(delete_file) for delete_file in task.delete_files], + partition_data=partition_data, + partition_spec=_model_json(partition_spec) if partition_spec is not None else None, + name_mapping=_model_json(name_mapping) if name_mapping is not None else None, + case_sensitive=case_sensitive, + ) diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py new file mode 100644 index 0000000000..0e1fd0f95b --- /dev/null +++ b/tests/io/test_pyiceberg_core.py @@ -0,0 +1,316 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import sys +from types import ModuleType +from typing import Any + +import pytest + +from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith +from pyiceberg.io import FileIO, InputFile, OutputFile +from pyiceberg.io.pyiceberg_core import ( + delete_file_to_pyiceberg_core, + expression_to_pyiceberg_core, + file_io_to_pyiceberg_core, + file_scan_task_to_pyiceberg_core, + schema_to_pyiceberg_core, +) +from pyiceberg.manifest import DataFile, DataFileContent +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import FileScanTask +from pyiceberg.table.name_mapping import MappedField, NameMapping +from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import Record +from pyiceberg.types import FloatType, IntegerType, NestedField, StringType + + +class CoreObject: + def __init__(self, *args: Any, **kwargs: Any) -> None: + self.args = args + self.kwargs = kwargs + + +class CoreSchema(CoreObject): + @classmethod + def from_json(cls, schema_json: str) -> CoreSchema: + return cls(schema_json=schema_json) + + +class CoreFileIO(CoreObject): + @classmethod + def from_props(cls, properties: dict[str, str]) -> CoreFileIO: + return cls(properties=properties) + + +class CorePredicate(CoreObject): + @staticmethod + def always_true() -> CorePredicate: + return CorePredicate(kind="always_true") + + @staticmethod + def always_false() -> CorePredicate: + return CorePredicate(kind="always_false") + + def and_(self, other: CorePredicate) -> CorePredicate: + return CorePredicate(op="and", left=self, right=other) + + def or_(self, other: CorePredicate) -> CorePredicate: + return CorePredicate(op="or", left=self, right=other) + + def negate(self) -> CorePredicate: + return CorePredicate(op="not", child=self) + + +class CoreReference(CoreObject): + def _predicate(self, op: str, *args: Any) -> CorePredicate: + return CorePredicate(op=op, name=self.args[0], args=args) + + def eq(self, value: Any) -> CorePredicate: + return self._predicate("eq", value) + + def starts_with(self, value: Any) -> CorePredicate: + return self._predicate("starts_with", value) + + def is_null(self) -> CorePredicate: + return self._predicate("is_null") + + +class FakeFileIO(FileIO): + def new_input(self, location: str) -> InputFile: + raise NotImplementedError + + def new_output(self, location: str) -> OutputFile: + raise NotImplementedError + + def delete(self, location: str | InputFile | OutputFile) -> None: + raise NotImplementedError + + +@pytest.fixture(autouse=True) +def fake_pyiceberg_core(monkeypatch: pytest.MonkeyPatch) -> None: + root = ModuleType("pyiceberg_core") + + schema: Any = ModuleType("pyiceberg_core.schema") + schema.Schema = CoreSchema + + file_io: Any = ModuleType("pyiceberg_core.file_io") + file_io.FileIO = CoreFileIO + + expression: Any = ModuleType("pyiceberg_core.expression") + expression.Predicate = CorePredicate + expression.Reference = CoreReference + + scan: Any = ModuleType("pyiceberg_core.scan") + scan.DeleteFile = CoreObject + scan.FileScanTask = CoreObject + + monkeypatch.setitem(sys.modules, "pyiceberg_core", root) + monkeypatch.setitem(sys.modules, "pyiceberg_core.schema", schema) + monkeypatch.setitem(sys.modules, "pyiceberg_core.file_io", file_io) + monkeypatch.setitem(sys.modules, "pyiceberg_core.expression", expression) + monkeypatch.setitem(sys.modules, "pyiceberg_core.scan", scan) + + +def test_schema_to_pyiceberg_core_feature_gates_old_core_wheels(monkeypatch: pytest.MonkeyPatch, simple_schema: Schema) -> None: + monkeypatch.setitem(sys.modules, "pyiceberg_core", ModuleType("pyiceberg_core")) + monkeypatch.delitem(sys.modules, "pyiceberg_core.schema", raising=False) + + with pytest.raises(NotImplementedError, match="does not expose native scan bindings"): + schema_to_pyiceberg_core(simple_schema) + + +@pytest.fixture +def simple_schema() -> Schema: + return Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "data", StringType()), + schema_id=3, + ) + + +def test_schema_to_pyiceberg_core_uses_lazy_core_schema_json(simple_schema: Schema) -> None: + converted = schema_to_pyiceberg_core(simple_schema) + + assert converted.kwargs["schema_json"] == simple_schema.model_dump_json(by_alias=True, exclude_none=True) + + +def test_file_io_to_pyiceberg_core_uses_file_io_properties() -> None: + converted = file_io_to_pyiceberg_core(FakeFileIO(properties={"s3.region": "us-east-1"})) + + assert converted.kwargs == {"properties": {"s3.region": "us-east-1"}} + + +def test_expression_to_pyiceberg_core_converts_expression_tree(simple_schema: Schema) -> None: + converted = expression_to_pyiceberg_core(And(EqualTo("id", 34), StartsWith("data", "abc")), simple_schema) + + assert converted.kwargs["op"] == "and" + assert converted.kwargs["left"].kwargs == {"op": "eq", "name": "id", "args": (34,)} + assert converted.kwargs["right"].kwargs == {"op": "starts_with", "name": "data", "args": ("abc",)} + + +def test_expression_to_pyiceberg_core_converts_unary_expression(simple_schema: Schema) -> None: + converted = expression_to_pyiceberg_core(IsNull("data"), simple_schema) + + assert converted.kwargs == {"op": "is_null", "name": "data", "args": ()} + + +def test_expression_to_pyiceberg_core_requires_schema_for_unbound_expression() -> None: + with pytest.raises(NotImplementedError, match="without a Schema"): + expression_to_pyiceberg_core(EqualTo("id", 34)) + + +def test_expression_to_pyiceberg_core_raises_clear_error_for_unsupported_expression() -> None: + from pyiceberg.expressions import IsNaN + + nan_schema = Schema(NestedField(1, "value", FloatType())) + with pytest.raises(NotImplementedError, match="unsupported unary predicate"): + expression_to_pyiceberg_core(IsNaN("value"), nan_schema) + + +def test_delete_file_to_pyiceberg_core_converts_delete_file_payload() -> None: + delete_file = DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path="s3://warehouse/table/delete.parquet", + file_format="PARQUET", + partition=Record("bucket-1"), + record_count=1, + file_size_in_bytes=123, + column_sizes={}, + value_counts={}, + null_value_counts={}, + nan_value_counts={}, + lower_bounds={}, + upper_bounds={}, + ) + delete_file.spec_id = 7 + + converted = delete_file_to_pyiceberg_core(delete_file) + + assert converted.args == ("s3://warehouse/table/delete.parquet", 123, "position-deletes") + assert converted.kwargs == {"partition_spec_id": 7, "equality_ids": None} + + +def test_delete_file_to_pyiceberg_core_rejects_equality_deletes_until_parity_lands() -> None: + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path="s3://warehouse/table/eq-delete.parquet", + file_format="PARQUET", + partition=Record(), + record_count=1, + file_size_in_bytes=123, + column_sizes={}, + value_counts={}, + null_value_counts={}, + nan_value_counts={}, + lower_bounds={}, + upper_bounds={}, + equality_ids=[1], + ) + delete_file.spec_id = 7 + + with pytest.raises(NotImplementedError, match="equality delete scan parity"): + delete_file_to_pyiceberg_core(delete_file) + + +def test_file_scan_task_to_pyiceberg_core_converts_task_payload(simple_schema: Schema) -> None: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://warehouse/table/data.parquet", + file_format="PARQUET", + partition=Record("bucket-1"), + record_count=10, + file_size_in_bytes=1234, + column_sizes={1: 20}, + value_counts={1: 10}, + null_value_counts={1: 0}, + nan_value_counts={}, + lower_bounds={1: b"\x01\x00\x00\x00"}, + upper_bounds={1: b"\x0a\x00\x00\x00"}, + ) + data_file.spec_id = 7 + partition_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id")) + name_mapping = NameMapping([MappedField(field_id=1, names=["id"])]) + + task = FileScanTask(data_file, residual=EqualTo("id", 3)) + converted = file_scan_task_to_pyiceberg_core( + task, + simple_schema, + partition_spec=partition_spec, + name_mapping=name_mapping, + case_sensitive=False, + ) + + assert converted.kwargs["data_file_path"] == "s3://warehouse/table/data.parquet" + assert converted.kwargs["data_file_format"] == "parquet" + assert converted.kwargs["file_size_in_bytes"] == 1234 + assert converted.kwargs["length"] == 1234 + assert converted.kwargs["record_count"] == 10 + assert converted.kwargs["partition_data"] == ["bucket-1"] + assert converted.kwargs["partition_spec"] == partition_spec.model_dump_json(by_alias=True, exclude_none=True) + assert converted.kwargs["name_mapping"] == name_mapping.model_dump_json(by_alias=True, exclude_none=True) + assert converted.kwargs["case_sensitive"] is False + assert set(converted.kwargs["project_field_ids"]) == {1, 2} + assert converted.kwargs["predicate"].kwargs == {"op": "eq", "name": "id", "args": (3,)} + + +def test_file_scan_task_to_pyiceberg_core_adds_filter_only_field_to_read_projection(simple_schema: Schema) -> None: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://warehouse/table/data.parquet", + file_format="PARQUET", + partition=Record(), + record_count=10, + file_size_in_bytes=1234, + column_sizes={1: 20}, + value_counts={1: 10}, + null_value_counts={1: 0}, + nan_value_counts={}, + lower_bounds={1: b"\x01\x00\x00\x00"}, + upper_bounds={1: b"\x0a\x00\x00\x00"}, + ) + data_file.spec_id = 0 + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=3) + + task = FileScanTask(data_file, residual=EqualTo("data", "abc")) + converted = file_scan_task_to_pyiceberg_core(task, simple_schema, projected_schema=projected_schema) + + assert converted.kwargs["project_field_ids"] == [1, 2] + + +def test_file_scan_task_to_pyiceberg_core_requires_partition_spec_for_partitioned_task(simple_schema: Schema) -> None: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://warehouse/table/data.parquet", + file_format="PARQUET", + partition=Record("bucket-1"), + record_count=10, + file_size_in_bytes=1234, + column_sizes={}, + value_counts={}, + null_value_counts={}, + nan_value_counts={}, + lower_bounds={}, + upper_bounds={}, + ) + data_file.spec_id = 7 + + with pytest.raises(ValueError, match="partition_spec is required"): + file_scan_task_to_pyiceberg_core(FileScanTask(data_file), simple_schema) From dc0a7afe208ec0758d5bf8c0901658dfda49e5a6 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sun, 24 May 2026 19:51:37 -0500 Subject: [PATCH 02/10] feat(table): add opt-in pyiceberg-core arrow reader --- pyiceberg/io/pyiceberg_core.py | 37 +++++++++++++++++++++++++++++++++ pyiceberg/table/__init__.py | 32 ++++++++++++++++++++++++++-- tests/io/test_pyiceberg_core.py | 8 +++++++ 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index f49bfb48da..f060056196 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -146,6 +146,16 @@ def _read_field_ids( return ids +def can_read_projected_schema_with_pyiceberg_core( + schema: Schema, + projected_schema: Schema, + row_filter: BooleanExpression, + case_sensitive: bool, +) -> bool: + """Return whether pyiceberg-core can read exactly the requested projection for this filter.""" + return _expression_field_ids(row_filter, schema, case_sensitive).issubset(projected_schema.field_ids) + + _UNARY_METHODS: dict[type[BooleanExpression], str] = { IsNull: "is_null", NotNull: "is_not_null", @@ -291,3 +301,30 @@ def file_scan_task_to_pyiceberg_core( name_mapping=_model_json(name_mapping) if name_mapping is not None else None, case_sensitive=case_sensitive, ) + + +def arrow_batch_reader_from_pyiceberg_core( + file_io: FileIO, + tasks: Iterable[FileScanTask], + schema: Schema, + projected_schema: Schema, + partition_specs: dict[int, PartitionSpec], + name_mapping: NameMapping | None, + case_sensitive: bool = True, +) -> Any: + """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader.""" + core_tasks = [ + file_scan_task_to_pyiceberg_core( + task, + schema, + projected_schema, + partition_spec=partition_specs.get(task.file.spec_id), + name_mapping=name_mapping, + case_sensitive=case_sensitive, + project_field_ids=list(projected_schema.field_ids), + ) + for task in tasks + ] + + reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io)) + return reader.read(schema_to_pyiceberg_core(projected_schema), core_tasks) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 64ad10050d..ad45eac2c7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -110,6 +110,7 @@ ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" +PYICEBERG_RUST_ARROW_SCAN = "PYICEBERG_RUST_ARROW_SCAN" @dataclass() @@ -2242,9 +2243,36 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow - target_schema = schema_to_pyarrow(self.projection()) + projected_schema = self.projection() + if self.limit is None and os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: + from pyiceberg.io.pyiceberg_core import ( + arrow_batch_reader_from_pyiceberg_core, + can_read_projected_schema_with_pyiceberg_core, + ) + + if can_read_projected_schema_with_pyiceberg_core( + self.table_metadata.schema(), projected_schema, self.row_filter, self.case_sensitive + ): + try: + return arrow_batch_reader_from_pyiceberg_core( + self.io, + self.plan_files(), + self.table_metadata.schema(), + projected_schema, + self.table_metadata.specs(), + self.table_metadata.name_mapping(), + self.case_sensitive, + ) + except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: + warnings.warn( + f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + RuntimeWarning, + stacklevel=2, + ) + + target_schema = schema_to_pyarrow(projected_schema) batches = ArrowScan( - self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + self.table_metadata, self.io, projected_schema, self.row_filter, self.case_sensitive, self.limit ).to_record_batches(self.plan_files()) return pa.RecordBatchReader.from_batches( diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 0e1fd0f95b..8d516d96cf 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -26,6 +26,7 @@ from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.io.pyiceberg_core import ( + can_read_projected_schema_with_pyiceberg_core, delete_file_to_pyiceberg_core, expression_to_pyiceberg_core, file_io_to_pyiceberg_core, @@ -295,6 +296,13 @@ def test_file_scan_task_to_pyiceberg_core_adds_filter_only_field_to_read_project assert converted.kwargs["project_field_ids"] == [1, 2] +def test_can_read_projected_schema_with_pyiceberg_core_requires_filter_fields(simple_schema: Schema) -> None: + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=3) + + assert not can_read_projected_schema_with_pyiceberg_core(simple_schema, projected_schema, EqualTo("data", "abc"), True) + assert can_read_projected_schema_with_pyiceberg_core(simple_schema, projected_schema, EqualTo("id", 1), True) + + def test_file_scan_task_to_pyiceberg_core_requires_partition_spec_for_partitioned_task(simple_schema: Schema) -> None: data_file = DataFile.from_args( content=DataFileContent.DATA, From d6d0b3bc1570be5436c9717725b37444ec506042 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 04:34:34 -0500 Subject: [PATCH 03/10] feat(io): shard pyiceberg-core arrow reads across cores Fan scan tasks out over a thread pool of native ArrowReaders so decode uses multiple cores instead of one, streaming batches as they complete with at most one decoded batch per shard in flight. A default batch size amortizes the per-batch GIL handoff that otherwise dominates the fan-in. Co-Authored-By: Claude Opus 4.8 --- pyiceberg/io/pyiceberg_core.py | 164 ++++++++++++++++++++- tests/io/test_pyiceberg_core.py | 250 ++++++++++++++++++++++++++++++++ 2 files changed, 411 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index f060056196..4ef33dcfca 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -19,6 +19,10 @@ from __future__ import annotations import importlib +import os +import threading +import weakref +from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from typing import TYPE_CHECKING, Any from pyiceberg.expressions import ( @@ -303,6 +307,132 @@ def file_scan_task_to_pyiceberg_core( ) +# Rows per Arrow batch handed back from the native reader. The native default emits very small +# batches, and the sharded fan-in marshals every batch back through the GIL-holding consumer, so a +# tiny batch makes per-batch Python orchestration dominate the decode (measured ~3x slower than a +# whole-shard drain). A larger batch amortizes that handoff while keeping in-flight memory bounded +# to shards x batch (so the streaming contract still holds). Override with +# PYICEBERG_RUST_ARROW_BATCH_SIZE. +_DEFAULT_ARROW_BATCH_SIZE = 262144 + + +def _reader_kwargs() -> dict[str, int]: + batch_size = os.environ.get("PYICEBERG_RUST_ARROW_BATCH_SIZE") + kwargs: dict[str, int] = {"batch_size": int(batch_size) if batch_size else _DEFAULT_ARROW_BATCH_SIZE} + concurrency = os.environ.get("PYICEBERG_RUST_ARROW_FILE_CONCURRENCY") + if concurrency: + kwargs["data_file_concurrency_limit"] = int(concurrency) + return kwargs + + +def _shard_count(n_tasks: int) -> int: + """How many decode threads to fan out across. + + The native ArrowReader decodes a single stream on one core (it parallelizes I/O, not CPU + decode), so a single-stream read of many files leaves the box idle. Sharding the file tasks + across threads — each driving its own reader — recovers multi-core decode (the GIL is released + during the C-stream drain). Default scales with cores, capped so tiny scans don't pay thread + overhead; override with PYICEBERG_RUST_ARROW_SHARDS (1 disables sharding). + """ + override = os.environ.get("PYICEBERG_RUST_ARROW_SHARDS") + if override: + return max(1, int(override)) + if n_tasks <= 1: + return 1 + return max(1, min(n_tasks, (os.cpu_count() or 1))) + + +class _ShardedBatchStream: + """Generator-backed, backpressured fan-in over several native shard readers. + + Each shard owns one ``pyiceberg_core`` ``RecordBatchReader`` and is drained sequentially (a + stateful reader must not be polled concurrently), so at most one read per shard is ever in + flight. A ``ThreadPoolExecutor`` pulls the *next* batch from every idle shard at once and the + consumer is handed batches as they complete, so decode runs on up to ``n_shards`` cores (the + GIL is released during the C-stream drain). Peak memory is bounded to at most one decoded + batch per shard plus what the consumer holds — never the whole result — because a shard is not + asked for its next batch until its current one has been handed out (backpressure: a slow + consumer stalls the shards rather than buffering ahead). + + Batches are yielded as they complete (``FIRST_COMPLETED``); ordering across shards is not + preserved, which is sound because the scan result is an unordered union of file tasks. Worker + exceptions are re-raised to the consumer, and the pool is shut down on exhaustion, on an + exception during iteration, on an explicit :meth:`close`, or — for a consumer that simply + stops iterating and drops the reader — via the ``weakref`` finalizer at garbage collection. + """ + + def __init__(self, readers: list[Any]) -> None: + self._readers = readers + self._pool = ThreadPoolExecutor(max_workers=len(readers)) + # Shard indices whose reader is idle and not yet known to be exhausted. + self._idle: list[int] = list(range(len(readers))) + self._in_flight: dict[Future[Any], int] = {} + self._closed = False + self._lock = threading.Lock() + # Shut the pool down if the consumer abandons the iterator without closing it. + self._finalizer = weakref.finalize(self, self._shutdown, self._pool, self._readers) + + @staticmethod + def _next_batch(reader: Any) -> Any | None: + """Pull one batch from a shard reader, returning ``None`` when the shard is exhausted.""" + try: + return reader.read_next_batch() + except StopIteration: + return None + + def _submit_idle(self) -> None: + """Submit the next read for every idle, non-exhausted shard (one read per shard).""" + while self._idle: + shard = self._idle.pop() + future = self._pool.submit(self._next_batch, self._readers[shard]) + self._in_flight[future] = shard + + def __iter__(self) -> _ShardedBatchStream: + return self + + def __next__(self) -> Any: + if self._closed: + raise StopIteration + try: + while True: + self._submit_idle() + if not self._in_flight: + # Every shard is exhausted: the union is complete. + self.close() + raise StopIteration + done, _ = wait(self._in_flight, return_when=FIRST_COMPLETED) + for future in done: + shard = self._in_flight.pop(future) + batch = future.result() # re-raises any worker exception here + if batch is None: + continue # shard exhausted; do not return it to the idle set + # Shard produced a batch: it may have more, so mark it idle again. + self._idle.append(shard) + return batch + except StopIteration: + raise + except BaseException: + # Worker exception, GeneratorExit, or KeyboardInterrupt: tear the workers down so an + # aborted scan never leaves decode threads running. + self.close() + raise + + def close(self) -> None: + """Cancel pending work and release shard readers; idempotent and consumer-safe.""" + with self._lock: + if self._closed: + return + self._closed = True + self._finalizer.detach() + self._shutdown(self._pool, self._readers) + + @staticmethod + def _shutdown(pool: ThreadPoolExecutor, readers: list[Any]) -> None: + # cancel_futures drops queued reads; in-flight reads are joined so no thread outlives us. + pool.shutdown(wait=True, cancel_futures=True) + readers.clear() + + def arrow_batch_reader_from_pyiceberg_core( file_io: FileIO, tasks: Iterable[FileScanTask], @@ -312,7 +442,18 @@ def arrow_batch_reader_from_pyiceberg_core( name_mapping: NameMapping | None, case_sensitive: bool = True, ) -> Any: - """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader.""" + """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader as a streaming reader. + + Multi-file scans are sharded across a thread pool (see ``_shard_count``) so decode uses + multiple cores; a single native reader over all files would decode on one core. Each shard + drives its own native ``RecordBatchReader``; the returned ``pyarrow.RecordBatchReader`` pulls + batches from the shards lazily (at most one decoded batch per shard in flight), so the whole + result is never materialized and peak memory stays bounded. The single-file or single-shard + case skips the fan-out entirely and returns the native reader directly. + + Worker-thread exceptions propagate to the consumer, and the shard threads are shut down when + the reader is exhausted, closed early, or garbage collected. + """ core_tasks = [ file_scan_task_to_pyiceberg_core( task, @@ -326,5 +467,22 @@ def arrow_batch_reader_from_pyiceberg_core( for task in tasks ] - reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io)) - return reader.read(schema_to_pyiceberg_core(projected_schema), core_tasks) + core_projection = schema_to_pyiceberg_core(projected_schema) + reader_kwargs = _reader_kwargs() + + def _read(shard_tasks: list[Any]) -> Any: + reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io), **reader_kwargs) + return reader.read(core_projection, shard_tasks) + + shards = _shard_count(len(core_tasks)) + if shards <= 1 or len(core_tasks) <= 1: + return _read(core_tasks) + + import pyarrow as pa + + groups = [g for g in (core_tasks[i::shards] for i in range(shards)) if g] + readers = [_read(group) for group in groups] + # Every native reader carries the same projected Arrow schema; use it to type the stream. + arrow_schema = readers[0].schema + stream = _ShardedBatchStream(readers) + return pa.RecordBatchReader.from_batches(arrow_schema, stream) diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 8d516d96cf..c1797441db 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -18,14 +18,18 @@ from __future__ import annotations import sys +import threading from types import ModuleType from typing import Any +import pyarrow as pa import pytest from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.io.pyiceberg_core import ( + _ShardedBatchStream, + arrow_batch_reader_from_pyiceberg_core, can_read_projected_schema_with_pyiceberg_core, delete_file_to_pyiceberg_core, expression_to_pyiceberg_core, @@ -122,6 +126,7 @@ def fake_pyiceberg_core(monkeypatch: pytest.MonkeyPatch) -> None: scan: Any = ModuleType("pyiceberg_core.scan") scan.DeleteFile = CoreObject scan.FileScanTask = CoreObject + scan.ArrowReader = CoreObject # streaming tests override this with a batch-producing fake monkeypatch.setitem(sys.modules, "pyiceberg_core", root) monkeypatch.setitem(sys.modules, "pyiceberg_core.schema", schema) @@ -322,3 +327,248 @@ def test_file_scan_task_to_pyiceberg_core_requires_partition_spec_for_partitione with pytest.raises(ValueError, match="partition_spec is required"): file_scan_task_to_pyiceberg_core(FileScanTask(data_file), simple_schema) + + +# --- streaming sharded reader ------------------------------------------------- + +_STREAM_SCHEMA = pa.schema([("id", pa.int64())]) + + +def _batch(values: list[int]) -> pa.RecordBatch: + return pa.record_batch({"id": pa.array(values, type=pa.int64())}) + + +class FakeShardReader: + """A stand-in for a native ``pyiceberg_core`` RecordBatchReader over one shard's tasks.""" + + def __init__(self, batches: list[pa.RecordBatch]) -> None: + self._batches = list(batches) + self._pos = 0 + self.schema = _STREAM_SCHEMA + + def read_next_batch(self) -> pa.RecordBatch: + if self._pos >= len(self._batches): + raise StopIteration + batch = self._batches[self._pos] + self._pos += 1 + return batch + + +def _drain(reader: pa.RecordBatchReader) -> tuple[int, int]: + """Return (row_count, sum-of-id checksum) — the same parity signal used in the perf gate.""" + table = reader.read_all() + rows = table.num_rows + checksum = pa.compute.sum(table["id"]).as_py() or 0 + return rows, checksum + + +def test_sharded_batch_stream_preserves_all_rows_and_checksum() -> None: + # Uneven shards exercise the "one shard finishes early" path of the fan-in. + readers = [ + FakeShardReader([_batch([1, 2]), _batch([3])]), + FakeShardReader([_batch([4, 5, 6])]), + FakeShardReader([]), # an empty shard must not stall the union + ] + expected_rows = 6 + expected_sum = 21 + + stream = _ShardedBatchStream(readers) + reader = pa.RecordBatchReader.from_batches(_STREAM_SCHEMA, stream) + + assert _drain(reader) == (expected_rows, expected_sum) + + +def test_arrow_batch_reader_streams_lazily_without_materializing(monkeypatch: pytest.MonkeyPatch) -> None: + # The legacy implementation called read_all() per shard up front; assert the new path does not + # pull any batch until the consumer asks for one (the streaming contract). + pulled: list[int] = [] + + class ObservingReader(FakeShardReader): + def read_next_batch(self) -> pa.RecordBatch: + batch = super().read_next_batch() + pulled.append(batch.num_rows) + return batch + + shard_readers = [ObservingReader([_batch([1]), _batch([2])]), ObservingReader([_batch([3]), _batch([4])])] + + class FakeArrowReader: + def __init__(self, _file_io: Any, **_kwargs: Any) -> None: + pass + + def read(self, _projection: Any, _tasks: list[Any]) -> ObservingReader: + return shard_readers.pop(0) + + monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "2") + + reader = _build_reader(monkeypatch, n_tasks=2) + assert pulled == [] # construction must not drain anything + + first = reader.read_next_batch() + assert first.num_rows == 1 + # Backpressure: at most one read per shard is outstanding, so the first pull never drains all 4. + assert len(pulled) < 4 + + rows, checksum = _drain(reader) + assert (rows + first.num_rows, checksum + first["id"][0].as_py()) == (4, 1 + 2 + 3 + 4) + + +def test_sharded_batch_stream_bounds_in_flight_reads() -> None: + # Each shard read parks on a gate; assert no more than one read per shard is ever outstanding, + # so peak memory is bounded to one decoded batch per shard rather than the whole result. A + # shard must not be asked for its next batch until its current one is consumed (backpressure). + n_shards = 5 + release = threading.Event() + started = threading.Semaphore(0) + concurrent = 0 + peak = 0 + lock = threading.Lock() + + class GatedReader: + def __init__(self) -> None: + self.schema = _STREAM_SCHEMA + self._remaining = 4 + + def read_next_batch(self) -> pa.RecordBatch: + nonlocal concurrent, peak + if self._remaining <= 0: + raise StopIteration + self._remaining -= 1 + with lock: + concurrent += 1 + peak = max(peak, concurrent) + started.release() + release.wait(timeout=5) + with lock: + concurrent -= 1 + return _batch([1]) + + stream = _ShardedBatchStream([GatedReader() for _ in range(n_shards)]) + + consumer = threading.Thread(target=lambda: list(stream)) + consumer.start() + # All shards start their first read and park. An (n_shards + 1)th concurrent start would mean a + # shard was double-polled; assert it does not happen while the gate is closed. + for _ in range(n_shards): + assert started.acquire(timeout=5) + assert not started.acquire(timeout=0.2), "a shard was polled twice before its batch was consumed" + with lock: + assert peak <= n_shards + + release.set() + consumer.join(timeout=10) + assert not consumer.is_alive() + assert peak <= n_shards + + +def test_sharded_batch_stream_propagates_worker_exceptions() -> None: + class BoomReader: + def __init__(self) -> None: + self.schema = _STREAM_SCHEMA + + def read_next_batch(self) -> pa.RecordBatch: + raise RuntimeError("native decode failed") + + stream = _ShardedBatchStream([FakeShardReader([_batch([1])]), BoomReader()]) + reader = pa.RecordBatchReader.from_batches(_STREAM_SCHEMA, stream) + + with pytest.raises(RuntimeError, match="native decode failed"): + reader.read_all() + + # The pool must be torn down (no leaked worker threads) once the error surfaced. + assert stream._closed + assert stream._pool._shutdown + + +def test_sharded_batch_stream_shuts_down_on_early_close() -> None: + readers = [FakeShardReader([_batch([i]) for i in range(10)]) for _ in range(3)] + stream = _ShardedBatchStream(readers) + + first = next(stream) + assert first.num_rows == 1 + + stream.close() + assert stream._closed + assert stream._pool._shutdown + # close() is idempotent and a closed stream is exhausted. + stream.close() + with pytest.raises(StopIteration): + next(stream) + + +def test_sharded_batch_stream_shuts_down_on_garbage_collection() -> None: + import gc + import weakref + + readers = [FakeShardReader([_batch([i]) for i in range(50)]) for _ in range(3)] + stream = _ShardedBatchStream(readers) + pool = stream._pool + + next(stream) # leave the pool and workers live, then abandon the stream without close() + ref = weakref.ref(stream) + del stream + gc.collect() + + assert ref() is None # no lingering references kept the stream (and its threads) alive + assert pool._shutdown # the finalizer tore the pool down + + +def test_arrow_batch_reader_single_shard_returns_native_reader_directly(monkeypatch: pytest.MonkeyPatch) -> None: + native = FakeShardReader([_batch([7])]) + + class FakeArrowReader: + def __init__(self, _file_io: Any, **_kwargs: Any) -> None: + pass + + def read(self, _projection: Any, _tasks: list[Any]) -> FakeShardReader: + return native + + monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "1") + + # The fast path must hand back the native reader untouched, not wrap it in a fan-in. + reader = _build_reader(monkeypatch, n_tasks=4) + assert reader is native + + +def _build_reader(monkeypatch: pytest.MonkeyPatch, n_tasks: int) -> Any: + """Drive ``arrow_batch_reader_from_pyiceberg_core`` with ``n_tasks`` trivial data-file tasks.""" + + def _identity_task_conversion(task: Any, *_args: Any, **_kwargs: Any) -> Any: + return task + + # The conversion + projection helpers are covered by their own tests; stub them so this test + # focuses on the streaming fan-in rather than re-exercising payload conversion. + monkeypatch.setattr("pyiceberg.io.pyiceberg_core.file_scan_task_to_pyiceberg_core", _identity_task_conversion) + monkeypatch.setattr("pyiceberg.io.pyiceberg_core.schema_to_pyiceberg_core", lambda schema: schema) + + schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + tasks = [] + for _ in range(n_tasks): + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://warehouse/table/data.parquet", + file_format="PARQUET", + partition=Record(), + record_count=1, + file_size_in_bytes=1, + column_sizes={}, + value_counts={}, + null_value_counts={}, + nan_value_counts={}, + lower_bounds={}, + upper_bounds={}, + ) + data_file.spec_id = 0 + tasks.append(FileScanTask(data_file)) + + return arrow_batch_reader_from_pyiceberg_core( + FakeFileIO(properties={}), + tasks, + schema, + schema, + {0: PartitionSpec(spec_id=0)}, + None, + True, + ) From 7e2b3f03787678dc76d6639fbd068b442fac9d05 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 04:45:22 -0500 Subject: [PATCH 04/10] feat(io): apply scan limit on the pyiceberg-core path A limited scan previously always fell back to PyArrow. Push the limit through the native reader instead: truncate the streamed result at the limit (slicing the crossing batch and closing the shards so they stop decoding early) and cap the batch size to the limit so a small limit does not decode a full batch per shard. Co-Authored-By: Claude Opus 4.8 --- pyiceberg/io/pyiceberg_core.py | 42 +++++++++++++++++++++++---- pyiceberg/table/__init__.py | 3 +- tests/io/test_pyiceberg_core.py | 51 ++++++++++++++++++++++++++++++++- 3 files changed, 89 insertions(+), 7 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index 4ef33dcfca..b887d4a378 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -62,7 +62,7 @@ from pyiceberg.typedef import Record if TYPE_CHECKING: - from collections.abc import Iterable + from collections.abc import Iterable, Iterator def _core_module(name: str) -> Any: @@ -433,6 +433,29 @@ def _shutdown(pool: ThreadPoolExecutor, readers: list[Any]) -> None: readers.clear() +def _limited_batches(source: Any, limit: int) -> Iterator[Any]: + """Yield batches from ``source`` until ``limit`` rows have been emitted, then stop. + + The batch that crosses the limit is sliced, and the underlying source is closed so a sharded + scan stops decoding early instead of draining every file. An Iceberg scan limit has no ordering + guarantee, so returning the first ``limit`` rows the readers produce is correct. + """ + remaining = limit + try: + for batch in source: + if remaining <= 0: + break + if batch.num_rows > remaining: + yield batch.slice(0, remaining) + break + remaining -= batch.num_rows + yield batch + finally: + close = getattr(source, "close", None) + if close is not None: + close() + + def arrow_batch_reader_from_pyiceberg_core( file_io: FileIO, tasks: Iterable[FileScanTask], @@ -441,6 +464,7 @@ def arrow_batch_reader_from_pyiceberg_core( partition_specs: dict[int, PartitionSpec], name_mapping: NameMapping | None, case_sensitive: bool = True, + limit: int | None = None, ) -> Any: """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader as a streaming reader. @@ -469,20 +493,28 @@ def arrow_batch_reader_from_pyiceberg_core( core_projection = schema_to_pyiceberg_core(projected_schema) reader_kwargs = _reader_kwargs() + if limit is not None: + # No point decoding a full default-sized batch per shard just to truncate to a small limit. + reader_kwargs["batch_size"] = max(1, min(reader_kwargs["batch_size"], limit)) def _read(shard_tasks: list[Any]) -> Any: reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io), **reader_kwargs) return reader.read(core_projection, shard_tasks) + import pyarrow as pa + shards = _shard_count(len(core_tasks)) if shards <= 1 or len(core_tasks) <= 1: - return _read(core_tasks) - - import pyarrow as pa + reader = _read(core_tasks) + if limit is None: + return reader + return pa.RecordBatchReader.from_batches(reader.schema, _limited_batches(reader, limit)) groups = [g for g in (core_tasks[i::shards] for i in range(shards)) if g] readers = [_read(group) for group in groups] # Every native reader carries the same projected Arrow schema; use it to type the stream. arrow_schema = readers[0].schema - stream = _ShardedBatchStream(readers) + stream: Any = _ShardedBatchStream(readers) + if limit is not None: + stream = _limited_batches(stream, limit) return pa.RecordBatchReader.from_batches(arrow_schema, stream) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ad45eac2c7..29b637ea21 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2244,7 +2244,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow projected_schema = self.projection() - if self.limit is None and os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: + if os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: from pyiceberg.io.pyiceberg_core import ( arrow_batch_reader_from_pyiceberg_core, can_read_projected_schema_with_pyiceberg_core, @@ -2262,6 +2262,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: self.table_metadata.specs(), self.table_metadata.name_mapping(), self.case_sensitive, + limit=self.limit, ) except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: warnings.warn( diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index c1797441db..3f3cabb3ed 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -28,6 +28,7 @@ from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.io.pyiceberg_core import ( + _limited_batches, _ShardedBatchStream, arrow_batch_reader_from_pyiceberg_core, can_read_projected_schema_with_pyiceberg_core, @@ -531,7 +532,54 @@ def read(self, _projection: Any, _tasks: list[Any]) -> FakeShardReader: assert reader is native -def _build_reader(monkeypatch: pytest.MonkeyPatch, n_tasks: int) -> Any: +def test_limited_batches_truncates_to_limit_and_closes_source() -> None: + class ClosableSource: + def __init__(self, batches: list[pa.RecordBatch]) -> None: + self._it = iter(batches) + self.closed = False + + def __iter__(self) -> ClosableSource: + return self + + def __next__(self) -> pa.RecordBatch: + return next(self._it) + + def close(self) -> None: + self.closed = True + + source = ClosableSource([_batch([1, 2, 3]), _batch([4, 5, 6]), _batch([7, 8, 9])]) + out = list(_limited_batches(source, 4)) + + rows = [v for batch in out for v in batch["id"].to_pylist()] + assert rows == [1, 2, 3, 4] # the batch crossing the limit is sliced + assert source.closed # the underlying source is closed so a sharded scan stops decoding early + + +def test_arrow_batch_reader_applies_limit_across_shards(monkeypatch: pytest.MonkeyPatch) -> None: + captured_batch_size: list[Any] = [] + shard_readers = [ + FakeShardReader([_batch([1, 2]), _batch([3, 4])]), + FakeShardReader([_batch([5, 6]), _batch([7, 8])]), + ] + + class FakeArrowReader: + def __init__(self, _file_io: Any, **kwargs: Any) -> None: + captured_batch_size.append(kwargs.get("batch_size")) + + def read(self, _projection: Any, _tasks: list[Any]) -> FakeShardReader: + return shard_readers.pop(0) + + monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "2") + + reader = _build_reader(monkeypatch, n_tasks=4, limit=3) + table = reader.read_all() + + assert table.num_rows == 3 # the global limit is enforced across shards, not per shard + assert captured_batch_size == [3, 3] # batch size is capped to the limit so small limits don't over-decode + + +def _build_reader(monkeypatch: pytest.MonkeyPatch, n_tasks: int, limit: int | None = None) -> Any: """Drive ``arrow_batch_reader_from_pyiceberg_core`` with ``n_tasks`` trivial data-file tasks.""" def _identity_task_conversion(task: Any, *_args: Any, **_kwargs: Any) -> Any: @@ -571,4 +619,5 @@ def _identity_task_conversion(task: Any, *_args: Any, **_kwargs: Any) -> Any: {0: PartitionSpec(spec_id=0)}, None, True, + limit=limit, ) From cf1fa47d427c896dc86fa22fcd732c9d225e8683 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 04:58:39 -0500 Subject: [PATCH 05/10] fix(io): return the PyArrow target schema from the native scan The native reader emits arrow-rs's own types (string rather than large_string, and run-end-encoded identity-partition columns), so its output diverged from the PyArrow scan path. Cast every batch to schema_to_pyarrow(projected_schema), decoding run-end-encoded columns first since there is no direct cast kernel for them, so the native path is a faithful drop-in. Co-Authored-By: Claude Opus 4.8 --- pyiceberg/io/pyiceberg_core.py | 53 +++++++++++++++++++++++++-------- tests/io/test_pyiceberg_core.py | 49 ++++++++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index b887d4a378..777057de7a 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -433,6 +433,34 @@ def _shutdown(pool: ThreadPoolExecutor, readers: list[Any]) -> None: readers.clear() +def _cast_batches(source: Any, target_schema: Any) -> Iterator[Any]: + """Cast every batch to the PyArrow path's output schema so the native path is a faithful drop-in. + + The native reader returns arrow-rs's own types (e.g. ``string`` rather than PyIceberg's + ``large_string``) and run-end-encodes constant identity-partition columns. There is no direct + cast kernel from run-end-encoded to the target type, so those columns are decoded first, then a + single ``RecordBatch.cast`` aligns the whole batch with ``schema_to_pyarrow(projected_schema)``. + """ + import pyarrow as pa + import pyarrow.compute as pc + + try: + for batch in source: + columns = None + for i, column in enumerate(batch.columns): + if pa.types.is_run_end_encoded(column.type): + if columns is None: + columns = list(batch.columns) + columns[i] = pc.run_end_decode(column) + if columns is not None: + batch = pa.RecordBatch.from_arrays(columns, names=batch.schema.names) + yield batch.cast(target_schema) + finally: + close = getattr(source, "close", None) + if close is not None: + close() + + def _limited_batches(source: Any, limit: int) -> Iterator[Any]: """Yield batches from ``source`` until ``limit`` rows have been emitted, then stop. @@ -503,18 +531,19 @@ def _read(shard_tasks: list[Any]) -> Any: import pyarrow as pa + from pyiceberg.io.pyarrow import schema_to_pyarrow + + # The PyArrow scan path returns this exact schema; casting the native batches to it makes the + # native path a faithful drop-in (matching large_string, decoded partition columns, etc.). + target_schema = schema_to_pyarrow(projected_schema) + shards = _shard_count(len(core_tasks)) if shards <= 1 or len(core_tasks) <= 1: - reader = _read(core_tasks) - if limit is None: - return reader - return pa.RecordBatchReader.from_batches(reader.schema, _limited_batches(reader, limit)) - - groups = [g for g in (core_tasks[i::shards] for i in range(shards)) if g] - readers = [_read(group) for group in groups] - # Every native reader carries the same projected Arrow schema; use it to type the stream. - arrow_schema = readers[0].schema - stream: Any = _ShardedBatchStream(readers) + source: Any = _read(core_tasks) + else: + groups = [g for g in (core_tasks[i::shards] for i in range(shards)) if g] + source = _ShardedBatchStream([_read(group) for group in groups]) + if limit is not None: - stream = _limited_batches(stream, limit) - return pa.RecordBatchReader.from_batches(arrow_schema, stream) + source = _limited_batches(source, limit) + return pa.RecordBatchReader.from_batches(target_schema, _cast_batches(source, target_schema)) diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 3f3cabb3ed..41ec5af365 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -28,6 +28,7 @@ from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.io.pyiceberg_core import ( + _cast_batches, _limited_batches, _ShardedBatchStream, arrow_batch_reader_from_pyiceberg_core, @@ -354,6 +355,13 @@ def read_next_batch(self) -> pa.RecordBatch: self._pos += 1 return batch + # A real pyarrow.RecordBatchReader (what the single-shard fast path drains) is also iterable. + def __iter__(self) -> FakeShardReader: + return self + + def __next__(self) -> pa.RecordBatch: + return self.read_next_batch() + def _drain(reader: pa.RecordBatchReader) -> tuple[int, int]: """Return (row_count, sum-of-id checksum) — the same parity signal used in the perf gate.""" @@ -514,7 +522,9 @@ def test_sharded_batch_stream_shuts_down_on_garbage_collection() -> None: assert pool._shutdown # the finalizer tore the pool down -def test_arrow_batch_reader_single_shard_returns_native_reader_directly(monkeypatch: pytest.MonkeyPatch) -> None: +def test_arrow_batch_reader_single_shard_casts_to_target_schema(monkeypatch: pytest.MonkeyPatch) -> None: + # The native reader hands back int64; the projected schema is IntegerType, so even the + # single-shard fast path must cast its output to the PyArrow target schema (here int32). native = FakeShardReader([_batch([7])]) class FakeArrowReader: @@ -527,9 +537,42 @@ def read(self, _projection: Any, _tasks: list[Any]) -> FakeShardReader: monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "1") - # The fast path must hand back the native reader untouched, not wrap it in a fan-in. reader = _build_reader(monkeypatch, n_tasks=4) - assert reader is native + table = reader.read_all() + + assert table.column("id").to_pylist() == [7] + assert table.schema.field("id").type == pa.int32() # cast to the projected schema's type, not the native int64 + + +def test_cast_batches_decodes_ree_and_matches_target_schema() -> None: + import pyarrow.compute as pc + + ree = pc.run_end_encode(pa.array(["a", "a", "b"], pa.string())) + batch = pa.record_batch({"id": pa.array([1, 2, 3], pa.int64()), "cat": ree}) + target = pa.schema([("id", pa.int32()), ("cat", pa.large_string())]) + + class Source: + def __init__(self, batches: list[pa.RecordBatch]) -> None: + self._it = iter(batches) + self.closed = False + + def __iter__(self) -> Source: + return self + + def __next__(self) -> pa.RecordBatch: + return next(self._it) + + def close(self) -> None: + self.closed = True + + source = Source([batch]) + out = list(_cast_batches(source, target)) + + assert len(out) == 1 + assert out[0].schema.field("id").type == pa.int32() # native int64 widened/narrowed to the target + assert out[0].schema.field("cat").type == pa.large_string() # run-end-encoded column decoded then cast + assert out[0].column("cat").to_pylist() == ["a", "a", "b"] + assert source.closed def test_limited_batches_truncates_to_limit_and_closes_source() -> None: From 50c154b4b81fa16e79c5cd0dfc407091fb4cd7f0 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 05:28:05 -0500 Subject: [PATCH 06/10] feat(io): add opt-in rust scan planning PYICEBERG_RUST_SCAN_PLANNING plans the scan in pyiceberg-core (Table.plan_files) instead of PyIceberg's Python manifest planning, then streams the planned tasks through the same sharded, casted reader as the read path. Falls back to PyArrow on any scan pyiceberg-core cannot handle. Co-Authored-By: Claude Opus 4.8 --- pyiceberg/io/pyiceberg_core.py | 100 +++++++++++++++----- pyiceberg/table/__init__.py | 22 +++++ tests/io/test_pyiceberg_core.py | 158 ++++++++++++++++++++++++++++++++ 3 files changed, 259 insertions(+), 21 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index 777057de7a..5792b10220 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -484,17 +484,13 @@ def _limited_batches(source: Any, limit: int) -> Iterator[Any]: close() -def arrow_batch_reader_from_pyiceberg_core( +def _read_core_tasks( file_io: FileIO, - tasks: Iterable[FileScanTask], - schema: Schema, + core_tasks: list[Any], projected_schema: Schema, - partition_specs: dict[int, PartitionSpec], - name_mapping: NameMapping | None, - case_sensitive: bool = True, - limit: int | None = None, + limit: int | None, ) -> Any: - """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader as a streaming reader. + """Stream pyiceberg-core file scan tasks through ArrowReader as a casted, limited reader. Multi-file scans are sharded across a thread pool (see ``_shard_count``) so decode uses multiple cores; a single native reader over all files would decode on one core. Each shard @@ -506,19 +502,6 @@ def arrow_batch_reader_from_pyiceberg_core( Worker-thread exceptions propagate to the consumer, and the shard threads are shut down when the reader is exhausted, closed early, or garbage collected. """ - core_tasks = [ - file_scan_task_to_pyiceberg_core( - task, - schema, - projected_schema, - partition_spec=partition_specs.get(task.file.spec_id), - name_mapping=name_mapping, - case_sensitive=case_sensitive, - project_field_ids=list(projected_schema.field_ids), - ) - for task in tasks - ] - core_projection = schema_to_pyiceberg_core(projected_schema) reader_kwargs = _reader_kwargs() if limit is not None: @@ -547,3 +530,78 @@ def _read(shard_tasks: list[Any]) -> Any: if limit is not None: source = _limited_batches(source, limit) return pa.RecordBatchReader.from_batches(target_schema, _cast_batches(source, target_schema)) + + +def arrow_batch_reader_from_pyiceberg_core( + file_io: FileIO, + tasks: Iterable[FileScanTask], + schema: Schema, + projected_schema: Schema, + partition_specs: dict[int, PartitionSpec], + name_mapping: NameMapping | None, + case_sensitive: bool = True, + limit: int | None = None, +) -> Any: + """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader as a streaming reader. + + PyIceberg plans the files (Python-side manifest planning); each task is converted to a + pyiceberg-core file scan task and streamed through ``_read_core_tasks``. + """ + core_tasks = [ + file_scan_task_to_pyiceberg_core( + task, + schema, + projected_schema, + partition_spec=partition_specs.get(task.file.spec_id), + name_mapping=name_mapping, + case_sensitive=case_sensitive, + project_field_ids=list(projected_schema.field_ids), + ) + for task in tasks + ] + return _read_core_tasks(file_io, core_tasks, projected_schema, limit) + + +def plan_and_read_with_pyiceberg_core( + table_metadata: Any, + io: FileIO, + projected_schema: Schema, + row_filter: BooleanExpression, + table_identifier: tuple[str, ...] | None, + *, + case_sensitive: bool = True, + snapshot_id: int | None = None, + limit: int | None = None, +) -> Any: + """Plan and read a scan entirely in pyiceberg-core, skipping PyIceberg's manifest planning. + + pyiceberg-core's ``Table.plan_files`` produces the file scan tasks (data files, deletes, + partition data, residual predicate) directly from the table metadata, and those tasks are fed + straight into ``_read_core_tasks``. The native planner applies the residual during the read, so + a filter on a column that is not part of ``projected_schema`` is still honoured. + + ``selected_fields`` must be the top-level projected field *names* (the native planner sets each + task's ``project_field_ids`` from them); only flat projections are expressible this way. + """ + scan = _core_module("scan") + file_io = file_io_to_pyiceberg_core(io) + + # The identifier only names the table rust-side (data-file paths are absolute, so it does not + # affect planning); rust rejects a <2-part identifier, so fall back to a safe 2-part name. + identifier = list(table_identifier) if table_identifier and len(table_identifier) >= 2 else ["_", "_"] + + native_table = scan.Table.from_metadata_json(file_io, identifier, table_metadata.model_dump_json()) + + predicate = None + if not isinstance(row_filter, AlwaysTrue): + predicate = expression_to_pyiceberg_core(row_filter, table_metadata.schema(), case_sensitive) + + tasks = list( + native_table.plan_files( + selected_fields=[field.name for field in projected_schema.fields], + predicate=predicate, + snapshot_id=snapshot_id, + case_sensitive=case_sensitive, + ) + ) + return _read_core_tasks(io, tasks, projected_schema, limit) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 29b637ea21..8d6aece6e7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -111,6 +111,7 @@ ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" PYICEBERG_RUST_ARROW_SCAN = "PYICEBERG_RUST_ARROW_SCAN" +PYICEBERG_RUST_SCAN_PLANNING = "PYICEBERG_RUST_SCAN_PLANNING" @dataclass() @@ -2244,6 +2245,27 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow projected_schema = self.projection() + if os.environ.get(PYICEBERG_RUST_SCAN_PLANNING, "").lower() in {"1", "true", "yes"}: + from pyiceberg.io.pyiceberg_core import plan_and_read_with_pyiceberg_core + + try: + return plan_and_read_with_pyiceberg_core( + self.table_metadata, + self.io, + projected_schema, + self.row_filter, + self.table_identifier, + case_sensitive=self.case_sensitive, + snapshot_id=self.snapshot_id, + limit=self.limit, + ) + except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: + warnings.warn( + f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + RuntimeWarning, + stacklevel=2, + ) + if os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: from pyiceberg.io.pyiceberg_core import ( arrow_batch_reader_from_pyiceberg_core, diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 41ec5af365..3149105886 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -37,6 +37,7 @@ expression_to_pyiceberg_core, file_io_to_pyiceberg_core, file_scan_task_to_pyiceberg_core, + plan_and_read_with_pyiceberg_core, schema_to_pyiceberg_core, ) from pyiceberg.manifest import DataFile, DataFileContent @@ -86,6 +87,29 @@ def negate(self) -> CorePredicate: return CorePredicate(op="not", child=self) +class CoreScanTable(CoreObject): + """Fake ``pyiceberg_core.scan.Table`` that records planning args and emits planned tasks.""" + + last_from_metadata_json: dict[str, Any] = {} + last_plan_files: dict[str, Any] = {} + + @classmethod + def from_metadata_json(cls, file_io: Any, identifier: Any, metadata_json: str, **kwargs: Any) -> CoreScanTable: + CoreScanTable.last_from_metadata_json = { + "file_io": file_io, + "identifier": identifier, + "metadata_json": metadata_json, + **kwargs, + } + return cls() + + def plan_files(self, **kwargs: Any) -> list[CoreObject]: + CoreScanTable.last_plan_files = kwargs + # One planned task per selected field keeps the count deterministic and lets the read fake + # echo back exactly what planning produced. + return [CoreObject(planned=name) for name in kwargs["selected_fields"]] + + class CoreReference(CoreObject): def _predicate(self, op: str, *args: Any) -> CorePredicate: return CorePredicate(op=op, name=self.args[0], args=args) @@ -128,6 +152,7 @@ def fake_pyiceberg_core(monkeypatch: pytest.MonkeyPatch) -> None: scan: Any = ModuleType("pyiceberg_core.scan") scan.DeleteFile = CoreObject scan.FileScanTask = CoreObject + scan.Table = CoreScanTable # planning tests drive from_metadata_json + plan_files through this scan.ArrowReader = CoreObject # streaming tests override this with a batch-producing fake monkeypatch.setitem(sys.modules, "pyiceberg_core", root) @@ -664,3 +689,136 @@ def _identity_task_conversion(task: Any, *_args: Any, **_kwargs: Any) -> Any: True, limit=limit, ) + + +# --- native scan planning ----------------------------------------------------- + + +class FakeTableMetadata: + """Minimal stand-in: native planning only needs the metadata JSON and the table schema.""" + + def __init__(self, schema: Schema) -> None: + self._schema = schema + + def model_dump_json(self) -> str: + return '{"format-version": 2}' + + def schema(self) -> Schema: + return self._schema + + +def _planning_arrow_reader(monkeypatch: pytest.MonkeyPatch, captured: dict[str, Any]) -> None: + """Wire a fake ArrowReader that records the projection + planned tasks and emits one batch.""" + + class FakeArrowReader: + def __init__(self, _file_io: Any, **kwargs: Any) -> None: + captured["reader_kwargs"] = kwargs + + def read(self, projection: Any, tasks: list[Any]) -> FakeShardReader: + captured["projection"] = projection + captured["tasks"] = tasks + return FakeShardReader([_batch([1, 2, 3])]) + + monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "1") + + +def test_plan_and_read_passes_projection_and_filter_to_native_planner(monkeypatch: pytest.MonkeyPatch) -> None: + captured: dict[str, Any] = {} + _planning_arrow_reader(monkeypatch, captured) + + schema = Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "data", StringType()), + schema_id=0, + ) + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + reader = plan_and_read_with_pyiceberg_core( + FakeTableMetadata(schema), + FakeFileIO(properties={"s3.region": "us-east-1"}), + projected_schema, + EqualTo("id", 5), + ("ns", "t"), + case_sensitive=False, + snapshot_id=42, + ) + table = reader.read_all() + + # The metadata JSON and a >=2-part identifier are handed to from_metadata_json verbatim. + assert CoreScanTable.last_from_metadata_json["identifier"] == ["ns", "t"] + assert CoreScanTable.last_from_metadata_json["metadata_json"] == '{"format-version": 2}' + # plan_files receives the projection as field NAMES, the converted predicate, snapshot, sensitivity. + plan = CoreScanTable.last_plan_files + assert plan["selected_fields"] == ["id"] + assert plan["snapshot_id"] == 42 + assert plan["case_sensitive"] is False + assert plan["predicate"].kwargs == {"op": "eq", "name": "id", "args": (5,)} + # The planned tasks (not python-built tasks) are what the reader consumes. + assert [task.kwargs["planned"] for task in captured["tasks"]] == ["id"] + # Output is cast to the projected schema's PyArrow type (IntegerType -> int32, not native int64). + assert table.schema.field("id").type == pa.int32() + assert table.column("id").to_pylist() == [1, 2, 3] + + +def test_plan_and_read_skips_predicate_for_always_true_filter(monkeypatch: pytest.MonkeyPatch) -> None: + from pyiceberg.expressions import AlwaysTrue + + captured: dict[str, Any] = {} + _planning_arrow_reader(monkeypatch, captured) + + schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + plan_and_read_with_pyiceberg_core( + FakeTableMetadata(schema), + FakeFileIO(properties={}), + schema, + AlwaysTrue(), + ("ns", "t"), + ).read_all() + + # An unfiltered scan must not pass a predicate at all (rather than always_true()). + assert CoreScanTable.last_plan_files["predicate"] is None + + +def test_plan_and_read_falls_back_to_safe_identifier_for_short_identifier(monkeypatch: pytest.MonkeyPatch) -> None: + from pyiceberg.expressions import AlwaysTrue + + captured: dict[str, Any] = {} + _planning_arrow_reader(monkeypatch, captured) + + schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + # A single-part (or None) identifier cannot name a table rust-side; a 2-part fallback is used. + plan_and_read_with_pyiceberg_core( + FakeTableMetadata(schema), + FakeFileIO(properties={}), + schema, + AlwaysTrue(), + ("only_one",), + ).read_all() + + identifier = CoreScanTable.last_from_metadata_json["identifier"] + assert len(identifier) >= 2 + + +def test_plan_and_read_applies_limit(monkeypatch: pytest.MonkeyPatch) -> None: + from pyiceberg.expressions import AlwaysTrue + + captured: dict[str, Any] = {} + _planning_arrow_reader(monkeypatch, captured) + + schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + reader = plan_and_read_with_pyiceberg_core( + FakeTableMetadata(schema), + FakeFileIO(properties={}), + schema, + AlwaysTrue(), + ("ns", "t"), + limit=2, + ) + table = reader.read_all() + + assert table.num_rows == 2 # the batch of 3 is truncated to the limit + assert captured["reader_kwargs"]["batch_size"] == 2 # batch size capped to the limit From 015178ce54660984b7d24fee84db6cadf1951df1 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 07:28:39 -0500 Subject: [PATCH 07/10] fix(io): make opt-in native arrow scan usable on real S3 catalogs The native path crashed on a normal REST+S3 catalog instead of degrading: non-str FileIO props (the REST auth manager) reached the Rust binding, S3 path-style/region were never translated to the opendal keys, and the fallback guard missed decode-time errors and schemeless local paths. Filter props to strings, mirror PyArrow's path-style default and pass region, prime the first batch so a decode mismatch falls back too, and skip native for bare paths. Scoped to static-credential FileIO -- a refreshing auth manager is still not carried to the native path. Co-Authored-By: Claude Opus 4.8 (1M context) --- pyiceberg/io/pyiceberg_core.py | 49 ++++++++++++++- pyiceberg/table/__init__.py | 99 ++++++++++++++++++++++------- tests/io/test_pyiceberg_core.py | 106 +++++++++++++++++++++++++++++++- 3 files changed, 227 insertions(+), 27 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index 5792b10220..7aa2c9aef8 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -53,13 +53,19 @@ UnaryPredicate, UnboundTerm, ) -from pyiceberg.io import FileIO +from pyiceberg.io import ( + AWS_REGION, + S3_FORCE_VIRTUAL_ADDRESSING, + S3_REGION, + FileIO, +) from pyiceberg.manifest import DataFile from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import FileScanTask from pyiceberg.table.name_mapping import NameMapping from pyiceberg.typedef import Record +from pyiceberg.utils.properties import property_as_bool if TYPE_CHECKING: from collections.abc import Iterable, Iterator @@ -89,9 +95,46 @@ def schema_to_pyiceberg_core(schema: Schema) -> Any: return _core_module("schema").Schema.from_json(_model_json(schema)) +def _string_props(properties: dict[str, Any]) -> dict[str, str]: + """Keep only the string-valued FileIO properties the Rust binding accepts. + + The Rust ``FileIO.from_props`` requires every value to be a ``str``. The REST catalog injects a + live ``LegacyOAuth2AuthManager`` object under ``auth.manager`` (for token refresh), which would + raise ``TypeError`` and crash every native scan. Dropping non-str values is a scoped fix for + static-credential FileIO (endpoint plus access key/secret/token, all strings) -- it deliberately + does NOT hand off a non-str auth manager, so signed/refreshing auth is not carried to the native + path. That is a known limitation, not a complete auth handoff. + """ + return {key: value for key, value in properties.items() if isinstance(value, str)} + + def file_io_to_pyiceberg_core(file_io: FileIO) -> Any: - """Convert a PyIceberg FileIO to a pyiceberg-core FileIO-like object.""" - return _core_module("file_io").FileIO.from_props(dict(file_io.properties)) + """Convert a PyIceberg FileIO to a pyiceberg-core FileIO-like object. + + Only string-valued properties are forwarded (see ``_string_props``), and two PyIceberg S3 + properties are translated into the opendal-backed binding's own keys so MinIO/path-style and + region-required deployments work on the native path the same way they do on PyArrow. + """ + props = _string_props(dict(file_io.properties)) + + # PyIceberg documents ``s3.force-virtual-addressing`` (default False on the s3 scan path), but + # the opendal S3 client reads ``s3.path-style-access``. Mirror the PyArrow default: path-style + # access is on unless virtual addressing was explicitly requested. Only set the binding key when + # it is not already provided, so an explicit native-side override still wins. + if "s3.path-style-access" not in props: + force_virtual_addressing = property_as_bool(props, S3_FORCE_VIRTUAL_ADDRESSING, False) + if not force_virtual_addressing: + props["s3.path-style-access"] = "true" + + # The opendal S3 builder requires an explicit region (PyArrow resolves it implicitly). Fall back + # to PyIceberg's ``client.region`` property and finally the AWS_REGION OS env so the native path + # does not fail with "region is missing". + if S3_REGION not in props: + region = props.get(AWS_REGION) or os.environ.get("AWS_REGION") + if region: + props[S3_REGION] = region + + return _core_module("file_io").FileIO.from_props(props) def _literal_value(value: Any) -> Any: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8d6aece6e7..5af9beb818 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -114,6 +114,35 @@ PYICEBERG_RUST_SCAN_PLANNING = "PYICEBERG_RUST_SCAN_PLANNING" +def _native_scan_supports_paths(tasks: Iterable[FileScanTask]) -> bool: + """Return whether every planned data file has a URL scheme the native opendal reader can open. + + The opendal-backed reader crashes on a bare local path (no ``scheme://``); the PyArrow path + reads those fine. Checking the planned task paths up front lets an unschemed path fall back to + PyArrow cleanly instead of crashing inside the native reader. + """ + return all("://" in task.file.file_path for task in tasks) + + +def _prime_native_reader(reader: pa.RecordBatchReader) -> pa.RecordBatchReader: + """Pull the first batch so a native decode failure surfaces before any batch is handed out. + + The native reader builds eagerly but decodes lazily, so a binding mismatch can raise either at + construction or while the FIRST batch is decoded (e.g. the run-end-decode/cast in + pyiceberg_core). Forcing the first batch here lets the caller's guard fall back to PyArrow. The + primed batch is re-chained ahead of the rest, so iteration is unchanged. Once a batch has been + yielded a later error MUST propagate -- restarting on PyArrow would re-emit already returned rows + (silent duplication), so this only ever fronts the very first batch. + """ + import pyarrow as pa + + try: + first = reader.read_next_batch() + except StopIteration: + return reader + return pa.RecordBatchReader.from_batches(reader.schema, itertools.chain((first,), reader)) + + @dataclass() class UpsertResult: """Summary the upsert operation.""" @@ -2245,21 +2274,34 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow projected_schema = self.projection() + + # An unsupported native scan should DEGRADE to PyArrow, not crash. The native reader builds + # eagerly but decodes lazily, so a binding mismatch can surface either at construction or + # while the FIRST batch is decoded (e.g. the run-end-decode/cast in pyiceberg_core). We prime + # that first batch under the guard and re-chain it; once any batch has been handed to the + # caller we MUST let later errors propagate -- restarting on PyArrow would re-emit already + # yielded rows (silent duplication). The catch is scoped to the native attempt only: the + # binding raises ValueError/NotImplementedError for unsupported scans and TypeError/ + # ArrowInvalid for prop/decode mismatches; broader user errors still surface. + native_fallback_errors = (ModuleNotFoundError, NotImplementedError, ValueError, TypeError, pa.lib.ArrowInvalid) + if os.environ.get(PYICEBERG_RUST_SCAN_PLANNING, "").lower() in {"1", "true", "yes"}: from pyiceberg.io.pyiceberg_core import plan_and_read_with_pyiceberg_core try: - return plan_and_read_with_pyiceberg_core( - self.table_metadata, - self.io, - projected_schema, - self.row_filter, - self.table_identifier, - case_sensitive=self.case_sensitive, - snapshot_id=self.snapshot_id, - limit=self.limit, + return _prime_native_reader( + plan_and_read_with_pyiceberg_core( + self.table_metadata, + self.io, + projected_schema, + self.row_filter, + self.table_identifier, + case_sensitive=self.case_sensitive, + snapshot_id=self.snapshot_id, + limit=self.limit, + ) ) - except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: + except native_fallback_errors as exc: warnings.warn( f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", RuntimeWarning, @@ -2275,23 +2317,34 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: if can_read_projected_schema_with_pyiceberg_core( self.table_metadata.schema(), projected_schema, self.row_filter, self.case_sensitive ): - try: - return arrow_batch_reader_from_pyiceberg_core( - self.io, - self.plan_files(), - self.table_metadata.schema(), - projected_schema, - self.table_metadata.specs(), - self.table_metadata.name_mapping(), - self.case_sensitive, - limit=self.limit, - ) - except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: + # Plan once and reuse: the bare-path guard inspects the same tasks the reader gets. + native_tasks = list(self.plan_files()) + if not _native_scan_supports_paths(native_tasks): warnings.warn( - f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + "Falling back to PyArrow scan because pyiceberg-core cannot read a data file path without a URL scheme.", RuntimeWarning, stacklevel=2, ) + else: + try: + return _prime_native_reader( + arrow_batch_reader_from_pyiceberg_core( + self.io, + native_tasks, + self.table_metadata.schema(), + projected_schema, + self.table_metadata.specs(), + self.table_metadata.name_mapping(), + self.case_sensitive, + limit=self.limit, + ) + ) + except native_fallback_errors as exc: + warnings.warn( + f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + RuntimeWarning, + stacklevel=2, + ) target_schema = schema_to_pyarrow(projected_schema) batches = ArrowScan( diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 3149105886..fc8cf6d2f4 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -188,7 +188,39 @@ def test_schema_to_pyiceberg_core_uses_lazy_core_schema_json(simple_schema: Sche def test_file_io_to_pyiceberg_core_uses_file_io_properties() -> None: converted = file_io_to_pyiceberg_core(FakeFileIO(properties={"s3.region": "us-east-1"})) - assert converted.kwargs == {"properties": {"s3.region": "us-east-1"}} + # Region present, no force-virtual-addressing -> path-style is turned on (PyArrow default). + assert converted.kwargs == {"properties": {"s3.region": "us-east-1", "s3.path-style-access": "true"}} + + +def test_file_io_to_pyiceberg_core_drops_non_str_props() -> None: + # The REST catalog injects a live auth-manager object under "auth.manager"; the Rust binding + # only accepts str values, so non-str props must be dropped rather than crashing the native scan. + file_io = FakeFileIO(properties={"s3.region": "us-east-1", "auth.manager": object()}) + converted = file_io_to_pyiceberg_core(file_io) + + assert "auth.manager" not in converted.kwargs["properties"] + assert converted.kwargs["properties"]["s3.region"] == "us-east-1" + + +def test_file_io_to_pyiceberg_core_translates_path_style_for_minio() -> None: + file_io = FakeFileIO(properties={"s3.region": "us-east-1", "s3.force-virtual-addressing": "false"}) + converted = file_io_to_pyiceberg_core(file_io) + + assert converted.kwargs["properties"]["s3.path-style-access"] == "true" + + +def test_file_io_to_pyiceberg_core_keeps_virtual_addressing() -> None: + file_io = FakeFileIO(properties={"s3.region": "us-east-1", "s3.force-virtual-addressing": "true"}) + converted = file_io_to_pyiceberg_core(file_io) + + assert "s3.path-style-access" not in converted.kwargs["properties"] + + +def test_file_io_to_pyiceberg_core_falls_back_to_aws_region_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("AWS_REGION", "eu-west-1") + converted = file_io_to_pyiceberg_core(FakeFileIO(properties={})) + + assert converted.kwargs["properties"]["s3.region"] == "eu-west-1" def test_expression_to_pyiceberg_core_converts_expression_tree(simple_schema: Schema) -> None: @@ -822,3 +854,75 @@ def test_plan_and_read_applies_limit(monkeypatch: pytest.MonkeyPatch) -> None: assert table.num_rows == 2 # the batch of 3 is truncated to the limit assert captured["reader_kwargs"]["batch_size"] == 2 # batch size capped to the limit + + +def _bare_path_task(file_path: str) -> FileScanTask: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format="PARQUET", + partition=Record(), + record_count=1, + file_size_in_bytes=1, + ) + data_file.spec_id = 0 + return FileScanTask(data_file) + + +def test_native_scan_supports_paths_rejects_bare_paths() -> None: + from pyiceberg.table import _native_scan_supports_paths + + assert _native_scan_supports_paths([_bare_path_task("s3://warehouse/t/data.parquet")]) + # A bare local path (no scheme) crashes opendal, so the dispatch must fall back to PyArrow. + assert not _native_scan_supports_paths([_bare_path_task("/tmp/warehouse/t/data.parquet")]) + assert not _native_scan_supports_paths([_bare_path_task("s3://warehouse/t/a.parquet"), _bare_path_task("/tmp/t/b.parquet")]) + + +def _prime_batch(value: int) -> pa.RecordBatch: + return pa.record_batch({"id": pa.array([value], pa.int32())}) + + +def test_prime_native_reader_preserves_batch_order() -> None: + from pyiceberg.table import _prime_native_reader + + schema = pa.schema([pa.field("id", pa.int32())]) + reader = pa.RecordBatchReader.from_batches(schema, iter([_prime_batch(1), _prime_batch(2), _prime_batch(3)])) + + primed = _prime_native_reader(reader) + + assert primed.read_all().column("id").to_pylist() == [1, 2, 3] + + +def test_prime_native_reader_surfaces_error_before_first_batch() -> None: + from pyiceberg.table import _prime_native_reader + + schema = pa.schema([pa.field("id", pa.int32())]) + + def _explode() -> Any: + raise pa.lib.ArrowInvalid("decode failed on first batch") + yield # type: ignore[unreachable] # pragma: no cover - generator marker + + reader = pa.RecordBatchReader.from_batches(schema, _explode()) + + # The error must surface from _prime_native_reader so the caller's guard can fall back to PyArrow. + with pytest.raises(pa.lib.ArrowInvalid): + _prime_native_reader(reader) + + +def test_prime_native_reader_propagates_mid_stream_error_without_restart() -> None: + from pyiceberg.table import _prime_native_reader + + schema = pa.schema([pa.field("id", pa.int32())]) + + def _batches() -> Any: + yield _prime_batch(1) + raise pa.lib.ArrowInvalid("decode failed mid-stream") + + reader = pa.RecordBatchReader.from_batches(schema, _batches()) + primed = _prime_native_reader(reader) + + # First batch was already handed out; a later failure must propagate (no silent fallback/restart + # that would re-emit the first batch). + assert primed.read_next_batch().column("id").to_pylist() == [1] + with pytest.raises(pa.lib.ArrowInvalid): + primed.read_next_batch() From 0428a6da564eeed48a29fb5f72ea33c052b9ca8e Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 07:32:17 -0500 Subject: [PATCH 08/10] feat(io): make scan planning pluggable via a ScanPlanner strategy plan_files() now resolves a ScanPlanner -- local manifest planning or REST server-side, exactly as before -- or uses one injected on the scan. That injection point is the seam a Rust or engine-specific planner plugs into without touching any call site. The bundled RustScanPlanner is an opt-in, deliberately-unimplemented stub: the native plan output exposes only booleans/counts, not the residual, partition data, or deletes needed to rebuild a faithful FileScanTask, so the fused native read path stays the supported native route. Env flags and default resolution are unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) --- pyiceberg/table/__init__.py | 17 ++-- pyiceberg/table/scan_planning.py | 84 ++++++++++++++++++++ tests/table/test_scan_planning.py | 126 ++++++++++++++++++++++++++++++ 3 files changed, 221 insertions(+), 6 deletions(-) create mode 100644 pyiceberg/table/scan_planning.py create mode 100644 tests/table/test_scan_planning.py diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5af9beb818..5d90215f1e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -107,6 +107,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.catalog.rest.scan_planning import RESTContentFile, RESTDeleteFile, RESTFileScanTask + from pyiceberg.table.scan_planning import ScanPlanner ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" @@ -1840,6 +1841,7 @@ class TableScan(ABC): limit: int | None catalog: Catalog | None table_identifier: Identifier | None + scan_planner: ScanPlanner | None def __init__( self, @@ -1853,6 +1855,7 @@ def __init__( limit: int | None = None, catalog: Catalog | None = None, table_identifier: Identifier | None = None, + scan_planner: ScanPlanner | None = None, ): self.table_metadata = table_metadata self.io = io @@ -1864,6 +1867,7 @@ def __init__( self.limit = limit self.catalog = catalog self.table_identifier = table_identifier + self.scan_planner = scan_planner def snapshot(self) -> Snapshot | None: if self.snapshot_id: @@ -2233,16 +2237,17 @@ def _plan_files_local(self) -> Iterable[FileScanTask]: def plan_files(self) -> Iterable[FileScanTask]: """Plans the relevant files by filtering on the PartitionSpecs. - If the table comes from a REST catalog with scan planning enabled, - this will use server-side scan planning. Otherwise, it falls back - to local planning. + The planning strategy is pluggable: an injected ``scan_planner`` is used if set, otherwise + a planner is resolved per scan. The default resolution uses REST server-side planning when + the catalog supports it and local manifest planning otherwise. Returns: List of FileScanTasks that contain both data and delete files. """ - if self._should_use_server_side_planning(): - return self._plan_files_server_side() - return self._plan_files_local() + from pyiceberg.table.scan_planning import resolve_scan_planner + + planner = self.scan_planner or resolve_scan_planner(self) + return planner.plan_files(self) def to_arrow(self) -> pa.Table: """Read an Arrow table eagerly from this DataScan. diff --git a/pyiceberg/table/scan_planning.py b/pyiceberg/table/scan_planning.py new file mode 100644 index 0000000000..e5332d8fa3 --- /dev/null +++ b/pyiceberg/table/scan_planning.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Pluggable scan-planning strategies. + +A ``ScanPlanner`` turns a ``DataScan`` into the file scan tasks that back it. The Python (local) +and REST (server-side) strategies reproduce the existing planner selection exactly; the resolver +picks between them just as ``DataScan.plan_files`` used to. A planner can also be injected on a +``DataScan`` to override resolution, which is how the Rust strategy is opted into. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: + from collections.abc import Iterable + + from pyiceberg.table import DataScan, FileScanTask + + +@runtime_checkable +class ScanPlanner(Protocol): + """Plan the file scan tasks for a ``DataScan``.""" + + def plan_files(self, scan: DataScan) -> Iterable[FileScanTask]: ... + + +class LocalScanPlanner: + """Plan files locally by reading manifests (PyIceberg's default planner).""" + + def plan_files(self, scan: DataScan) -> Iterable[FileScanTask]: + return scan._plan_files_local() + + +class RestScanPlanner: + """Plan files using REST server-side scan planning.""" + + def plan_files(self, scan: DataScan) -> Iterable[FileScanTask]: + return scan._plan_files_server_side() + + +class RustScanPlanner: + """Placeholder for a pyiceberg-core planner that returns ``FileScanTask`` objects. + + Opt-in only: ``resolve_scan_planner`` never selects this, so it has to be injected on a + ``DataScan``. It is intentionally unimplemented: pyiceberg-core's native plan output exposes + only booleans/counts for predicate, partition data, and deletes, not the residual expression, + partition ``Record``, or delete ``DataFile`` objects needed to rebuild a faithful PyIceberg + ``FileScanTask`` — so reconstructing one here would silently drop the residual and break + filter-on-dropped-column scans. Use the fused ``PYICEBERG_RUST_SCAN_PLANNING`` read path, which + plans and reads natively and never round-trips through ``FileScanTask``. + """ + + def plan_files(self, scan: DataScan) -> Iterable[FileScanTask]: + raise NotImplementedError( + "Native pyiceberg-core planning to FileScanTask is not supported; " + "the native plan output does not expose the residual, partition data, or deletes " + "needed to rebuild a faithful FileScanTask. Use the fused PYICEBERG_RUST_SCAN_PLANNING " + "read path instead." + ) + + +def resolve_scan_planner(scan: DataScan) -> ScanPlanner: + """Pick the planner for ``scan``, reproducing the historical local/server-side selection. + + ``RustScanPlanner`` is never auto-selected; it is opt-in via ``DataScan.scan_planner``. + """ + if scan._should_use_server_side_planning(): + return RestScanPlanner() + return LocalScanPlanner() diff --git a/tests/table/test_scan_planning.py b/tests/table/test_scan_planning.py new file mode 100644 index 0000000000..dc04f66c50 --- /dev/null +++ b/tests/table/test_scan_planning.py @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Any + +import pytest + +from pyiceberg.table import Table +from pyiceberg.table.scan_planning import ( + LocalScanPlanner, + RestScanPlanner, + RustScanPlanner, + ScanPlanner, + resolve_scan_planner, +) + + +class _FakeCatalog: + def __init__(self, server_side: bool) -> None: + self._server_side = server_side + + def supports_server_side_planning(self) -> bool: + return self._server_side + + +class _FakeScan: + """Minimal stand-in for DataScan that records how the planner consulted it.""" + + def __init__(self, catalog: _FakeCatalog | None) -> None: + self.catalog = catalog + self.local_called = False + self.server_side_called = False + + def _should_use_server_side_planning(self) -> bool: + return self.catalog is not None and self.catalog.supports_server_side_planning() + + def _plan_files_local(self) -> list[Any]: + self.local_called = True + return ["local-task"] + + def _plan_files_server_side(self) -> list[Any]: + self.server_side_called = True + return ["server-task"] + + +def test_local_scan_planner_satisfies_protocol() -> None: + assert isinstance(LocalScanPlanner(), ScanPlanner) + assert isinstance(RestScanPlanner(), ScanPlanner) + + +def test_local_scan_planner_delegates_to_the_scan() -> None: + scan = _FakeScan(catalog=None) + + tasks = list(LocalScanPlanner().plan_files(scan)) # type: ignore[arg-type] + + assert tasks == ["local-task"] + assert scan.local_called and not scan.server_side_called + + +def test_rest_scan_planner_delegates_to_the_scan() -> None: + scan = _FakeScan(catalog=_FakeCatalog(server_side=True)) + + tasks = list(RestScanPlanner().plan_files(scan)) # type: ignore[arg-type] + + assert tasks == ["server-task"] + assert scan.server_side_called and not scan.local_called + + +def test_resolve_scan_planner_returns_rest_when_server_side_supported() -> None: + scan = _FakeScan(catalog=_FakeCatalog(server_side=True)) + + assert isinstance(resolve_scan_planner(scan), RestScanPlanner) # type: ignore[arg-type] + + +def test_resolve_scan_planner_returns_local_otherwise() -> None: + assert isinstance(resolve_scan_planner(_FakeScan(catalog=None)), LocalScanPlanner) # type: ignore[arg-type] + assert isinstance(resolve_scan_planner(_FakeScan(catalog=_FakeCatalog(server_side=False))), LocalScanPlanner) # type: ignore[arg-type] + + +def test_rust_scan_planner_is_an_honest_stub() -> None: + # Native plan output cannot rebuild a faithful FileScanTask (no residual / partition / deletes), + # so the planner refuses rather than silently dropping the residual. The fused read path is the + # supported native route. + with pytest.raises(NotImplementedError, match="faithful FileScanTask"): + list(RustScanPlanner().plan_files(_FakeScan(catalog=None))) # type: ignore[arg-type] + + +def test_data_scan_plan_files_uses_injected_planner(table_v2: Table) -> None: + """An injected planner overrides resolution on a real DataScan and is handed the scan itself.""" + seen: list[Any] = [] + + class _RecordingPlanner: + def plan_files(self, scan: Any) -> list[Any]: + seen.append(scan) + return ["injected-task"] + + scan = table_v2.scan().update(scan_planner=_RecordingPlanner()) + + tasks = list(scan.plan_files()) + + assert tasks == ["injected-task"] + assert seen == [scan] # the planner receives the DataScan itself, not a copied context + + +def test_data_scan_plan_files_resolves_local_by_default(table_v2: Table) -> None: + # No injected planner and no server-side catalog: resolution must pick the local planner. + scan = table_v2.scan() + + assert scan.scan_planner is None + assert isinstance(resolve_scan_planner(scan), LocalScanPlanner) From deaa353769a0ed15002fd3b172328a6b5d7583f2 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 11:43:25 -0500 Subject: [PATCH 09/10] feat(table): route eager native scans through batch reader --- .gitignore | 1 + pyiceberg/table/__init__.py | 11 +++++++++-- tests/table/test_scan_planning.py | 23 +++++++++++++++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index ef8c522482..d53d50a8c7 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,4 @@ htmlcov pyiceberg/avro/decoder_fast.c pyiceberg/avro/*.html pyiceberg/avro/*.so +research/ diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5d90215f1e..46a1f60647 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -115,6 +115,10 @@ PYICEBERG_RUST_SCAN_PLANNING = "PYICEBERG_RUST_SCAN_PLANNING" +def _env_flag_enabled(name: str) -> bool: + return os.environ.get(name, "").lower() in {"1", "true", "yes"} + + def _native_scan_supports_paths(tasks: Iterable[FileScanTask]) -> bool: """Return whether every planned data file has a URL scheme the native opendal reader can open. @@ -2257,6 +2261,9 @@ def to_arrow(self) -> pa.Table: Returns: pa.Table: Materialized Arrow Table from the Iceberg table's DataScan """ + if _env_flag_enabled(PYICEBERG_RUST_ARROW_SCAN) or _env_flag_enabled(PYICEBERG_RUST_SCAN_PLANNING): + return self.to_arrow_batch_reader().read_all() + from pyiceberg.io.pyarrow import ArrowScan return ArrowScan( @@ -2290,7 +2297,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: # ArrowInvalid for prop/decode mismatches; broader user errors still surface. native_fallback_errors = (ModuleNotFoundError, NotImplementedError, ValueError, TypeError, pa.lib.ArrowInvalid) - if os.environ.get(PYICEBERG_RUST_SCAN_PLANNING, "").lower() in {"1", "true", "yes"}: + if _env_flag_enabled(PYICEBERG_RUST_SCAN_PLANNING): from pyiceberg.io.pyiceberg_core import plan_and_read_with_pyiceberg_core try: @@ -2313,7 +2320,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: stacklevel=2, ) - if os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: + if _env_flag_enabled(PYICEBERG_RUST_ARROW_SCAN): from pyiceberg.io.pyiceberg_core import ( arrow_batch_reader_from_pyiceberg_core, can_read_projected_schema_with_pyiceberg_core, diff --git a/tests/table/test_scan_planning.py b/tests/table/test_scan_planning.py index dc04f66c50..75c5c0ace7 100644 --- a/tests/table/test_scan_planning.py +++ b/tests/table/test_scan_planning.py @@ -19,6 +19,7 @@ from typing import Any +import pyarrow as pa import pytest from pyiceberg.table import Table @@ -124,3 +125,25 @@ def test_data_scan_plan_files_resolves_local_by_default(table_v2: Table) -> None assert scan.scan_planner is None assert isinstance(resolve_scan_planner(scan), LocalScanPlanner) + + +def test_data_scan_to_arrow_uses_native_batch_reader_when_requested( + monkeypatch: pytest.MonkeyPatch, + table_v2: Table, +) -> None: + schema = pa.schema([pa.field("id", pa.int32())]) + batch = pa.record_batch({"id": pa.array([1, 2], type=pa.int32())}) + scan = table_v2.scan().select("id") + seen: list[Any] = [] + + def _batch_reader(self: Any) -> pa.RecordBatchReader: + seen.append(self) + return pa.RecordBatchReader.from_batches(schema, [batch]) + + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SCAN", "1") + monkeypatch.setattr(type(scan), "to_arrow_batch_reader", _batch_reader) + + table = scan.to_arrow() + + assert seen == [scan] + assert table.column("id").to_pylist() == [1, 2] From 1ec08ffed5254cdbc1ee1e6a41a781c923de738c Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 13:45:39 -0500 Subject: [PATCH 10/10] feat(table): wire native scan product paths --- dev/bench_native_product.py | 241 +++++++++++++++++ pyiceberg/io/pyarrow.py | 54 +++- pyiceberg/io/pyiceberg_core.py | 24 +- pyiceberg/table/__init__.py | 253 ++++++++++++------ pyiceberg/typedef.py | 9 + tests/catalog/test_catalog_behaviors.py | 2 +- .../test_writes/test_partitioned_writes.py | 2 +- tests/integration/test_writes/test_writes.py | 4 +- tests/io/test_pyiceberg_core.py | 8 +- tests/table/test_arrow_capsule.py | 200 ++++++++++++++ tests/table/test_scan_planning.py | 35 ++- tests/table/test_upsert.py | 34 +++ 12 files changed, 765 insertions(+), 101 deletions(-) create mode 100644 dev/bench_native_product.py create mode 100644 tests/table/test_arrow_capsule.py diff --git a/dev/bench_native_product.py b/dev/bench_native_product.py new file mode 100644 index 0000000000..41b4daaf84 --- /dev/null +++ b/dev/bench_native_product.py @@ -0,0 +1,241 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Benchmark PyIceberg scan backends and Arrow stream handoffs. + +The parent process launches one child per case so wall time, CPU time, and peak +RSS are isolated from previous cases. Native routes report ``unsupported`` when +the installed ``pyiceberg-core`` wheel does not expose the scan bindings. +""" + +from __future__ import annotations + +import argparse +import importlib +import json +import os +import resource +import shutil +import subprocess +import sys +import tempfile +import time +import warnings +from collections.abc import Callable +from pathlib import Path +from typing import Any + +import pyarrow as pa + +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.schema import Schema +from pyiceberg.table import PYICEBERG_RUST_SCAN_MODE +from pyiceberg.types import DoubleType, IntegerType, LongType, NestedField + +ENGINES = ("pyarrow", "rust-read", "rust-plan-and-read", "capsule-pyarrow", "capsule-polars", "duckdb") +SHAPES = ("full", "projection", "filter") + + +def _rss_kib() -> int: + return int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) + + +def _table_data(start: int, rows: int) -> pa.Table: + ids = pa.array(range(start, start + rows), type=pa.int64()) + part = pa.array(((value % 8) for value in range(start, start + rows)), type=pa.int32()) + payload = pa.array((float(value % 1024) for value in range(start, start + rows)), type=pa.float64()) + return pa.table({"id": ids, "part": part, "payload": payload}) + + +def _create_table(warehouse: Path, rows: int, files: int) -> Any: + if warehouse.exists(): + shutil.rmtree(warehouse) + warehouse.mkdir(parents=True, exist_ok=True) + + catalog = InMemoryCatalog("bench", warehouse=warehouse.as_uri()) + catalog.create_namespace("default") + schema = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "part", IntegerType(), required=False), + NestedField(3, "payload", DoubleType(), required=False), + ) + table = catalog.create_table("default.scan_bench", schema=schema) + + rows_per_file = max(1, rows // files) + offset = 0 + for file_idx in range(files): + chunk_rows = rows - offset if file_idx == files - 1 else rows_per_file + table.append(_table_data(offset, chunk_rows)) + offset += chunk_rows + return table + + +def _scan_for_shape(table: Any, shape: str) -> Any: + if shape == "full": + return table.scan() + if shape == "projection": + return table.scan(selected_fields=("id", "payload")) + if shape == "filter": + return table.scan(row_filter="part == 1", selected_fields=("id", "payload")) + raise ValueError(f"Unknown shape: {shape}") + + +def _table_digest(table: pa.Table) -> dict[str, Any]: + if "id" not in table.column_names: + return {"rows": table.num_rows, "sum_id": None} + return {"rows": table.num_rows, "sum_id": table["id"].combine_chunks().sum().as_py()} + + +def _run_scan(scan: Any, engine: str) -> dict[str, Any]: + os.environ.pop(PYICEBERG_RUST_SCAN_MODE, None) + if engine == "rust-read": + importlib.import_module("pyiceberg_core.scan") + os.environ[PYICEBERG_RUST_SCAN_MODE] = "rust-read" + elif engine == "rust-plan-and-read": + importlib.import_module("pyiceberg_core.scan") + os.environ[PYICEBERG_RUST_SCAN_MODE] = "rust-plan-and-read" + + if engine in {"rust-read", "rust-plan-and-read"}: + with warnings.catch_warnings(): + warnings.filterwarnings("error", message="Falling back to PyArrow scan.*", category=RuntimeWarning) + return _table_digest(scan.to_arrow_batch_reader().read_all()) + if engine == "pyarrow": + return _table_digest(scan.to_arrow_batch_reader().read_all()) + if engine == "capsule-pyarrow": + return _table_digest(pa.table(scan)) + if engine == "capsule-polars": + import polars as pl + + frame = pl.DataFrame(scan) + return {"rows": frame.height, "sum_id": frame["id"].sum() if "id" in frame.columns else None} + if engine == "duckdb": + con = scan.to_duckdb("iceberg_scan") + rows, sum_id = con.sql("select count(*) as rows, sum(id) as sum_id from iceberg_scan").fetchone() + return {"rows": rows, "sum_id": sum_id} + raise ValueError(f"Unknown engine: {engine}") + + +def _measure(fn: Callable[[], dict[str, Any]]) -> dict[str, Any]: + rss_before = _rss_kib() + cpu_before = time.process_time() + wall_before = time.perf_counter() + digest = fn() + wall_ms = (time.perf_counter() - wall_before) * 1000 + cpu_ms = (time.process_time() - cpu_before) * 1000 + rss_after = _rss_kib() + return { + **digest, + "wall_ms": wall_ms, + "cpu_ms": cpu_ms, + "peak_rss_kib": rss_after, + "delta_peak_rss_kib": max(0, rss_after - rss_before), + } + + +def _child(args: argparse.Namespace) -> int: + result: dict[str, Any] = { + "engine": args.engine, + "shape": args.shape, + "rows_input": args.rows, + "files": args.files, + "status": "ok", + } + try: + table = _create_table(Path(args.warehouse), args.rows, args.files) + scan = _scan_for_shape(table, args.shape) + result.update(_measure(lambda: _run_scan(scan, args.engine))) + except Exception as exc: + error_message = str(exc) + result["status"] = ( + "unsupported" + if "pyiceberg-core" in error_message + or "pyiceberg_core" in error_message + or "Falling back to PyArrow scan" in error_message + else "error" + ) + result["error_type"] = type(exc).__name__ + result["error"] = str(exc) + print(json.dumps(result, sort_keys=True)) + return 0 + + +def _parent(args: argparse.Namespace) -> int: + engines = args.engines.split(",") if args.engines else list(ENGINES) + shapes = args.shapes.split(",") if args.shapes else list(SHAPES) + for engine in engines: + if engine not in ENGINES: + raise ValueError(f"Unknown engine: {engine}") + for shape in shapes: + if shape not in SHAPES: + raise ValueError(f"Unknown shape: {shape}") + + records: list[dict[str, Any]] = [] + for shape in shapes: + for engine in engines: + for repeat in range(args.repeats): + with tempfile.TemporaryDirectory(prefix="pyiceberg-bench-") as tmp: + cmd = [ + sys.executable, + __file__, + "--child", + "--engine", + engine, + "--shape", + shape, + "--rows", + str(args.rows), + "--files", + str(args.files), + "--warehouse", + tmp, + ] + proc = subprocess.run(cmd, text=True, check=False, capture_output=True) + if proc.stderr: + print(proc.stderr, file=sys.stderr, end="") + record = json.loads(proc.stdout.strip().splitlines()[-1]) + record["repeat"] = repeat + records.append(record) + print(json.dumps(record, sort_keys=True)) + + if args.output: + Path(args.output).write_text(json.dumps(records, indent=2, sort_keys=True) + "\n", encoding="utf-8") + return 0 + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--rows", type=int, default=1_000_000) + parser.add_argument("--files", type=int, default=16) + parser.add_argument("--repeats", type=int, default=3) + parser.add_argument("--engines", help=f"Comma-separated subset of: {', '.join(ENGINES)}") + parser.add_argument("--shapes", help=f"Comma-separated subset of: {', '.join(SHAPES)}") + parser.add_argument("--output") + parser.add_argument("--child", action="store_true") + parser.add_argument("--engine", choices=ENGINES) + parser.add_argument("--shape", choices=SHAPES) + parser.add_argument("--warehouse", default="") + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + if args.child: + return _child(args) + return _parent(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4ec7a73afe..c8ca9e95fb 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -149,7 +149,7 @@ from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping from pyiceberg.table.puffin import PuffinFile from pyiceberg.transforms import IdentityTransform, TruncateTransform -from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion +from pyiceberg.typedef import EMPTY_DICT, ArrowStreamExportable, Properties, Record, TableVersion from pyiceberg.types import ( BinaryType, BooleanType, @@ -2680,30 +2680,40 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[ """Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``. Note: - ``target_file_size`` is measured in **uncompressed in-memory** Arrow bytes - (``Table.nbytes`` / ``RecordBatch.nbytes``), not compressed on-disk Parquet - bytes. The resulting Parquet file after compression (zstd by default, - plus dictionary/RLE encoding) is typically 3-10× smaller than - ``target_file_size``. This is a coarse proxy for the spec-defined + ``target_file_size`` is measured in **uncompressed in-memory** Arrow + bytes, not compressed on-disk Parquet bytes. The size estimate uses + ``nbytes`` when available and falls back to referenced buffer size for + Arrow view types that do not support ``nbytes``. The resulting Parquet + file after compression (zstd by default, plus dictionary/RLE encoding) + is typically 3-10× smaller than ``target_file_size``. This is a coarse + proxy for the spec-defined ``write.target-file-size-bytes`` and will be tightened to true on-disk bytes once the writer is switched to a rolling-``ParquetWriter`` with - ``OutputStream.tell()`` (#2998). + ``OutputStream.tell()``. """ from pyiceberg.utils.bin_packing import PackingIterator - avg_row_size_bytes = tbl.nbytes / tbl.num_rows + avg_row_size_bytes = _arrow_data_size(tbl) / tbl.num_rows target_rows_per_file = max(1, int(target_file_size / avg_row_size_bytes)) batches = tbl.to_batches(max_chunksize=target_rows_per_file) bin_packed_record_batches = PackingIterator( items=batches, target_weight=target_file_size, lookback=len(batches), # ignore lookback - weight_func=lambda x: x.nbytes, + weight_func=_arrow_data_size, largest_bin_first=False, ) return bin_packed_record_batches +def _arrow_data_size(data: pa.Table | pa.RecordBatch) -> int: + """Estimate Arrow data size for writer bin-packing.""" + try: + return data.nbytes + except pyarrow.lib.ArrowTypeError: + return data.get_total_buffer_size() + + def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], target_file_size: int) -> Iterator[list[pa.RecordBatch]]: """Microbatch a single-pass stream of RecordBatches into target-sized groups. @@ -2719,18 +2729,20 @@ def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], target_file_size: Note: ``target_file_size`` is measured in **uncompressed in-memory** Arrow - bytes (``RecordBatch.nbytes``), not compressed on-disk Parquet bytes. - The resulting Parquet file after compression is typically 3-10× - smaller than ``target_file_size``. Matches the existing + bytes, not compressed on-disk Parquet bytes. The size estimate uses + ``nbytes`` when available and falls back to referenced buffer size for + Arrow view types that do not support ``nbytes``. The resulting Parquet + file after compression is typically 3-10× smaller than + ``target_file_size``. Matches the existing :func:`bin_pack_arrow_table` semantics; both will be tightened to true on-disk bytes once the writer is switched to a rolling- - ``ParquetWriter`` with ``OutputStream.tell()`` (#2998). + ``ParquetWriter`` with ``OutputStream.tell()``. """ buffer: list[pa.RecordBatch] = [] buffer_bytes = 0 for batch in batches: buffer.append(batch) - buffer_bytes += batch.nbytes + buffer_bytes += _arrow_data_size(batch) if buffer_bytes >= target_file_size: yield buffer buffer = [] @@ -3033,3 +3045,17 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar field_array = arrow_table[path_parts[0]] # Navigate into the struct using the remaining path parts return pc.struct_field(field_array, path_parts[1:]) + + +def _coerce_arrow_input(df: pa.Table | pa.RecordBatchReader | ArrowStreamExportable) -> pa.Table | pa.RecordBatchReader: + """Normalize Arrow write input to a PyArrow table or stream reader.""" + if isinstance(df, (pa.Table, pa.RecordBatchReader)): + return df + + if hasattr(df, "__arrow_c_stream__"): + return pa.RecordBatchReader.from_stream(df) + + raise ValueError( + f"Expected pa.Table, pa.RecordBatchReader, or an object implementing the " + f"Arrow PyCapsule interface (__arrow_c_stream__), got: {df!r}" + ) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index 7aa2c9aef8..5407461734 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -258,6 +258,23 @@ def expression_to_pyiceberg_core( raise NotImplementedError(f"Cannot convert unsupported PyIceberg expression {expr!r} to pyiceberg_core") +def match_filter_to_pyiceberg_core(df: Any, join_cols: list[str]) -> Any: + """Build a native predicate matching the unique join keys in ``df``.""" + expression = _core_module("expression") + unique_keys = df.select(join_cols).group_by(join_cols).aggregate([]) + + if len(join_cols) == 1: + return expression.Reference(join_cols[0]).is_in(unique_keys[0].to_pylist()) + + predicate = expression.Predicate.always_false() + for row in unique_keys.to_pylist(): + row_predicate = expression.Predicate.always_true() + for col in join_cols: + row_predicate = row_predicate.and_(expression.Reference(col).eq(row[col])) + predicate = predicate.or_(row_predicate) + return predicate + + def _bound_predicate_to_pyiceberg_core(expr: BoundPredicate) -> Any: ref = _core_module("expression").Reference(_term_name(expr.term)) @@ -302,7 +319,7 @@ def delete_file_to_pyiceberg_core(delete_file: DataFile) -> Any: if content == 1: file_type = "position-deletes" elif content == 2: - raise NotImplementedError("pyiceberg-core equality delete scan parity is tracked separately") + file_type = "equality-deletes" else: raise ValueError(f"Expected a delete file, got data file content {delete_file.content!r}") @@ -615,6 +632,7 @@ def plan_and_read_with_pyiceberg_core( case_sensitive: bool = True, snapshot_id: int | None = None, limit: int | None = None, + native_predicate: Any | None = None, ) -> Any: """Plan and read a scan entirely in pyiceberg-core, skipping PyIceberg's manifest planning. @@ -635,8 +653,8 @@ def plan_and_read_with_pyiceberg_core( native_table = scan.Table.from_metadata_json(file_io, identifier, table_metadata.model_dump_json()) - predicate = None - if not isinstance(row_filter, AlwaysTrue): + predicate = native_predicate + if predicate is None and not isinstance(row_filter, AlwaysTrue): predicate = expression_to_pyiceberg_core(row_filter, table_metadata.schema(), case_sensitive) tasks = list( diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 46a1f60647..46732500eb 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -23,6 +23,7 @@ from abc import ABC, abstractmethod from collections.abc import Callable, Iterable, Iterator from dataclasses import dataclass +from enum import Enum from functools import cached_property from itertools import chain from types import TracebackType @@ -82,6 +83,7 @@ from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import ( EMPTY_DICT, + ArrowStreamExportable, IcebergBaseModel, IcebergRootModel, Identifier, @@ -113,12 +115,64 @@ DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" PYICEBERG_RUST_ARROW_SCAN = "PYICEBERG_RUST_ARROW_SCAN" PYICEBERG_RUST_SCAN_PLANNING = "PYICEBERG_RUST_SCAN_PLANNING" +PYICEBERG_RUST_SCAN_MODE = "PYICEBERG_RUST_SCAN_MODE" +PYICEBERG_RUST_UPSERT_PREDICATE = "PYICEBERG_RUST_UPSERT_PREDICATE" + + +class _ScanBackend(str, Enum): + PYARROW = "pyarrow" + RUST_READ = "rust-read" + RUST_PLAN_AND_READ = "rust-plan-and-read" def _env_flag_enabled(name: str) -> bool: return os.environ.get(name, "").lower() in {"1", "true", "yes"} +def _scan_backend_from_mode(mode: str) -> _ScanBackend | None: + try: + return _ScanBackend(mode.strip().lower()) + except ValueError: + warnings.warn( + f"Ignoring unknown PyIceberg scan backend mode {mode!r}; using PyArrow.", + RuntimeWarning, + stacklevel=2, + ) + return None + + +def _scan_backend_candidates(properties: Properties) -> tuple[_ScanBackend, ...]: + mode = os.environ.get(PYICEBERG_RUST_SCAN_MODE) + if mode: + backend = _scan_backend_from_mode(mode) + return (backend, _ScanBackend.PYARROW) if backend and backend is not _ScanBackend.PYARROW else (_ScanBackend.PYARROW,) + + candidates: list[_ScanBackend] = [] + if _env_flag_enabled(PYICEBERG_RUST_SCAN_PLANNING): + candidates.append(_ScanBackend.RUST_PLAN_AND_READ) + if _env_flag_enabled(PYICEBERG_RUST_ARROW_SCAN): + candidates.append(_ScanBackend.RUST_READ) + if candidates: + candidates.append(_ScanBackend.PYARROW) + return tuple(candidates) + + if mode := properties.get(TableProperties.PYICEBERG_CORE_SCAN_MODE): + backend = _scan_backend_from_mode(str(mode)) + return (backend, _ScanBackend.PYARROW) if backend and backend is not _ScanBackend.PYARROW else (_ScanBackend.PYARROW,) + + return (_ScanBackend.PYARROW,) + + +def _scan_uses_native_reader(properties: Properties) -> bool: + return any(candidate is not _ScanBackend.PYARROW for candidate in _scan_backend_candidates(properties)) + + +def _native_upsert_predicate_enabled(properties: Properties) -> bool: + return _env_flag_enabled(PYICEBERG_RUST_UPSERT_PREDICATE) or property_as_bool( + properties, TableProperties.PYICEBERG_CORE_UPSERT_MATCH_FILTER_ENABLED, False + ) + + def _native_scan_supports_paths(tasks: Iterable[FileScanTask]) -> bool: """Return whether every planned data file has a URL scheme the native opendal reader can open. @@ -216,6 +270,9 @@ class TableProperties: DELETE_MODE_MERGE_ON_READ = "merge-on-read" DELETE_MODE_DEFAULT = DELETE_MODE_COPY_ON_WRITE + PYICEBERG_CORE_SCAN_MODE = "read.pyiceberg-core.scan-mode" + PYICEBERG_CORE_UPSERT_MATCH_FILTER_ENABLED = "write.pyiceberg-core.upsert-match-filter.enabled" + DEFAULT_NAME_MAPPING = "schema.name-mapping.default" FORMAT_VERSION = "format-version" DEFAULT_FORMAT_VERSION: TableVersion = 2 @@ -488,7 +545,7 @@ def update_statistics(self) -> UpdateStatistics: def append( self, - df: pa.Table | pa.RecordBatchReader, + df: pa.Table | pa.RecordBatchReader | ArrowStreamExportable, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH, ) -> None: @@ -541,10 +598,9 @@ def append( except ModuleNotFoundError as e: raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e - from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files + from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _coerce_arrow_input, _dataframe_to_data_files - if not isinstance(df, (pa.Table, pa.RecordBatchReader)): - raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: {df}") + df = _coerce_arrow_input(df) downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False _check_pyarrow_schema_compatible( @@ -634,7 +690,7 @@ def dynamic_partition_overwrite( def overwrite( self, - df: pa.Table | pa.RecordBatchReader, + df: pa.Table | pa.RecordBatchReader | ArrowStreamExportable, overwrite_filter: BooleanExpression | str = ALWAYS_TRUE, snapshot_properties: dict[str, str] = EMPTY_DICT, case_sensitive: bool = True, @@ -698,10 +754,9 @@ def overwrite( except ModuleNotFoundError as e: raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e - from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files + from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _coerce_arrow_input, _dataframe_to_data_files - if not isinstance(df, (pa.Table, pa.RecordBatchReader)): - raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: {df}") + df = _coerce_arrow_input(df) downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False _check_pyarrow_schema_compatible( @@ -910,22 +965,49 @@ def upsert( format_version=self.table_metadata.format_version, ) - # get list of rows that exist so we don't have to load the entire target table - matched_predicate = upsert_util.create_match_filter(df, join_cols) + matched_iceberg_record_batches = None + if _native_upsert_predicate_enabled(self.table_metadata.properties): + try: + from pyiceberg.io.pyiceberg_core import match_filter_to_pyiceberg_core, plan_and_read_with_pyiceberg_core + + snapshot_id = None + if branch in self.table_metadata.refs: + snapshot_id = self.table_metadata.refs[branch].snapshot_id + + matched_iceberg_record_batches = plan_and_read_with_pyiceberg_core( + self.table_metadata, + self._table.io, + self.table_metadata.schema(), + ALWAYS_TRUE, + self._table.name(), + case_sensitive=case_sensitive, + snapshot_id=snapshot_id, + native_predicate=match_filter_to_pyiceberg_core(df, join_cols), + ) + except (ModuleNotFoundError, NotImplementedError, ValueError, TypeError, pa.lib.ArrowInvalid) as exc: + warnings.warn( + f"Falling back to PyIceberg upsert match filtering because pyiceberg-core cannot handle it: {exc}", + RuntimeWarning, + stacklevel=2, + ) - # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes. + if matched_iceberg_record_batches is None: + # get list of rows that exist so we don't have to load the entire target table + matched_predicate = upsert_util.create_match_filter(df, join_cols) - matched_iceberg_record_batches_scan = DataScan( - table_metadata=self.table_metadata, - io=self._table.io, - row_filter=matched_predicate, - case_sensitive=case_sensitive, - ) + # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes. - if branch in self.table_metadata.refs: - matched_iceberg_record_batches_scan = matched_iceberg_record_batches_scan.use_ref(branch) + matched_iceberg_record_batches_scan = DataScan( + table_metadata=self.table_metadata, + io=self._table.io, + row_filter=matched_predicate, + case_sensitive=case_sensitive, + ) - matched_iceberg_record_batches = matched_iceberg_record_batches_scan.to_arrow_batch_reader() + if branch in self.table_metadata.refs: + matched_iceberg_record_batches_scan = matched_iceberg_record_batches_scan.use_ref(branch) + + matched_iceberg_record_batches = matched_iceberg_record_batches_scan.to_arrow_batch_reader() batches_to_overwrite = [] overwrite_predicates = [] @@ -1508,7 +1590,7 @@ def upsert( def append( self, - df: pa.Table | pa.RecordBatchReader, + df: pa.Table | pa.RecordBatchReader | ArrowStreamExportable, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH, ) -> None: @@ -1543,7 +1625,7 @@ def dynamic_partition_overwrite( def overwrite( self, - df: pa.Table | pa.RecordBatchReader, + df: pa.Table | pa.RecordBatchReader | ArrowStreamExportable, overwrite_filter: BooleanExpression | str = ALWAYS_TRUE, snapshot_properties: dict[str, str] = EMPTY_DICT, case_sensitive: bool = True, @@ -1752,6 +1834,10 @@ def __datafusion_table_provider__(self, session: Any | None = None) -> IcebergDa ).__datafusion_table_provider__ return provider(session) + def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: + """Export this table as an Arrow C stream.""" + return self.scan().to_arrow_batch_reader().__arrow_c_stream__(requested_schema) + class StaticTable(Table): """Load a table directly from a metadata file (i.e., without using a catalog).""" @@ -2261,7 +2347,7 @@ def to_arrow(self) -> pa.Table: Returns: pa.Table: Materialized Arrow Table from the Iceberg table's DataScan """ - if _env_flag_enabled(PYICEBERG_RUST_ARROW_SCAN) or _env_flag_enabled(PYICEBERG_RUST_SCAN_PLANNING): + if _scan_uses_native_reader(self.table_metadata.properties): return self.to_arrow_batch_reader().read_all() from pyiceberg.io.pyarrow import ArrowScan @@ -2297,66 +2383,71 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: # ArrowInvalid for prop/decode mismatches; broader user errors still surface. native_fallback_errors = (ModuleNotFoundError, NotImplementedError, ValueError, TypeError, pa.lib.ArrowInvalid) - if _env_flag_enabled(PYICEBERG_RUST_SCAN_PLANNING): - from pyiceberg.io.pyiceberg_core import plan_and_read_with_pyiceberg_core - - try: - return _prime_native_reader( - plan_and_read_with_pyiceberg_core( - self.table_metadata, - self.io, - projected_schema, - self.row_filter, - self.table_identifier, - case_sensitive=self.case_sensitive, - snapshot_id=self.snapshot_id, - limit=self.limit, + for backend in _scan_backend_candidates(self.table_metadata.properties): + if backend is _ScanBackend.PYARROW: + break + + if backend is _ScanBackend.RUST_PLAN_AND_READ: + from pyiceberg.io.pyiceberg_core import plan_and_read_with_pyiceberg_core + + try: + return _prime_native_reader( + plan_and_read_with_pyiceberg_core( + self.table_metadata, + self.io, + projected_schema, + self.row_filter, + self.table_identifier, + case_sensitive=self.case_sensitive, + snapshot_id=self.snapshot_id, + limit=self.limit, + ) ) - ) - except native_fallback_errors as exc: - warnings.warn( - f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", - RuntimeWarning, - stacklevel=2, - ) - - if _env_flag_enabled(PYICEBERG_RUST_ARROW_SCAN): - from pyiceberg.io.pyiceberg_core import ( - arrow_batch_reader_from_pyiceberg_core, - can_read_projected_schema_with_pyiceberg_core, - ) - - if can_read_projected_schema_with_pyiceberg_core( - self.table_metadata.schema(), projected_schema, self.row_filter, self.case_sensitive - ): - # Plan once and reuse: the bare-path guard inspects the same tasks the reader gets. - native_tasks = list(self.plan_files()) - if not _native_scan_supports_paths(native_tasks): + except native_fallback_errors as exc: warnings.warn( - "Falling back to PyArrow scan because pyiceberg-core cannot read a data file path without a URL scheme.", + f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", RuntimeWarning, stacklevel=2, ) - else: - try: - return _prime_native_reader( - arrow_batch_reader_from_pyiceberg_core( - self.io, - native_tasks, - self.table_metadata.schema(), - projected_schema, - self.table_metadata.specs(), - self.table_metadata.name_mapping(), - self.case_sensitive, - limit=self.limit, - ) - ) - except native_fallback_errors as exc: + + if backend is _ScanBackend.RUST_READ: + from pyiceberg.io.pyiceberg_core import ( + arrow_batch_reader_from_pyiceberg_core, + can_read_projected_schema_with_pyiceberg_core, + ) + + if can_read_projected_schema_with_pyiceberg_core( + self.table_metadata.schema(), projected_schema, self.row_filter, self.case_sensitive + ): + # Plan once and reuse: the bare-path guard inspects the same tasks the reader gets. + native_tasks = list(self.plan_files()) + if not _native_scan_supports_paths(native_tasks): warnings.warn( - f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + "Falling back to PyArrow scan because pyiceberg-core cannot read a data file path " + "without a URL scheme.", RuntimeWarning, stacklevel=2, ) + else: + try: + return _prime_native_reader( + arrow_batch_reader_from_pyiceberg_core( + self.io, + native_tasks, + self.table_metadata.schema(), + projected_schema, + self.table_metadata.specs(), + self.table_metadata.name_mapping(), + self.case_sensitive, + limit=self.limit, + ) + ) + except native_fallback_errors as exc: + warnings.warn( + f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + RuntimeWarning, + stacklevel=2, + ) target_schema = schema_to_pyarrow(projected_schema) batches = ArrowScan( @@ -2368,6 +2459,10 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: batches, ).cast(target_schema) + def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: + """Export this scan as an Arrow C stream.""" + return self.to_arrow_batch_reader().__arrow_c_stream__(requested_schema) + def to_pandas(self, **kwargs: Any) -> pd.DataFrame: """Read a Pandas DataFrame eagerly from this Iceberg table. @@ -2385,7 +2480,10 @@ def to_duckdb(self, table_name: str, connection: DuckDBPyConnection | None = Non import duckdb con = connection or duckdb.connect(database=":memory:") - con.register(table_name, self.to_arrow()) + try: + con.register(table_name, self.to_arrow_batch_reader()) + except TypeError: + con.register(table_name, self.to_arrow()) return con @@ -2407,7 +2505,10 @@ def to_polars(self) -> pl.DataFrame: """ import polars as pl - result = pl.from_arrow(self.to_arrow()) + try: + result = pl.DataFrame(self) + except (TypeError, ValueError): + result = pl.from_arrow(self.to_arrow_batch_reader()) if isinstance(result, pl.Series): result = result.to_frame() diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py index 6989144ef9..834b1b22f2 100644 --- a/pyiceberg/typedef.py +++ b/pyiceberg/typedef.py @@ -112,6 +112,15 @@ def __setitem__(self, pos: int, value: Any) -> None: """Assign a value to a StructProtocol.""" +@runtime_checkable +class ArrowStreamExportable(Protocol): # pragma: no cover + """Any object implementing the Arrow PyCapsule stream interface.""" + + @abstractmethod + def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: + """Export the object as an Arrow C stream PyCapsule.""" + + class IcebergBaseModel(BaseModel): """ This class extends the Pydantic BaseModel to set default values by overriding them. diff --git a/tests/catalog/test_catalog_behaviors.py b/tests/catalog/test_catalog_behaviors.py index b859e2d541..4c94c7d3c0 100644 --- a/tests/catalog/test_catalog_behaviors.py +++ b/tests/catalog/test_catalog_behaviors.py @@ -1318,7 +1318,7 @@ def test_append_invalid_input_type_raises(catalog: Catalog) -> None: identifier = f"default.append_invalid_input_{catalog.name}" pa_table = _simple_arrow_table() tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema) - with pytest.raises(ValueError, match="Expected pa.Table or pa.RecordBatchReader"): + with pytest.raises(ValueError, match="Expected pa.Table, pa.RecordBatchReader, or an object implementing"): tbl.append("not an arrow object") diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index 1d1488255f..eb391144fd 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -768,7 +768,7 @@ def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog) -> Non properties={"format-version": "1"}, ) - with pytest.raises(ValueError, match="Expected pa.Table or pa.RecordBatchReader, got: not a df"): + with pytest.raises(ValueError, match="Expected pa.Table, pa.RecordBatchReader, or an object implementing"): tbl.append("not a df") diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 609c1863bc..d32385c3fb 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -791,10 +791,10 @@ def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_ identifier = "default.arrow_data_files" tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, []) - with pytest.raises(ValueError, match="Expected pa.Table or pa.RecordBatchReader, got: not a df"): + with pytest.raises(ValueError, match="Expected pa.Table, pa.RecordBatchReader, or an object implementing"): tbl.overwrite("not a df") - with pytest.raises(ValueError, match="Expected pa.Table or pa.RecordBatchReader, got: not a df"): + with pytest.raises(ValueError, match="Expected pa.Table, pa.RecordBatchReader, or an object implementing"): tbl.append("not a df") diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index fc8cf6d2f4..c830154e7f 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -273,7 +273,7 @@ def test_delete_file_to_pyiceberg_core_converts_delete_file_payload() -> None: assert converted.kwargs == {"partition_spec_id": 7, "equality_ids": None} -def test_delete_file_to_pyiceberg_core_rejects_equality_deletes_until_parity_lands() -> None: +def test_delete_file_to_pyiceberg_core_converts_equality_delete_payload() -> None: delete_file = DataFile.from_args( content=DataFileContent.EQUALITY_DELETES, file_path="s3://warehouse/table/eq-delete.parquet", @@ -291,8 +291,10 @@ def test_delete_file_to_pyiceberg_core_rejects_equality_deletes_until_parity_lan ) delete_file.spec_id = 7 - with pytest.raises(NotImplementedError, match="equality delete scan parity"): - delete_file_to_pyiceberg_core(delete_file) + converted = delete_file_to_pyiceberg_core(delete_file) + + assert converted.args == ("s3://warehouse/table/eq-delete.parquet", 123, "equality-deletes") + assert converted.kwargs == {"partition_spec_id": 7, "equality_ids": [1]} def test_file_scan_task_to_pyiceberg_core_converts_task_payload(simple_schema: Schema) -> None: diff --git a/tests/table/test_arrow_capsule.py b/tests/table/test_arrow_capsule.py new file mode 100644 index 0000000000..1596ef8a91 --- /dev/null +++ b/tests/table/test_arrow_capsule.py @@ -0,0 +1,200 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Tests for Arrow stream producer and consumer support.""" + +from collections.abc import Callable +from pathlib import Path +from typing import Any + +import pyarrow as pa +import pytest + +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.io.pyarrow import _coerce_arrow_input +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import IntegerType, NestedField, StringType + +SCHEMA = Schema( + NestedField(1, "id", IntegerType(), required=False), + NestedField(2, "region", StringType(), required=False), +) +ARROW_SCHEMA = pa.schema( + [ + pa.field("id", pa.int32(), nullable=True), + pa.field("region", pa.string(), nullable=True), + ] +) +PARTITION_SPEC = PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="region")) + + +class _ArrowStreamWrapper: + """A minimal third-party-style Arrow stream producer.""" + + def __init__(self, data: pa.Table): + self._data = data + + def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: + return self._data.__arrow_c_stream__(requested_schema) + + +@pytest.fixture +def catalog(tmp_path: Path) -> InMemoryCatalog: + catalog = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix()) + catalog.create_namespace("default") + return catalog + + +def _data(ids: list[int], regions: list[str]) -> pa.Table: + return pa.table({"id": pa.array(ids, type=pa.int32()), "region": regions}, schema=ARROW_SCHEMA) + + +def _string_view_data(ids: list[int], regions: list[str]) -> pa.Table: + if not hasattr(pa, "string_view"): + pytest.skip("pyarrow does not support string_view") + return pa.table( + {"id": pa.array(ids, type=pa.int32()), "region": pa.array(regions, type=pa.string_view())}, + schema=pa.schema( + [ + pa.field("id", pa.int32(), nullable=True), + pa.field("region", pa.string_view(), nullable=True), + ] + ), + ) + + +def _rows(table: pa.Table) -> list[dict[str, Any]]: + return sorted(table.to_pylist(), key=lambda row: row["id"]) + + +def test_coerce_arrow_input() -> None: + table = _data([1, 2, 3], ["us", "eu", "us"]) + + assert _coerce_arrow_input(table) is table + reader = table.to_reader() + assert _coerce_arrow_input(reader) is reader + + coerced = _coerce_arrow_input(_ArrowStreamWrapper(table)) + assert isinstance(coerced, pa.RecordBatchReader) + assert coerced.read_all().num_rows == 3 + + with pytest.raises(ValueError, match="Expected pa.Table, pa.RecordBatchReader"): + _coerce_arrow_input(object()) + + +@pytest.mark.parametrize( + "make_input", + [ + pytest.param(lambda data: data, id="table"), + pytest.param(lambda data: data.to_reader(), id="reader"), + pytest.param(lambda data: _ArrowStreamWrapper(data), id="stream"), + pytest.param( + lambda data: _ArrowStreamWrapper(pa.Table.from_batches(data.to_batches(max_chunksize=1))), + id="multi_batch_stream", + ), + ], +) +def test_append_accepts_arrow_inputs(catalog: InMemoryCatalog, make_input: Callable[[pa.Table], object]) -> None: + tbl = catalog.create_table("default.append", schema=SCHEMA) + + tbl.append(make_input(_data([1, 2, 3], ["us", "eu", "us"]))) + + assert _rows(tbl.scan().to_arrow()) == _rows(_data([1, 2, 3], ["us", "eu", "us"])) + + +def test_overwrite_accepts_arrow_stream(catalog: InMemoryCatalog) -> None: + tbl = catalog.create_table("default.overwrite_stream", schema=SCHEMA) + tbl.append(_data([1, 2], ["us", "eu"])) + + tbl.overwrite(_ArrowStreamWrapper(_data([9], ["jp"]))) + + assert _rows(tbl.scan().to_arrow()) == _rows(_data([9], ["jp"])) + + +def test_append_accepts_arrow_stream_with_string_view(catalog: InMemoryCatalog) -> None: + tbl = catalog.create_table("default.append_string_view", schema=SCHEMA) + + tbl.append(_ArrowStreamWrapper(_string_view_data([10, 11], ["ca", "mx"]))) + + assert _rows(tbl.scan().to_arrow()) == _rows(_data([10, 11], ["ca", "mx"])) + + +def test_append_table_to_partitioned_table_keeps_partitioned_write_path(catalog: InMemoryCatalog) -> None: + tbl = catalog.create_table("default.append_partitioned", schema=SCHEMA, partition_spec=PARTITION_SPEC) + + tbl.append(_data([1, 2], ["us", "eu"])) + + assert tbl.scan().to_arrow().num_rows == 2 + + +@pytest.mark.parametrize( + "produce", + [ + pytest.param(lambda tbl: tbl, id="table"), + pytest.param(lambda tbl: tbl.scan(), id="scan"), + ], +) +def test_supports_arrow_c_stream(catalog: InMemoryCatalog, produce: Callable[[Table], object]) -> None: + tbl = catalog.create_table("default.stream", schema=SCHEMA) + tbl.append(_data([1, 2, 3], ["us", "eu", "us"])) + + consumed = pa.table(produce(tbl)) + + assert _rows(consumed) == _rows(tbl.scan().to_arrow()) + + +def test_scan_arrow_c_stream_respects_filter_and_projection(catalog: InMemoryCatalog) -> None: + tbl = catalog.create_table("default.scan_stream_filtered", schema=SCHEMA) + tbl.append(_data([1, 2, 3], ["us", "eu", "us"])) + + scan = tbl.scan(row_filter="region == 'us'", selected_fields=("id",)) + consumed = pa.table(scan) + + assert consumed.column_names == ["id"] + assert sorted(consumed.column("id").to_pylist()) == [1, 3] + + +def test_arrow_stream_roundtrip_scan_into_append(catalog: InMemoryCatalog) -> None: + src = catalog.create_table("default.roundtrip_src", schema=SCHEMA) + src.append(_data([1, 2, 3], ["us", "eu", "us"])) + dst = catalog.create_table("default.roundtrip_dst", schema=SCHEMA) + + dst.append(src.scan()) + + assert _rows(dst.scan().to_arrow()) == _rows(src.scan().to_arrow()) + + +def test_scan_to_duckdb_registers_stream(catalog: InMemoryCatalog) -> None: + pytest.importorskip("duckdb") + tbl = catalog.create_table("default.duckdb_stream", schema=SCHEMA) + tbl.append(_data([1, 2, 3], ["us", "eu", "us"])) + + con = tbl.scan().to_duckdb("iceberg_table") + + assert con.sql("select count(*) as count, sum(id) as total from iceberg_table").fetchall() == [(3, 6)] + + +def test_scan_to_polars_consumes_stream(catalog: InMemoryCatalog) -> None: + pytest.importorskip("polars") + tbl = catalog.create_table("default.polars_stream", schema=SCHEMA) + tbl.append(_data([1, 2, 3], ["us", "eu", "us"])) + + result = tbl.scan(row_filter="region == 'us'", selected_fields=("id",)).to_polars() + + assert sorted(result["id"].to_list()) == [1, 3] diff --git a/tests/table/test_scan_planning.py b/tests/table/test_scan_planning.py index 75c5c0ace7..7f9a1a3ac5 100644 --- a/tests/table/test_scan_planning.py +++ b/tests/table/test_scan_planning.py @@ -22,7 +22,7 @@ import pyarrow as pa import pytest -from pyiceberg.table import Table +from pyiceberg.table import PYICEBERG_RUST_SCAN_MODE, Table, TableProperties, _scan_backend_candidates from pyiceberg.table.scan_planning import ( LocalScanPlanner, RestScanPlanner, @@ -147,3 +147,36 @@ def _batch_reader(self: Any) -> pa.RecordBatchReader: assert seen == [scan] assert table.column("id").to_pylist() == [1, 2] + + +def test_scan_backend_candidates_preserve_legacy_env_precedence(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("PYICEBERG_RUST_SCAN_PLANNING", "1") + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SCAN", "1") + + assert [candidate.value for candidate in _scan_backend_candidates({})] == [ + "rust-plan-and-read", + "rust-read", + "pyarrow", + ] + + +def test_scan_backend_candidates_use_env_mode_before_table_property(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv(PYICEBERG_RUST_SCAN_MODE, "rust-read") + + assert [ + candidate.value + for candidate in _scan_backend_candidates({TableProperties.PYICEBERG_CORE_SCAN_MODE: "rust-plan-and-read"}) + ] == [ + "rust-read", + "pyarrow", + ] + + +def test_scan_backend_candidates_use_table_property() -> None: + assert [ + candidate.value + for candidate in _scan_backend_candidates({TableProperties.PYICEBERG_CORE_SCAN_MODE: "rust-plan-and-read"}) + ] == [ + "rust-plan-and-read", + "pyarrow", + ] diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 08f90c6600..5786deed50 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. from pathlib import PosixPath +from typing import Any import pyarrow as pa import pytest @@ -119,6 +120,39 @@ def assert_upsert_result(res: UpsertResult, expected_updated: int, expected_inse assert res.rows_inserted == expected_inserted, f"rows inserted should be {expected_inserted}, but got {res.rows_inserted}" +def test_upsert_can_use_native_match_filter(catalog: Catalog, monkeypatch: pytest.MonkeyPatch) -> None: + identifier = "default.test_upsert_native_match_filter" + schema = Schema( + NestedField(1, "id", IntegerType(), required=False), + NestedField(2, "data", StringType(), required=False), + ) + table = catalog.create_table(identifier, schema=schema) + source = pa.table({"id": pa.array([1, 2], type=pa.int32()), "data": ["a", "b"]}) + native_calls: list[tuple[Any, ...]] = [] + + def _native_filter(df: pa.Table, join_cols: list[str]) -> object: + native_calls.append(("predicate", df.num_rows, join_cols)) + return object() + + def _native_read(*args: object, **kwargs: object) -> pa.RecordBatchReader: + native_calls.append(("read", kwargs["native_predicate"])) + return pa.RecordBatchReader.from_batches(source.schema, []) + + def _python_filter(*args: object, **kwargs: object) -> object: + raise AssertionError("Python match filter should not be built when native path succeeds") + + monkeypatch.setenv("PYICEBERG_RUST_UPSERT_PREDICATE", "1") + monkeypatch.setattr("pyiceberg.io.pyiceberg_core.match_filter_to_pyiceberg_core", _native_filter) + monkeypatch.setattr("pyiceberg.io.pyiceberg_core.plan_and_read_with_pyiceberg_core", _native_read) + monkeypatch.setattr("pyiceberg.table.upsert_util.create_match_filter", _python_filter) + + result = table.upsert(source, join_cols=["id"]) + + assert_upsert_result(result, expected_updated=0, expected_inserted=2) + assert native_calls[0] == ("predicate", 2, ["id"]) + assert native_calls[1][0] == "read" + + @pytest.mark.parametrize( ( "join_cols, src_start_row, src_end_row, target_start_row, target_end_row, "