Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ htmlcov
pyiceberg/avro/decoder_fast.c
pyiceberg/avro/*.html
pyiceberg/avro/*.so
research/
241 changes: 241 additions & 0 deletions dev/bench_native_product.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Benchmark PyIceberg scan backends and Arrow stream handoffs.

The parent process launches one child per case so wall time, CPU time, and peak
RSS are isolated from previous cases. Native routes report ``unsupported`` when
the installed ``pyiceberg-core`` wheel does not expose the scan bindings.
"""

from __future__ import annotations

import argparse
import importlib
import json
import os
import resource
import shutil
import subprocess
import sys
import tempfile
import time
import warnings
from collections.abc import Callable
from pathlib import Path
from typing import Any

import pyarrow as pa

from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.schema import Schema
from pyiceberg.table import PYICEBERG_RUST_SCAN_MODE
from pyiceberg.types import DoubleType, IntegerType, LongType, NestedField

ENGINES = ("pyarrow", "rust-read", "rust-plan-and-read", "capsule-pyarrow", "capsule-polars", "duckdb")
SHAPES = ("full", "projection", "filter")


def _rss_kib() -> int:
return int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)


def _table_data(start: int, rows: int) -> pa.Table:
ids = pa.array(range(start, start + rows), type=pa.int64())
part = pa.array(((value % 8) for value in range(start, start + rows)), type=pa.int32())
payload = pa.array((float(value % 1024) for value in range(start, start + rows)), type=pa.float64())
return pa.table({"id": ids, "part": part, "payload": payload})


def _create_table(warehouse: Path, rows: int, files: int) -> Any:
if warehouse.exists():
shutil.rmtree(warehouse)
warehouse.mkdir(parents=True, exist_ok=True)

catalog = InMemoryCatalog("bench", warehouse=warehouse.as_uri())
catalog.create_namespace("default")
schema = Schema(
NestedField(1, "id", LongType(), required=False),
NestedField(2, "part", IntegerType(), required=False),
NestedField(3, "payload", DoubleType(), required=False),
)
table = catalog.create_table("default.scan_bench", schema=schema)

rows_per_file = max(1, rows // files)
offset = 0
for file_idx in range(files):
chunk_rows = rows - offset if file_idx == files - 1 else rows_per_file
table.append(_table_data(offset, chunk_rows))
offset += chunk_rows
return table


def _scan_for_shape(table: Any, shape: str) -> Any:
if shape == "full":
return table.scan()
if shape == "projection":
return table.scan(selected_fields=("id", "payload"))
if shape == "filter":
return table.scan(row_filter="part == 1", selected_fields=("id", "payload"))
raise ValueError(f"Unknown shape: {shape}")


def _table_digest(table: pa.Table) -> dict[str, Any]:
if "id" not in table.column_names:
return {"rows": table.num_rows, "sum_id": None}
return {"rows": table.num_rows, "sum_id": table["id"].combine_chunks().sum().as_py()}


def _run_scan(scan: Any, engine: str) -> dict[str, Any]:
os.environ.pop(PYICEBERG_RUST_SCAN_MODE, None)
if engine == "rust-read":
importlib.import_module("pyiceberg_core.scan")
os.environ[PYICEBERG_RUST_SCAN_MODE] = "rust-read"
elif engine == "rust-plan-and-read":
importlib.import_module("pyiceberg_core.scan")
os.environ[PYICEBERG_RUST_SCAN_MODE] = "rust-plan-and-read"

if engine in {"rust-read", "rust-plan-and-read"}:
with warnings.catch_warnings():
warnings.filterwarnings("error", message="Falling back to PyArrow scan.*", category=RuntimeWarning)
return _table_digest(scan.to_arrow_batch_reader().read_all())
if engine == "pyarrow":
return _table_digest(scan.to_arrow_batch_reader().read_all())
if engine == "capsule-pyarrow":
return _table_digest(pa.table(scan))
if engine == "capsule-polars":
import polars as pl

frame = pl.DataFrame(scan)
return {"rows": frame.height, "sum_id": frame["id"].sum() if "id" in frame.columns else None}
if engine == "duckdb":
con = scan.to_duckdb("iceberg_scan")
rows, sum_id = con.sql("select count(*) as rows, sum(id) as sum_id from iceberg_scan").fetchone()
return {"rows": rows, "sum_id": sum_id}
raise ValueError(f"Unknown engine: {engine}")


def _measure(fn: Callable[[], dict[str, Any]]) -> dict[str, Any]:
rss_before = _rss_kib()
cpu_before = time.process_time()
wall_before = time.perf_counter()
digest = fn()
wall_ms = (time.perf_counter() - wall_before) * 1000
cpu_ms = (time.process_time() - cpu_before) * 1000
rss_after = _rss_kib()
return {
**digest,
"wall_ms": wall_ms,
"cpu_ms": cpu_ms,
"peak_rss_kib": rss_after,
"delta_peak_rss_kib": max(0, rss_after - rss_before),
}


def _child(args: argparse.Namespace) -> int:
result: dict[str, Any] = {
"engine": args.engine,
"shape": args.shape,
"rows_input": args.rows,
"files": args.files,
"status": "ok",
}
try:
table = _create_table(Path(args.warehouse), args.rows, args.files)
scan = _scan_for_shape(table, args.shape)
result.update(_measure(lambda: _run_scan(scan, args.engine)))
except Exception as exc:
error_message = str(exc)
result["status"] = (
"unsupported"
if "pyiceberg-core" in error_message
or "pyiceberg_core" in error_message
or "Falling back to PyArrow scan" in error_message
else "error"
)
result["error_type"] = type(exc).__name__
result["error"] = str(exc)
print(json.dumps(result, sort_keys=True))
return 0


def _parent(args: argparse.Namespace) -> int:
engines = args.engines.split(",") if args.engines else list(ENGINES)
shapes = args.shapes.split(",") if args.shapes else list(SHAPES)
for engine in engines:
if engine not in ENGINES:
raise ValueError(f"Unknown engine: {engine}")
for shape in shapes:
if shape not in SHAPES:
raise ValueError(f"Unknown shape: {shape}")

records: list[dict[str, Any]] = []
for shape in shapes:
for engine in engines:
for repeat in range(args.repeats):
with tempfile.TemporaryDirectory(prefix="pyiceberg-bench-") as tmp:
cmd = [
sys.executable,
__file__,
"--child",
"--engine",
engine,
"--shape",
shape,
"--rows",
str(args.rows),
"--files",
str(args.files),
"--warehouse",
tmp,
]
proc = subprocess.run(cmd, text=True, check=False, capture_output=True)
if proc.stderr:
print(proc.stderr, file=sys.stderr, end="")
record = json.loads(proc.stdout.strip().splitlines()[-1])
record["repeat"] = repeat
records.append(record)
print(json.dumps(record, sort_keys=True))

if args.output:
Path(args.output).write_text(json.dumps(records, indent=2, sort_keys=True) + "\n", encoding="utf-8")
return 0


def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--rows", type=int, default=1_000_000)
parser.add_argument("--files", type=int, default=16)
parser.add_argument("--repeats", type=int, default=3)
parser.add_argument("--engines", help=f"Comma-separated subset of: {', '.join(ENGINES)}")
parser.add_argument("--shapes", help=f"Comma-separated subset of: {', '.join(SHAPES)}")
parser.add_argument("--output")
parser.add_argument("--child", action="store_true")
parser.add_argument("--engine", choices=ENGINES)
parser.add_argument("--shape", choices=SHAPES)
parser.add_argument("--warehouse", default="")
return parser.parse_args()


def main() -> int:
args = _parse_args()
if args.child:
return _child(args)
return _parent(args)


if __name__ == "__main__":
raise SystemExit(main())
54 changes: 40 additions & 14 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
from pyiceberg.table.puffin import PuffinFile
from pyiceberg.transforms import IdentityTransform, TruncateTransform
from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
from pyiceberg.typedef import EMPTY_DICT, ArrowStreamExportable, Properties, Record, TableVersion
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -2680,30 +2680,40 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[
"""Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``.

Note:
``target_file_size`` is measured in **uncompressed in-memory** Arrow bytes
(``Table.nbytes`` / ``RecordBatch.nbytes``), not compressed on-disk Parquet
bytes. The resulting Parquet file after compression (zstd by default,
plus dictionary/RLE encoding) is typically 3-10× smaller than
``target_file_size``. This is a coarse proxy for the spec-defined
``target_file_size`` is measured in **uncompressed in-memory** Arrow
bytes, not compressed on-disk Parquet bytes. The size estimate uses
``nbytes`` when available and falls back to referenced buffer size for
Arrow view types that do not support ``nbytes``. The resulting Parquet
file after compression (zstd by default, plus dictionary/RLE encoding)
is typically 3-10× smaller than ``target_file_size``. This is a coarse
proxy for the spec-defined
``write.target-file-size-bytes`` and will be tightened to true on-disk
bytes once the writer is switched to a rolling-``ParquetWriter`` with
``OutputStream.tell()`` (#2998).
``OutputStream.tell()``.
"""
from pyiceberg.utils.bin_packing import PackingIterator

avg_row_size_bytes = tbl.nbytes / tbl.num_rows
avg_row_size_bytes = _arrow_data_size(tbl) / tbl.num_rows
target_rows_per_file = max(1, int(target_file_size / avg_row_size_bytes))
batches = tbl.to_batches(max_chunksize=target_rows_per_file)
bin_packed_record_batches = PackingIterator(
items=batches,
target_weight=target_file_size,
lookback=len(batches), # ignore lookback
weight_func=lambda x: x.nbytes,
weight_func=_arrow_data_size,
largest_bin_first=False,
)
return bin_packed_record_batches


def _arrow_data_size(data: pa.Table | pa.RecordBatch) -> int:
"""Estimate Arrow data size for writer bin-packing."""
try:
return data.nbytes
except pyarrow.lib.ArrowTypeError:
return data.get_total_buffer_size()


def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
"""Microbatch a single-pass stream of RecordBatches into target-sized groups.

Expand All @@ -2719,18 +2729,20 @@ def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], target_file_size:

Note:
``target_file_size`` is measured in **uncompressed in-memory** Arrow
bytes (``RecordBatch.nbytes``), not compressed on-disk Parquet bytes.
The resulting Parquet file after compression is typically 3-10×
smaller than ``target_file_size``. Matches the existing
bytes, not compressed on-disk Parquet bytes. The size estimate uses
``nbytes`` when available and falls back to referenced buffer size for
Arrow view types that do not support ``nbytes``. The resulting Parquet
file after compression is typically 3-10× smaller than
``target_file_size``. Matches the existing
:func:`bin_pack_arrow_table` semantics; both will be tightened to true
on-disk bytes once the writer is switched to a rolling-
``ParquetWriter`` with ``OutputStream.tell()`` (#2998).
``ParquetWriter`` with ``OutputStream.tell()``.
"""
buffer: list[pa.RecordBatch] = []
buffer_bytes = 0
for batch in batches:
buffer.append(batch)
buffer_bytes += batch.nbytes
buffer_bytes += _arrow_data_size(batch)
if buffer_bytes >= target_file_size:
yield buffer
buffer = []
Expand Down Expand Up @@ -3033,3 +3045,17 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar
field_array = arrow_table[path_parts[0]]
# Navigate into the struct using the remaining path parts
return pc.struct_field(field_array, path_parts[1:])


def _coerce_arrow_input(df: pa.Table | pa.RecordBatchReader | ArrowStreamExportable) -> pa.Table | pa.RecordBatchReader:
"""Normalize Arrow write input to a PyArrow table or stream reader."""
if isinstance(df, (pa.Table, pa.RecordBatchReader)):
return df

if hasattr(df, "__arrow_c_stream__"):
return pa.RecordBatchReader.from_stream(df)

raise ValueError(
f"Expected pa.Table, pa.RecordBatchReader, or an object implementing the "
f"Arrow PyCapsule interface (__arrow_c_stream__), got: {df!r}"
)
Loading