Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1625,8 +1625,12 @@ def _task_to_record_batches(
partition_spec: PartitionSpec | None = None,
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
downcast_ns_timestamp_to_us: bool | None = None,
dictionary_columns: frozenset[str] = frozenset(),
) -> Iterator[pa.RecordBatch]:
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8}
if dictionary_columns and task.file.file_format == FileFormat.PARQUET:
format_kwargs["dictionary_columns"] = tuple(dictionary_columns)
arrow_format = _get_file_format(task.file.file_format, **format_kwargs)
with io.new_input(task.file.file_path).open() as fin:
fragment = arrow_format.make_fragment(fin)
physical_schema = fragment.physical_schema
Expand Down Expand Up @@ -1729,6 +1733,7 @@ class ArrowScan:
_case_sensitive: bool
_limit: int | None
_downcast_ns_timestamp_to_us: bool | None
_dictionary_columns: frozenset[str]
"""Scan the Iceberg Table and create an Arrow construct.

Attributes:
Expand All @@ -1738,6 +1743,7 @@ class ArrowScan:
_bound_row_filter: Schema bound row expression to filter the data with
_case_sensitive: Case sensitivity when looking up column names
_limit: Limit the number of records.
_dictionary_columns: Column names to read as dictionary-encoded arrays.
"""

def __init__(
Expand All @@ -1748,6 +1754,8 @@ def __init__(
row_filter: BooleanExpression,
case_sensitive: bool = True,
limit: int | None = None,
*,
dictionary_columns: tuple[str, ...] = (),
) -> None:
self._table_metadata = table_metadata
self._io = io
Expand All @@ -1756,6 +1764,7 @@ def __init__(
self._case_sensitive = case_sensitive
self._limit = limit
self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)
self._dictionary_columns = frozenset(dictionary_columns)

@property
def _projected_field_ids(self) -> set[int]:
Expand Down Expand Up @@ -1866,6 +1875,7 @@ def _record_batches_from_scan_tasks_and_deletes(
self._table_metadata.specs().get(task.file.spec_id),
self._table_metadata.format_version,
self._downcast_ns_timestamp_to_us,
self._dictionary_columns,
)
for batch in batches:
if self._limit is not None:
Expand Down
48 changes: 37 additions & 11 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2072,13 +2072,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu
# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda datafile: (
residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)
return lambda datafile: residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)

@staticmethod
Expand Down Expand Up @@ -2213,27 +2211,49 @@ def plan_files(self) -> Iterable[FileScanTask]:
return self._plan_files_server_side()
return self._plan_files_local()

def to_arrow(self, dictionary_columns: tuple[str, ...] = ()) -> pa.Table:
    """Read an Arrow table eagerly from this DataScan.

    All rows will be loaded into memory at once.

    Args:
        dictionary_columns:
            A tuple of column names that PyArrow should read as
            dictionary-encoded (``pa.DictionaryArray``). Dictionary
            encoding can substantially reduce memory usage for columns
            with low-cardinality repeated string values.
            Only applies to Parquet files; silently ignored for ORC.
            Defaults to an empty tuple, which requests no dictionary
            encoding.

    Returns:
        pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
    """
    from pyiceberg.io.pyarrow import ArrowScan

    # ``dictionary_columns`` is keyword-only on ArrowScan, so it must be
    # passed by name; the six leading arguments stay positional.
    return ArrowScan(
        self.table_metadata,
        self.io,
        self.projection(),
        self.row_filter,
        self.case_sensitive,
        self.limit,
        dictionary_columns=dictionary_columns,
    ).to_table(self.plan_files())

def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
def to_arrow_batch_reader(self, dictionary_columns: tuple[str, ...] = ()) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.

For large results, using a RecordBatchReader requires less memory than
loading an Arrow Table for the same DataScan, because a RecordBatch
is read one at a time.

Args:
dictionary_columns:
A tuple of column names that PyArrow should read as
dictionary-encoded (``pa.DictionaryArray``). Dictionary
encoding can substantially reduce memory usage for columns
with low-cardinality repeated string values.
Only applies to Parquet files; silently ignored for ORC.

Returns:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
which can be used to read a stream of record batches one by one.
Expand All @@ -2244,7 +2264,13 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:

target_schema = schema_to_pyarrow(self.projection())
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
self.table_metadata,
self.io,
self.projection(),
self.row_filter,
self.case_sensitive,
self.limit,
dictionary_columns=dictionary_columns,
).to_record_batches(self.plan_files())

return pa.RecordBatchReader.from_batches(
Expand Down
70 changes: 70 additions & 0 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5103,3 +5103,73 @@ def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCata
result_sorted = result.sort_by("name")
assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"]
assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"]


def test_dictionary_columns_produces_dict_encoded_output(tmpdir: str) -> None:
    """Check that ``ArrowScan(dictionary_columns=...)`` dictionary-encodes only the named columns.

    A two-column Parquet file (``id``: int32, ``label``: string) is scanned
    twice: once with default settings and once with
    ``dictionary_columns=("label",)``. The test asserts that:

    1. ``id`` comes back as int32 from both scans.
    2. ``label`` is a plain string column in the default scan and a
       dictionary-typed column in the dictionary scan.
    3. Both scans yield the same ``label`` values.
    """
    from pyiceberg.expressions import AlwaysTrue
    from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
    from pyiceberg.partitioning import PartitionSpec
    from pyiceberg.table import FileScanTask
    from pyiceberg.table.metadata import TableMetadataV2

    # Physical Parquet schema; each field carries the field id matching the
    # Iceberg schema declared below.
    id_field = pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"})
    label_field = pa.field("label", pa.string(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "2"})
    file_schema = pa.schema([id_field, label_field])

    id_values = pa.array([1, 2, 3, 4], type=pa.int32())
    label_values = pa.array(["a", "b", "a", "b"], type=pa.string())
    source_table = pa.table([id_values, label_values], schema=file_schema)

    parquet_path = f"{tmpdir}/test_dict_cols.parquet"
    written_file = _write_table_to_data_file(parquet_path, file_schema, source_table)
    written_file.spec_id = 0

    projected = Schema(
        NestedField(1, "id", IntegerType(), required=False),
        NestedField(2, "label", StringType(), required=False),
    )
    metadata = TableMetadataV2(
        location=f"file://{tmpdir}",
        last_column_id=2,
        format_version=2,
        schemas=[projected],
        partition_specs=[PartitionSpec()],
    )
    file_io = PyArrowFileIO()
    scan_task = FileScanTask(written_file)

    # Both scans share every argument except ``dictionary_columns``.
    common_scan_args = {
        "table_metadata": metadata,
        "io": file_io,
        "projected_schema": projected,
        "row_filter": AlwaysTrue(),
    }
    default_scan = ArrowScan(**common_scan_args)
    dictionary_scan = ArrowScan(**common_scan_args, dictionary_columns=("label",))

    plain_table = default_scan.to_table([scan_task])
    encoded_table = dictionary_scan.to_table([scan_task])

    # "id" was not requested as a dictionary column, so it stays int32 in both results.
    for scanned in (plain_table, encoded_table):
        assert scanned.schema.field("id").type == pa.int32()

    # "label" is a plain string by default and dictionary-typed when requested.
    plain_label_type = plain_table.schema.field("label").type
    encoded_label_type = encoded_table.schema.field("label").type
    assert plain_label_type == pa.string()
    assert pa.types.is_dictionary(encoded_label_type)

    # Dictionary encoding must not change the decoded values.
    plain_labels = plain_table.column("label").to_pylist()
    encoded_labels = encoded_table.column("label").to_pylist()
    assert plain_labels == encoded_labels