From 7830bfbf4ba83e98bce742b983ab3bcc15086060 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Fri, 5 Jun 2026 12:41:29 +0530 Subject: [PATCH 1/3] feat: add dictionary_columns parameter to Table.scan() for memory-efficient reads Columns that contain large or frequently repeated strings (e.g. JSON blobs, low-cardinality categoricals) can exhaust memory when PyArrow loads them as plain string arrays. PyArrow's Parquet reader supports reading such columns as dictionary-encoded arrays, which deduplicates values and can dramatically reduce memory usage. Add a dictionary_columns: tuple[str, ...] parameter to Table.scan() (and the underlying TableScan / ArrowScan classes) that is forwarded to _get_file_format() as PyArrow's dictionary_columns kwarg. Only applies to Parquet files; silently ignored for ORC. Usage: table.scan(dictionary_columns=("payload",)).to_arrow() Co-Authored-By: Claude Sonnet 4.6 --- pyiceberg/io/pyarrow.py | 12 ++++++- pyiceberg/table/__init__.py | 42 +++++++++++++++++----- tests/io/test_pyarrow.py | 70 +++++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 10 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4ec7a73afe..8b0b07116a 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1625,8 +1625,12 @@ def _task_to_record_batches( partition_spec: PartitionSpec | None = None, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION, downcast_ns_timestamp_to_us: bool | None = None, + dictionary_columns: frozenset[str] = frozenset(), ) -> Iterator[pa.RecordBatch]: - arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) + format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8} + if dictionary_columns and task.file.file_format == FileFormat.PARQUET: + format_kwargs["dictionary_columns"] = tuple(dictionary_columns) + arrow_format = _get_file_format(task.file.file_format, **format_kwargs) with io.new_input(task.file.file_path).open() as fin: fragment = arrow_format.make_fragment(fin) physical_schema = fragment.physical_schema @@ -1729,6 +1733,7 @@ class ArrowScan: _case_sensitive: bool _limit: int | None _downcast_ns_timestamp_to_us: bool | None + _dictionary_columns: frozenset[str] """Scan the Iceberg Table and create an Arrow construct. Attributes: @@ -1738,6 +1743,7 @@ class ArrowScan: _bound_row_filter: Schema bound row expression to filter the data with _case_sensitive: Case sensitivity when looking up column names _limit: Limit the number of records. + _dictionary_columns: Column names to read as dictionary-encoded arrays. """ def __init__( @@ -1748,6 +1754,8 @@ def __init__( row_filter: BooleanExpression, case_sensitive: bool = True, limit: int | None = None, + *, + dictionary_columns: tuple[str, ...] = (), ) -> None: self._table_metadata = table_metadata self._io = io @@ -1756,6 +1764,7 @@ def __init__( self._case_sensitive = case_sensitive self._limit = limit self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) + self._dictionary_columns = frozenset(dictionary_columns) @property def _projected_field_ids(self) -> set[int]: @@ -1866,6 +1875,7 @@ def _record_batches_from_scan_tasks_and_deletes( self._table_metadata.specs().get(task.file.spec_id), self._table_metadata.format_version, self._downcast_ns_timestamp_to_us, + self._dictionary_columns, ) for batch in batches: if self._limit is not None: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 64ad10050d..ea2752217b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1219,6 +1219,7 @@ def scan( snapshot_id: int | None = None, options: Properties = EMPTY_DICT, limit: int | None = None, + dictionary_columns: tuple[str, ...] = (), ) -> DataScan: """Fetch a DataScan based on the table's current metadata. @@ -1245,6 +1246,14 @@ def scan( An integer representing the number of rows to return in the scan result. If None, fetches all matching rows. + dictionary_columns: + A tuple of column names that PyArrow should read as + dictionary-encoded (``pa.DictionaryArray``). Dictionary + encoding can substantially reduce memory usage for columns + that contain large or frequently repeated string values + (e.g. large JSON blobs or low-cardinality categoricals). + Only applies to Parquet files; silently ignored for ORC. + Columns absent from the file are silently skipped. Returns: A DataScan based on the table's current metadata. @@ -1260,6 +1269,7 @@ def scan( limit=limit, catalog=self.catalog, table_identifier=self._identifier, + dictionary_columns=dictionary_columns, ) @property @@ -1775,6 +1785,7 @@ def scan( snapshot_id: int | None = None, options: Properties = EMPTY_DICT, limit: int | None = None, + dictionary_columns: tuple[str, ...] = (), ) -> DataScan: raise ValueError("Cannot scan a staged table") @@ -1809,6 +1820,7 @@ class TableScan(ABC): limit: int | None catalog: Catalog | None table_identifier: Identifier | None + dictionary_columns: tuple[str, ...] def __init__( self, @@ -1822,6 +1834,7 @@ def __init__( limit: int | None = None, catalog: Catalog | None = None, table_identifier: Identifier | None = None, + dictionary_columns: tuple[str, ...] = (), ): self.table_metadata = table_metadata self.io = io @@ -1833,6 +1846,7 @@ def __init__( self.limit = limit self.catalog = catalog self.table_identifier = table_identifier + self.dictionary_columns = dictionary_columns def snapshot(self) -> Snapshot | None: if self.snapshot_id: @@ -2072,13 +2086,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single # shared instance across multiple threads. - return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) + return lambda datafile: residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), ) @staticmethod @@ -2224,7 +2236,13 @@ def to_arrow(self) -> pa.Table: from pyiceberg.io.pyarrow import ArrowScan return ArrowScan( - self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + self.table_metadata, + self.io, + self.projection(), + self.row_filter, + self.case_sensitive, + self.limit, + dictionary_columns=self.dictionary_columns, ).to_table(self.plan_files()) def to_arrow_batch_reader(self) -> pa.RecordBatchReader: @@ -2244,7 +2262,13 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: target_schema = schema_to_pyarrow(self.projection()) batches = ArrowScan( - self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + self.table_metadata, + self.io, + self.projection(), + self.row_filter, + self.case_sensitive, + self.limit, + dictionary_columns=self.dictionary_columns, ).to_record_batches(self.plan_files()) return pa.RecordBatchReader.from_batches( diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 2f36661a1f..bcbf873c2a 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -5103,3 +5103,73 @@ def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCata result_sorted = result.sort_by("name") assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"] assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"] + + +def test_dictionary_columns_produces_dict_encoded_output(tmpdir: str) -> None: + """dictionary_columns passed to ArrowScan must yield dictionary-encoded arrays. + + Verifies that: + 1. The requested column is returned as a pa.DictionaryArray. + 2. Values are identical to a plain (non-dict) scan. + 3. A column NOT in dictionary_columns is still returned as a plain array. + """ + from pyiceberg.expressions import AlwaysTrue + from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO + from pyiceberg.partitioning import PartitionSpec + from pyiceberg.table import FileScanTask + from pyiceberg.table.metadata import TableMetadataV2 + + arrow_schema = pa.schema( + [ + pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}), + pa.field("label", pa.string(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "2"}), + ] + ) + arrow_table = pa.table( + [pa.array([1, 2, 3, 4], type=pa.int32()), pa.array(["a", "b", "a", "b"], type=pa.string())], + schema=arrow_schema, + ) + data_file = _write_table_to_data_file(f"{tmpdir}/test_dict_cols.parquet", arrow_schema, arrow_table) + data_file.spec_id = 0 + + iceberg_schema = Schema( + NestedField(1, "id", IntegerType(), required=False), + NestedField(2, "label", StringType(), required=False), + ) + table_metadata = TableMetadataV2( + location=f"file://{tmpdir}", + last_column_id=2, + format_version=2, + schemas=[iceberg_schema], + partition_specs=[PartitionSpec()], + ) + io = PyArrowFileIO() + task = FileScanTask(data_file) + + scan_plain = ArrowScan( + table_metadata=table_metadata, + io=io, + projected_schema=iceberg_schema, + row_filter=AlwaysTrue(), + ) + scan_dict = ArrowScan( + table_metadata=table_metadata, + io=io, + projected_schema=iceberg_schema, + row_filter=AlwaysTrue(), + dictionary_columns=("label",), + ) + + result_plain = scan_plain.to_table([task]) + result_dict = scan_dict.to_table([task]) + + # id column is not in dictionary_columns — both scans should return int32 + assert result_plain.schema.field("id").type == pa.int32() + assert result_dict.schema.field("id").type == pa.int32() + + # label column: plain scan → string, dict scan → dictionary + assert result_plain.schema.field("label").type == pa.string() + assert pa.types.is_dictionary(result_dict.schema.field("label").type) + + # Values must be identical + assert result_plain.column("label").to_pylist() == result_dict.column("label").to_pylist() From c9a8132372862d1325565053d1b54919bcb07b10 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Sat, 6 Jun 2026 12:57:55 +0530 Subject: [PATCH 2/3] refactor: move dictionary_columns from scan() to to_arrow() / to_arrow_batch_reader() Addresses reviewer feedback that dictionary_columns is an Arrow-specific concern and should not be part of the general-purpose scan() public API. The parameter is now accepted directly by the Arrow output methods: table.scan(...).to_arrow(dictionary_columns=("payload",)) table.scan(...).to_arrow_batch_reader(dictionary_columns=("payload",)) ArrowScan still accepts dictionary_columns for lower-level use. Co-Authored-By: Claude Sonnet 4.6 --- pyiceberg/table/__init__.py | 38 +++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ea2752217b..463ea60d52 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1219,7 +1219,6 @@ def scan( snapshot_id: int | None = None, options: Properties = EMPTY_DICT, limit: int | None = None, - dictionary_columns: tuple[str, ...] = (), ) -> DataScan: """Fetch a DataScan based on the table's current metadata. @@ -1246,14 +1245,6 @@ def scan( An integer representing the number of rows to return in the scan result. If None, fetches all matching rows. - dictionary_columns: - A tuple of column names that PyArrow should read as - dictionary-encoded (``pa.DictionaryArray``). Dictionary - encoding can substantially reduce memory usage for columns - that contain large or frequently repeated string values - (e.g. large JSON blobs or low-cardinality categoricals). - Only applies to Parquet files; silently ignored for ORC. - Columns absent from the file are silently skipped. Returns: A DataScan based on the table's current metadata. @@ -1269,7 +1260,6 @@ def scan( limit=limit, catalog=self.catalog, table_identifier=self._identifier, - dictionary_columns=dictionary_columns, ) @property @@ -1785,7 +1775,6 @@ def scan( snapshot_id: int | None = None, options: Properties = EMPTY_DICT, limit: int | None = None, - dictionary_columns: tuple[str, ...] = (), ) -> DataScan: raise ValueError("Cannot scan a staged table") @@ -1820,7 +1809,6 @@ class TableScan(ABC): limit: int | None catalog: Catalog | None table_identifier: Identifier | None - dictionary_columns: tuple[str, ...] def __init__( self, @@ -1834,7 +1822,6 @@ def __init__( limit: int | None = None, catalog: Catalog | None = None, table_identifier: Identifier | None = None, - dictionary_columns: tuple[str, ...] = (), ): self.table_metadata = table_metadata self.io = io @@ -1846,7 +1833,6 @@ def __init__( self.limit = limit self.catalog = catalog self.table_identifier = table_identifier - self.dictionary_columns = dictionary_columns def snapshot(self) -> Snapshot | None: if self.snapshot_id: @@ -2225,11 +2211,19 @@ def plan_files(self) -> Iterable[FileScanTask]: return self._plan_files_server_side() return self._plan_files_local() - def to_arrow(self) -> pa.Table: + def to_arrow(self, dictionary_columns: tuple[str, ...] = ()) -> pa.Table: """Read an Arrow table eagerly from this DataScan. All rows will be loaded into memory at once. + Args: + dictionary_columns: + A tuple of column names that PyArrow should read as + dictionary-encoded (``pa.DictionaryArray``). Dictionary + encoding can substantially reduce memory usage for columns + with low-cardinality repeated string values. + Only applies to Parquet files; silently ignored for ORC. + Returns: pa.Table: Materialized Arrow Table from the Iceberg table's DataScan """ @@ -2242,16 +2236,24 @@ def to_arrow(self) -> pa.Table: self.row_filter, self.case_sensitive, self.limit, - dictionary_columns=self.dictionary_columns, + dictionary_columns=dictionary_columns, ).to_table(self.plan_files()) - def to_arrow_batch_reader(self) -> pa.RecordBatchReader: + def to_arrow_batch_reader(self, dictionary_columns: tuple[str, ...] = ()) -> pa.RecordBatchReader: """Return an Arrow RecordBatchReader from this DataScan. For large results, using a RecordBatchReader requires less memory than loading an Arrow Table for the same DataScan, because a RecordBatch is read one at a time. + Args: + dictionary_columns: + A tuple of column names that PyArrow should read as + dictionary-encoded (``pa.DictionaryArray``). Dictionary + encoding can substantially reduce memory usage for columns + with low-cardinality repeated string values. + Only applies to Parquet files; silently ignored for ORC. + Returns: pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan which can be used to read a stream of record batches one by one. @@ -2268,7 +2270,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: self.row_filter, self.case_sensitive, self.limit, - dictionary_columns=self.dictionary_columns, + dictionary_columns=dictionary_columns, ).to_record_batches(self.plan_files()) return pa.RecordBatchReader.from_batches( From e429e48b2927ab66ea30f4ec03ef848b9d04e898 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Sat, 6 Jun 2026 13:21:36 +0530 Subject: [PATCH 3/3] ci: re-trigger CI after transient runner cache failure