From b79fff7d611a23bcbbbdcd13af2afb8909493bf3 Mon Sep 17 00:00:00 2001 From: jayceslesar <47452474+jayceslesar@users.noreply.github.com> Date: Thu, 28 May 2026 07:52:21 +0530 Subject: [PATCH 1/8] Modify list_* methods in catalogs to return Iterators --- pyiceberg/catalog/__init__.py | 14 +++--- pyiceberg/catalog/bigquery_metastore.py | 11 ++--- pyiceberg/catalog/dynamodb.py | 22 +++++---- pyiceberg/catalog/glue.py | 17 +++---- pyiceberg/catalog/hive.py | 31 +++++++------ pyiceberg/catalog/noop.py | 7 +-- pyiceberg/catalog/rest/__init__.py | 13 +++--- pyiceberg/catalog/sql.py | 23 ++++++---- pyiceberg/cli/console.py | 21 ++++++--- pyiceberg/cli/output.py | 7 +-- tests/catalog/integration_test_dynamodb.py | 6 +-- tests/catalog/integration_test_glue.py | 6 +-- tests/catalog/test_bigquery_metastore.py | 4 +- tests/catalog/test_catalog_behaviors.py | 26 +++++------ tests/catalog/test_dynamodb.py | 16 +++---- tests/catalog/test_glue.py | 16 +++---- tests/catalog/test_hive.py | 4 +- tests/catalog/test_rest.py | 52 +++++++++++----------- tests/integration/test_catalog.py | 6 +-- 19 files changed, 163 insertions(+), 139 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 95ceaa539f..fcb1465c68 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -22,7 +22,7 @@ import re import uuid from abc import ABC, abstractmethod -from collections.abc import Callable +from collections.abc import Callable, Iterator from dataclasses import dataclass from enum import Enum from typing import ( @@ -607,42 +607,42 @@ def drop_namespace(self, namespace: str | Identifier) -> None: """ @abstractmethod - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: """List views under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of view identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. diff --git a/pyiceberg/catalog/bigquery_metastore.py b/pyiceberg/catalog/bigquery_metastore.py index 938ac6992f..c9c46a6d1c 100644 --- a/pyiceberg/catalog/bigquery_metastore.py +++ b/pyiceberg/catalog/bigquery_metastore.py @@ -17,6 +17,7 @@ from __future__ import annotations import json +from collections.abc import Iterator from typing import TYPE_CHECKING, Any from google.api_core.exceptions import NotFound @@ -252,7 +253,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: database_name = self.identifier_to_database(namespace) iceberg_tables: list[Identifier] = [] try: @@ -264,10 +265,10 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: iceberg_tables.append((database_name, bq_table_list_item.table_id)) except NotFound: raise NoSuchNamespaceError(f"Namespace (dataset) '{database_name}' not found.") from None - return iceberg_tables + return iter(iceberg_tables) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: # Since this catalog only supports one-level namespaces, it always returns an empty list unless # passed an empty namespace to list all namespaces within the catalog. if namespace: @@ -275,7 +276,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: # List top-level datasets datasets_iterator = self.client.list_datasets() - return [(dataset.dataset_id,) for dataset in datasets_iterator] + return iter([(dataset.dataset_id,) for dataset in datasets_iterator]) @override def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table: @@ -314,7 +315,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o return self.load_table(identifier=identifier) @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 74c0be6c9a..0a07c51771 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. import uuid +from collections.abc import Iterator from time import time from typing import ( TYPE_CHECKING, @@ -396,8 +397,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None: database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) table_identifiers = self.list_tables(namespace=database_name) - if len(table_identifiers) > 0: + try: + next(table_identifiers) raise NamespaceNotEmptyError(f"Database {database_name} is not empty") + except StopIteration: + pass try: self._delete_dynamo_item( @@ -409,14 +413,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List Iceberg tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. """ database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) @@ -451,20 +455,20 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: table_identifiers.append(self.identifier_to_tuple(identifier_col)) - return table_identifiers + return iter(table_identifiers) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List top-level namespaces from the catalog. We do not support hierarchical namespace. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. """ # Hierarchical namespace is not supported. Return an empty list if namespace: - return [] + return iter([]) paginator = self.dynamodb.get_paginator("query") @@ -494,7 +498,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: namespace_col = _dict[DYNAMODB_COL_NAMESPACE] database_identifiers.append(self.identifier_to_tuple(namespace_col)) - return database_identifiers + return iter(database_identifiers) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: @@ -565,7 +569,7 @@ def create_view( raise NotImplementedError @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 12b36efc5c..e06716d82d 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -17,6 +17,7 @@ import logging +from collections.abc import Iterator from typing import ( TYPE_CHECKING, Any, @@ -860,14 +861,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None: self.glue.delete_database(Name=database_name) @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List Iceberg tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid. @@ -889,18 +890,18 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e - return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)] + return iter([(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)]) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. """ # Hierarchical namespace is not supported. Return an empty list if namespace: - return [] + return iter([]) database_list: list[DatabaseTypeDef] = [] next_token: str | None = None @@ -912,7 +913,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: if not next_token: break - return [self.identifier_to_tuple(database["Name"]) for database in database_list] + return iter([self.identifier_to_tuple(database["Name"]) for database in database_list]) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: @@ -982,7 +983,7 @@ def create_view( raise NotImplementedError @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 181f9d4661..4e34426c5a 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -18,6 +18,7 @@ import logging import socket import time +from collections.abc import Iterator from types import TracebackType from typing import ( TYPE_CHECKING, @@ -479,7 +480,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o return self._convert_hive_into_iceberg(hive_table) @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override @@ -760,7 +761,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List Iceberg tables under the given namespace in the catalog. When the database doesn't exist, it will just return an empty list. @@ -769,34 +770,36 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: namespace: Database to list. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid. """ database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) with self._client as open_client: - return [ - (database_name, table.tableName) - for table in open_client.get_table_objects_by_name( - dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name) - ) - if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG - ] + return iter( + [ + (database_name, table.tableName) + for table in open_client.get_table_objects_by_name( + dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name) + ) + if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG + ] + ) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. """ # Hierarchical namespace is not supported. Return an empty list if namespace: - return [] + return iter([]) with self._client as open_client: - return list(map(self.identifier_to_tuple, open_client.get_all_databases())) + return iter(list(map(self.identifier_to_tuple, open_client.get_all_databases()))) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index aeb3c72843..f1afae7a67 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +from collections.abc import Iterator from typing import ( TYPE_CHECKING, ) @@ -124,11 +125,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NotImplementedError @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: raise NotImplementedError @override @@ -142,7 +143,7 @@ def update_namespace_properties( raise NotImplementedError @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index d085c6fd87..b83b136049 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -17,6 +17,7 @@ from __future__ import annotations from collections import deque +from collections.abc import Iterator from enum import Enum from typing import ( TYPE_CHECKING, @@ -1038,7 +1039,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o @retry(**_RETRY_ARGS) @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: self._check_endpoint(Capability.V1_LIST_TABLES) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = self._encode_namespace_path(namespace_tuple) @@ -1070,7 +1071,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: break page_token = parsed.next_page_token - return tables + return iter(tables) @retry(**_RETRY_ARGS) @override @@ -1151,7 +1152,7 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm @retry(**_RETRY_ARGS) @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: if Capability.V1_LIST_VIEWS not in self._supported_endpoints: return [] namespace_tuple = self._check_valid_namespace_identifier(namespace) @@ -1185,7 +1186,7 @@ def list_views(self, namespace: str | Identifier) -> list[Identifier]: break page_token = parsed.next_page_token - return views + return iter(views) @retry(**_RETRY_ARGS) @override @@ -1276,7 +1277,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: @retry(**_RETRY_ARGS) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: self._check_endpoint(Capability.V1_LIST_NAMESPACES) namespace_tuple = self.identifier_to_tuple(namespace) @@ -1309,7 +1310,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: break page_token = parsed.next_page_token - return namespaces + return iter(namespaces) @retry(**_RETRY_ARGS) @override diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 87446bd58b..0112dcb677 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -17,6 +17,7 @@ from __future__ import annotations +from collections.abc import Iterator from typing import ( TYPE_CHECKING, ) @@ -586,8 +587,12 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") namespace_str = Catalog.namespace_to_string(namespace) - if tables := self.list_tables(namespace): - raise NamespaceNotEmptyError(f"Namespace {namespace_str} is not empty. {len(tables)} tables exist.") + tables_iter = self.list_tables(namespace) + try: + next(tables_iter) + raise NamespaceNotEmptyError(f"Namespace {namespace_str} is not empty.") + except StopIteration: + pass with Session(self.engine) as session: session.execute( @@ -599,14 +604,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None: session.commit() @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. @@ -618,17 +623,17 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: stmt = select(IcebergTables).where(IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace) with Session(self.engine) as session: result = session.scalars(stmt) - return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result] + return iter([(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result]) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. @@ -660,7 +665,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: } ) - return namespaces + return iter(namespaces) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: @@ -755,7 +760,7 @@ def create_view( raise NotImplementedError @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index 3feed9fb21..a151e09d44 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -15,8 +15,9 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=broad-except,redefined-builtin,redefined-outer-name +import itertools import logging -from collections.abc import Callable +from collections.abc import Callable, Iterator from functools import wraps from typing import ( Any, @@ -33,6 +34,7 @@ from pyiceberg.io import WAREHOUSE from pyiceberg.table import TableProperties from pyiceberg.table.refs import SnapshotRef, SnapshotRefType +from pyiceberg.typedef import Identifier from pyiceberg.utils.properties import property_as_int @@ -130,14 +132,19 @@ def _catalog_and_output(ctx: Context) -> tuple[Catalog, Output]: def list(ctx: Context, parent: str | None) -> None: # pylint: disable=redefined-builtin """List tables or namespaces.""" catalog, output = _catalog_and_output(ctx) + identifiers: Iterator[Identifier] - identifiers = [] if parent: - # Do we have tables under parent namespace? - identifiers = catalog.list_tables(parent) - if not identifiers: - # List hierarchical namespaces if parent, root namespaces otherwise. - identifiers = catalog.list_namespaces(parent or ()) + # Do we have tables under parent namespace? Peek at first element. + tables = catalog.list_tables(parent) + try: + first = next(tables) + identifiers = itertools.chain([first], tables) + except StopIteration: + # No tables found; list hierarchical namespaces instead. + identifiers = catalog.list_namespaces(parent) + else: + identifiers = catalog.list_namespaces(()) output.identifiers(identifiers) diff --git a/pyiceberg/cli/output.py b/pyiceberg/cli/output.py index 332221008c..5584bd22a5 100644 --- a/pyiceberg/cli/output.py +++ b/pyiceberg/cli/output.py @@ -16,6 +16,7 @@ # under the License. import json from abc import ABC, abstractmethod +from collections.abc import Iterable from typing import ( Any, ) @@ -40,7 +41,7 @@ class Output(ABC): def exception(self, ex: Exception) -> None: ... @abstractmethod - def identifiers(self, identifiers: list[Identifier]) -> None: ... + def identifiers(self, identifiers: Iterable[Identifier]) -> None: ... @abstractmethod def describe_table(self, table: Table) -> None: ... @@ -88,7 +89,7 @@ def exception(self, ex: Exception) -> None: else: Console(stderr=True).print(ex) - def identifiers(self, identifiers: list[Identifier]) -> None: + def identifiers(self, identifiers: Iterable[Identifier]) -> None: table = self._table for identifier in identifiers: table.add_row(".".join(identifier)) @@ -199,7 +200,7 @@ def _out(self, d: Any) -> None: def exception(self, ex: Exception) -> None: self._out({"type": ex.__class__.__name__, "message": str(ex)}) - def identifiers(self, identifiers: list[Identifier]) -> None: + def identifiers(self, identifiers: Iterable[Identifier]) -> None: self._out([".".join(identifier) for identifier in identifiers]) def describe_table(self, table: Table) -> None: diff --git a/tests/catalog/integration_test_dynamodb.py b/tests/catalog/integration_test_dynamodb.py index 6ae14bca06..87988f9237 100644 --- a/tests/catalog/integration_test_dynamodb.py +++ b/tests/catalog/integration_test_dynamodb.py @@ -119,7 +119,7 @@ def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, databas test_catalog.create_namespace(database_name) for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - identifier_list = test_catalog.list_tables(database_name) + identifier_list = list(test_catalog.list_tables(database_name)) assert len(identifier_list) == LIST_TEST_NUMBER for table_name in table_list: assert (database_name, table_name) in identifier_list @@ -207,10 +207,10 @@ def test_create_namespace_with_comment_and_location(test_catalog: Catalog, datab def test_list_namespaces(test_catalog: Catalog, database_list: list[str]) -> None: for database_name in database_list: test_catalog.create_namespace(database_name) - db_list = test_catalog.list_namespaces() + db_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in db_list - assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0 + assert len(list(test_catalog.list_namespaces(list(database_list)[0]))) == 0 def test_drop_namespace(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None: diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index c429770268..c73035b48e 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -227,7 +227,7 @@ def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, databas test_catalog.create_namespace(database_name) for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - identifier_list = test_catalog.list_tables(database_name) + identifier_list = list(test_catalog.list_tables(database_name)) assert len(identifier_list) == LIST_TEST_NUMBER for table_name in table_list: assert (database_name, table_name) in identifier_list @@ -316,10 +316,10 @@ def test_create_namespace_with_comment_and_location(test_catalog: Catalog, datab def test_list_namespaces(test_catalog: Catalog, database_list: list[str]) -> None: for database_name in database_list: test_catalog.create_namespace(database_name) - db_list = test_catalog.list_namespaces() + db_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in db_list - assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0 + assert len(list(test_catalog.list_namespaces(list(database_list)[0]))) == 0 def test_drop_namespace(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None: diff --git a/tests/catalog/test_bigquery_metastore.py b/tests/catalog/test_bigquery_metastore.py index c8c7584262..bfca9ec081 100644 --- a/tests/catalog/test_bigquery_metastore.py +++ b/tests/catalog/test_bigquery_metastore.py @@ -151,7 +151,7 @@ def test_list_tables(mocker: MockFixture, gcp_dataset_name: str) -> None: catalog_name = "test_catalog" test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.bigquery.project-id": "my-project"}) - tables = test_catalog.list_tables(gcp_dataset_name) + tables = list(test_catalog.list_tables(gcp_dataset_name)) # Assert that all tables returned by client.list_tables are listed assert len(tables) == 2 @@ -173,7 +173,7 @@ def test_list_namespaces(mocker: MockFixture) -> None: catalog_name = "test_catalog" test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.bigquery.project-id": "my-project"}) - namespaces = test_catalog.list_namespaces() + namespaces = list(test_catalog.list_namespaces()) assert len(namespaces) == 2 assert ("dataset1",) in namespaces assert ("dataset2",) in namespaces diff --git a/tests/catalog/test_catalog_behaviors.py b/tests/catalog/test_catalog_behaviors.py index b859e2d541..bf92ef10e6 100644 --- a/tests/catalog/test_catalog_behaviors.py +++ b/tests/catalog/test_catalog_behaviors.py @@ -517,11 +517,11 @@ def test_list_tables( catalog.create_namespace(namespace_2) catalog.create_table(test_table_identifier, table_schema_nested) catalog.create_table(another_table_identifier, table_schema_nested) - identifier_list = catalog.list_tables(namespace_1) + identifier_list = list(catalog.list_tables(namespace_1)) assert len(identifier_list) == 1 assert test_table_identifier in identifier_list - identifier_list = catalog.list_tables(namespace_2) + identifier_list = list(catalog.list_tables(namespace_2)) assert len(identifier_list) == 1 assert another_table_identifier in identifier_list @@ -532,8 +532,8 @@ def test_list_tables_under_a_namespace(catalog: Catalog, table_schema_nested: Sc catalog.create_table(test_table_identifier, table_schema_nested) new_namespace = ("new", "namespace") catalog.create_namespace(new_namespace) - all_tables = catalog.list_tables(namespace=namespace) - new_namespace_tables = catalog.list_tables(new_namespace) + all_tables = list(catalog.list_tables(namespace=namespace)) + new_namespace_tables = list(catalog.list_tables(new_namespace)) assert all_tables assert test_table_identifier in all_tables assert new_namespace_tables == [] @@ -541,7 +541,7 @@ def test_list_tables_under_a_namespace(catalog: Catalog, table_schema_nested: Sc def test_list_tables_when_missing_namespace(catalog: Catalog, test_namespace: Identifier) -> None: with pytest.raises(NoSuchNamespaceError): - catalog.list_tables(test_namespace) + list(catalog.list_tables(test_namespace)) # Commit table tests @@ -1002,7 +1002,7 @@ def test_create_namespace_with_comment_and_location(catalog: Catalog, test_names "location": test_location, } catalog.create_namespace(namespace=test_namespace, properties=test_properties) - loaded_database_list = catalog.list_namespaces() + loaded_database_list = list(catalog.list_namespaces()) assert Catalog.identifier_to_tuple(test_namespace)[:1] in loaded_database_list properties = catalog.load_namespace_properties(test_namespace) assert properties["comment"] == "this is a test description" @@ -1088,17 +1088,17 @@ def test_list_namespaces(catalog: Catalog) -> None: if not catalog.namespace_exists(namespace): catalog.create_namespace(namespace) - ns_list = catalog.list_namespaces() + ns_list = list(catalog.list_namespaces()) for ns in [("db",), ("db%",), ("db2",)]: assert ns in ns_list - ns_list = catalog.list_namespaces("db") + ns_list = list(catalog.list_namespaces("db")) assert sorted(ns_list) == [("db", "ns1"), ("db", "ns2")] - ns_list = catalog.list_namespaces("db.ns1") + ns_list = list(catalog.list_namespaces("db.ns1")) assert sorted(ns_list) == [("db", "ns1", "ns2")] - ns_list = catalog.list_namespaces("db.ns1.ns2") + ns_list = list(catalog.list_namespaces("db.ns1.ns2")) assert len(ns_list) == 0 @@ -1108,14 +1108,14 @@ def test_list_namespaces_fuzzy_match(catalog: Catalog) -> None: if not catalog.namespace_exists(namespace): catalog.create_namespace(namespace) - assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")] + assert list(catalog.list_namespaces("db.ns1")) == [("db", "ns1", "ns2")] - assert catalog.list_namespaces("db_.ns1") == [("db_", "ns1", "ns2")] + assert list(catalog.list_namespaces("db_.ns1")) == [("db_", "ns1", "ns2")] def test_list_non_existing_namespaces(catalog: Catalog) -> None: with pytest.raises(NoSuchNamespaceError): - catalog.list_namespaces("does_not_exist") + list(catalog.list_namespaces("does_not_exist")) # Update namespace properties tests diff --git a/tests/catalog/test_dynamodb.py b/tests/catalog/test_dynamodb.py index 5933e7d472..4bfac46b79 100644 --- a/tests/catalog/test_dynamodb.py +++ b/tests/catalog/test_dynamodb.py @@ -398,7 +398,7 @@ def test_list_tables( test_catalog.create_namespace(namespace=database_name) for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - loaded_table_list = test_catalog.list_tables(database_name) + loaded_table_list = list(test_catalog.list_tables(database_name)) for table_name in table_list: assert (database_name, table_name) in loaded_table_list @@ -408,7 +408,7 @@ def test_list_namespaces(_bucket_initialize: None, database_list: list[str]) -> test_catalog = DynamoDbCatalog("test_ddb_catalog") for database_name in database_list: test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in loaded_database_list @@ -417,7 +417,7 @@ def test_list_namespaces(_bucket_initialize: None, database_list: list[str]) -> def test_create_namespace_no_properties(_bucket_initialize: None, database_name: str) -> None: test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -433,7 +433,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, da } test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name, properties=test_properties) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -445,7 +445,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, da def test_create_duplicated_namespace(_bucket_initialize: None, database_name: str) -> None: test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list with pytest.raises(NamespaceAlreadyExistsError): @@ -456,11 +456,11 @@ def test_create_duplicated_namespace(_bucket_initialize: None, database_name: st def test_drop_namespace(_bucket_initialize: None, database_name: str) -> None: test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list test_catalog.drop_namespace(database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 0 @@ -472,7 +472,7 @@ def test_drop_non_empty_namespace( test_catalog = DynamoDbCatalog("test_ddb_catalog", **{"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) test_catalog.create_table(identifier, table_schema_nested) - assert len(test_catalog.list_tables(database_name)) == 1 + assert len(list(test_catalog.list_tables(database_name))) == 1 with pytest.raises(NamespaceNotEmptyError): test_catalog.drop_namespace(database_name) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index c8da49a87e..b2e1796906 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -516,7 +516,7 @@ def test_list_tables( for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - loaded_table_list = test_catalog.list_tables(database_name) + loaded_table_list = list(test_catalog.list_tables(database_name)) assert (database_name, non_iceberg_table_name) not in loaded_table_list assert (database_name, non_table_type_table_name) not in loaded_table_list @@ -529,7 +529,7 @@ def test_list_namespaces(_bucket_initialize: None, moto_endpoint_url: str, datab test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) for database_name in database_list: test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in loaded_database_list @@ -538,7 +538,7 @@ def test_list_namespaces(_bucket_initialize: None, moto_endpoint_url: str, datab def test_create_namespace_no_properties(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -554,7 +554,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, mo } test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name, properties=test_properties) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -566,7 +566,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, mo def test_create_duplicated_namespace(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list with pytest.raises(NamespaceAlreadyExistsError): @@ -577,11 +577,11 @@ def test_create_duplicated_namespace(_bucket_initialize: None, moto_endpoint_url def test_drop_namespace(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list test_catalog.drop_namespace(database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 0 @@ -593,7 +593,7 @@ def test_drop_non_empty_namespace( test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"}) test_catalog.create_namespace(namespace=database_name) test_catalog.create_table(identifier, table_schema_nested) - assert len(test_catalog.list_tables(database_name)) == 1 + assert len(list(test_catalog.list_tables(database_name))) == 1 with pytest.raises(NamespaceNotEmptyError): test_catalog.drop_namespace(database_name) diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 09bb5ab920..1b2ee7f744 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -1043,7 +1043,7 @@ def test_list_tables(hive_table: HiveTable) -> None: catalog._client.__enter__().get_all_tables.return_value = ["table1", "table2", "table3", "table4"] catalog._client.__enter__().get_table_objects_by_name.return_value = [tbl1, tbl2, tbl3, tbl4] - got_tables = catalog.list_tables("database") + got_tables = list(catalog.list_tables("database")) assert got_tables == [("database", "table1"), ("database", "table2")] catalog._client.__enter__().get_all_tables.assert_called_with(db_name="database") catalog._client.__enter__().get_table_objects_by_name.assert_called_with( @@ -1057,7 +1057,7 @@ def test_list_namespaces() -> None: catalog._client = MagicMock() catalog._client.__enter__().get_all_databases.return_value = ["namespace1", "namespace2"] - assert catalog.list_namespaces() == [("namespace1",), ("namespace2",)] + assert list(catalog.list_namespaces()) == [("namespace1",), ("namespace2",)] catalog._client.__enter__().get_all_databases.assert_called() diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 1eb9f26a56..b4e352436c 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -478,7 +478,7 @@ def test_list_tables_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) == [("examples", "fooshare")] + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) == [("examples", "fooshare")] def test_list_tables_paginated_200(rest_mock: Mocker) -> None: @@ -520,7 +520,7 @@ def test_list_tables_paginated_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) assert result == [ ("examples", "table1"), ("examples", "table2"), @@ -557,7 +557,7 @@ def test_list_tables_paginated_200_none_next_page_token(rest_mock: Mocker) -> No request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) assert result == [ ("examples", "table1"), ("examples", "table2"), @@ -579,7 +579,7 @@ def test_list_tables_page_size(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_tables(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_tables(namespace)) assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/examples/tables?pageSize=100" assert result == [ @@ -597,9 +597,9 @@ def test_list_tables_200_sigv4(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_tables(namespace) == [ - ("examples", "fooshare") - ] + assert list( + RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_tables(namespace) + ) == [("examples", "fooshare")] assert rest_mock.called @@ -734,7 +734,7 @@ def test_list_tables_404(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) with pytest.raises(NoSuchNamespaceError) as e: - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) assert "Namespace does not exist" in str(e.value) @@ -747,7 +747,7 @@ def test_list_views_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [("examples", "fooshare")] + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) == [("examples", "fooshare")] def test_list_views_paginated_200(rest_mock: Mocker) -> None: @@ -789,7 +789,7 @@ def test_list_views_paginated_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) assert result == [ ("examples", "view1"), ("examples", "view2"), @@ -826,7 +826,7 @@ def test_list_views_paginated_200_none_next_page_token(rest_mock: Mocker) -> Non request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) assert result == [ ("examples", "view1"), ("examples", "view2"), @@ -848,7 +848,7 @@ def test_list_views_page_size(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_views(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_views(namespace)) assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/examples/views?pageSize=100" assert result == [ @@ -872,7 +872,7 @@ def test_list_views_invalid_page_size(rest_mock: Mocker) -> None: ) with pytest.raises(ValueError) as e: - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "0"}).list_views(namespace) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "0"}).list_views(namespace)) assert str(e.value) == "rest-page-size must be a positive integer" @@ -885,9 +885,9 @@ def test_list_views_200_sigv4(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_views(namespace) == [ - ("examples", "fooshare") - ] + assert list( + RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_views(namespace) + ) == [("examples", "fooshare")] assert rest_mock.called @@ -906,7 +906,7 @@ def test_list_views_404(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) with pytest.raises(NoSuchNamespaceError) as e: - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) assert "Namespace does not exist" in str(e.value) @@ -953,7 +953,7 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None: status_code=200, request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() == [ + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces()) == [ ("default",), ("examples",), ("fokko",), @@ -968,7 +968,7 @@ def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None: status_code=200, request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",)) == [ + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",))) == [ ("accounting", "tax"), ] @@ -1004,7 +1004,7 @@ def test_list_namespaces_paginated_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces()) assert result == [ ("ns1",), ("ns2",), @@ -1035,7 +1035,7 @@ def test_list_namespaces_with_parent_paginated_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",)) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",))) assert result == [ ("accounting", "tax"), ("accounting", "payroll"), @@ -1064,7 +1064,7 @@ def test_list_namespaces_paginated_200_none_next_page_token(rest_mock: Mocker) - request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces()) assert result == [ ("ns1",), ("ns2",), @@ -1082,7 +1082,7 @@ def test_list_namespaces_page_size(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_namespaces() + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_namespaces()) assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces?pageSize=100" assert result == [ @@ -1106,7 +1106,7 @@ def test_list_namespace_with_parent_404(rest_mock: Mocker) -> None: ) with pytest.raises(NoSuchNamespaceError): - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("some_namespace",)) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("some_namespace",))) @pytest.mark.filterwarnings( @@ -1161,7 +1161,7 @@ def test_list_namespaces_token_expired_success_on_retries(rest_mock: Mocker, sta # which results in the token being refreshed twice when the RestCatalog is initialized. assert tokens.call_count == 2 - assert catalog.list_namespaces() == [ + assert list(catalog.list_namespaces()) == [ ("default",), ("examples",), ("fokko",), @@ -1170,7 +1170,7 @@ def test_list_namespaces_token_expired_success_on_retries(rest_mock: Mocker, sta assert namespaces.call_count == 2 assert tokens.call_count == 3 - assert catalog.list_namespaces() == [ + assert list(catalog.list_namespaces()) == [ ("default",), ("examples",), ("fokko",), diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 4188ad83db..fa0867770c 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -189,7 +189,7 @@ def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, databas test_catalog.create_namespace(database_name) for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - identifier_list = test_catalog.list_tables(database_name) + identifier_list = list(test_catalog.list_tables(database_name)) assert len(identifier_list) == len(table_list) for table_name in table_list: assert (database_name, table_name) in identifier_list @@ -463,10 +463,10 @@ def test_create_namespace_with_comment(test_catalog: Catalog, database_name: str def test_list_namespaces(test_catalog: Catalog, database_list: list[str]) -> None: for database_name in database_list: test_catalog.create_namespace(database_name) - db_list = test_catalog.list_namespaces() + db_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in db_list - assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0 + assert len(list(test_catalog.list_namespaces(list(database_list)[0]))) == 0 @pytest.mark.integration From 54d8f646b5a1bfd36d8bb38aa06e3b1bbec3bbf6 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Thu, 28 May 2026 07:53:04 +0530 Subject: [PATCH 2/8] REST catalog: implement lazy pagination generators for list_* methods Replace the collect-then-return approach with proper generator functions that yield results page by page. Extract per-page fetch logic into dedicated helper methods (_fetch_tables_page, _fetch_views_page, _fetch_namespaces_page) decorated with @retry so authentication retries work correctly per page. Co-authored-by: Yuya Ebihara --- pyiceberg/catalog/rest/__init__.py | 67 ++++++++++++++---------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index b83b136049..8b42e775b3 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -1038,6 +1038,14 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @retry(**_RETRY_ARGS) + def _fetch_tables_page(self, url: str, params: dict[str, str]) -> ListTablesResponse: + response = self._session.get(url, params=params) + try: + response.raise_for_status() + except HTTPError as exc: + _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + return ListTablesResponse.model_validate_json(response.text) + @override def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: self._check_endpoint(Capability.V1_LIST_TABLES) @@ -1052,27 +1060,18 @@ def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: raise ValueError(f"{PAGE_SIZE} must be a positive integer") params["pageSize"] = str(page_size) - tables: list[Identifier] = [] page_token: str | None = None while True: if page_token: params["pageToken"] = page_token - response = self._session.get(url, params=params) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - - parsed = ListTablesResponse.model_validate_json(response.text) - tables.extend([(*table.namespace, table.name) for table in parsed.identifiers]) + parsed = self._fetch_tables_page(url, params) + yield from [(*table.namespace, table.name) for table in parsed.identifiers] if not parsed.next_page_token: break page_token = parsed.next_page_token - return iter(tables) - @retry(**_RETRY_ARGS) @override def load_table(self, identifier: str | Identifier) -> Table: @@ -1151,10 +1150,18 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm return table_request @retry(**_RETRY_ARGS) + def _fetch_views_page(self, url: str, params: dict[str, str]) -> ListViewsResponse: + response = self._session.get(url, params=params) + try: + response.raise_for_status() + except HTTPError as exc: + _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + return ListViewsResponse.model_validate_json(response.text) + @override def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: if Capability.V1_LIST_VIEWS not in self._supported_endpoints: - return [] + return namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = self._encode_namespace_path(namespace_tuple) url = self.url(Endpoints.list_views, namespace=namespace_concat) @@ -1166,28 +1173,18 @@ def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise ValueError(f"{PAGE_SIZE} must be a positive integer") params["pageSize"] = str(page_size) - views: list[Identifier] = [] page_token: str | None = None while True: if page_token: params["pageToken"] = page_token - - response = self._session.get(url, params=params) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - - parsed = ListViewsResponse.model_validate_json(response.text) - views.extend([(*view.namespace, view.name) for view in parsed.identifiers]) + parsed = self._fetch_views_page(url, params) + yield from [(*view.namespace, view.name) for view in parsed.identifiers] if not parsed.next_page_token: break page_token = parsed.next_page_token - return iter(views) - @retry(**_RETRY_ARGS) @override def load_view(self, identifier: str | Identifier) -> View: @@ -1276,6 +1273,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None: _handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}) @retry(**_RETRY_ARGS) + def _fetch_namespaces_page(self, params: dict[str, str]) -> ListNamespaceResponse: + response = self._session.get(self.url(Endpoints.list_namespaces), params=params) + try: + response.raise_for_status() + except HTTPError as exc: + _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + return ListNamespaceResponse.model_validate_json(response.text) + @override def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: self._check_endpoint(Capability.V1_LIST_NAMESPACES) @@ -1288,7 +1293,6 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifi raise ValueError(f"{PAGE_SIZE} must be a positive integer") params["pageSize"] = str(page_size) - namespaces: list[Identifier] = [] page_token: str | None = None while True: @@ -1296,22 +1300,13 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifi params["parent"] = self._encode_namespace_path(namespace_tuple) if page_token: params["pageToken"] = page_token - response = self._session.get(self.url(Endpoints.list_namespaces), params=params) - - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - - parsed = ListNamespaceResponse.model_validate_json(response.text) - namespaces.extend(parsed.namespaces) + parsed = self._fetch_namespaces_page(params) + yield from parsed.namespaces if not parsed.next_page_token: break page_token = parsed.next_page_token - return iter(namespaces) - @retry(**_RETRY_ARGS) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: From eee266783d40a369df34be793d437ffc0fea2d13 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Fri, 5 Jun 2026 12:21:30 +0530 Subject: [PATCH 3/8] fix: include NULL in delete predicate for evolved partition fields MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit dynamic_partition_overwrite builds a delete predicate from the current partition spec. After spec evolution, data files written under older specs may carry NULL for fields that were added later. The predicate category=A AND region=us never matches those files (region=NULL ≠ us), so they are silently left behind after the overwrite. Fix: detect which fields in the current spec are absent from at least one historical spec (i.e., were added via evolution), and extend the predicate for those fields to also accept NULL: category=A AND (region=us OR region IS NULL) This ensures pre-evolution data files are included in the delete pass while files from other partitions (region=eu, etc.) are correctly kept. Co-Authored-By: Claude Sonnet 4.6 --- pyiceberg/table/__init__.py | 65 +++++++++++++++++++++++++++---------- tests/table/test_init.py | 62 +++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 17 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 64ad10050d..d7e610e7fa 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -355,7 +355,11 @@ def _set_ref_snapshot( return updates, requirements def _build_partition_predicate( - self, partition_records: set[Record], spec: PartitionSpec, schema: Schema + self, + partition_records: set[Record], + spec: PartitionSpec, + schema: Schema, + evolved_source_ids: set[int] | None = None, ) -> BooleanExpression: """Build a filter predicate matching any of the input partition records. @@ -363,21 +367,34 @@ def _build_partition_predicate( partition_records: A set of partition records to match spec: An optional partition spec, if none then defaults to current schema: An optional schema, if none then defaults to current + evolved_source_ids: Source IDs of partition fields that were added via spec + evolution and therefore may be absent (NULL) in data written under older + specs. When a field is in this set and the partition value is non-NULL, + the predicate also accepts NULL so that pre-evolution files are included. Returns: A predicate matching any of the input partition records. """ - partition_fields = [schema.find_field(field.source_id).name for field in spec.fields] - if not partition_records or not partition_fields: + spec_fields = spec.fields + partition_field_names = [schema.find_field(field.source_id).name for field in spec_fields] + if not partition_records or not partition_field_names: return AlwaysFalse() + nullable_source_ids: set[int] = evolved_source_ids or set() + per_record_exprs: list[BooleanExpression] = [] for partition_record in partition_records: - predicates: list[BooleanExpression] = [ - EqualTo(Reference(partition_field), partition_record[pos]) - if partition_record[pos] is not None - else IsNull(Reference(partition_field)) - for pos, partition_field in enumerate(partition_fields) - ] + predicates: list[BooleanExpression] = [] + for pos, (field_name, spec_field) in enumerate(zip(partition_field_names, spec_fields, strict=True)): + value = partition_record[pos] + if value is not None: + field_pred: BooleanExpression = EqualTo(Reference(field_name), value) + if spec_field.source_id in nullable_source_ids: + # Also match NULL: pre-evolution files may have no value for this + # field because it was not in the schema when the data was written. + field_pred = Or(field_pred, IsNull(Reference(field_name))) + else: + field_pred = IsNull(Reference(field_name)) + predicates.append(field_pred) per_record_exprs.append(And(*predicates) if len(predicates) > 1 else predicates[0]) return Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0] @@ -586,8 +603,24 @@ def dynamic_partition_overwrite( ) partitions_to_overwrite = {data_file.partition for data_file in data_files} + + # Determine which partition fields were introduced via spec evolution. Data + # written under older specs may have NULL for those fields even when the new + # partition value is non-NULL, so the delete predicate must also match NULL. + current_spec = self.table_metadata.spec() + all_specs = self.table_metadata.specs() + # A field is "evolved" if it is absent from at least one historical spec. + evolved_source_ids = { + f.source_id + for f in current_spec.fields + if not all(any(hf.source_id == f.source_id for hf in hs.fields) for hs in all_specs.values()) + } + delete_filter = self._build_partition_predicate( - partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema() + partition_records=partitions_to_overwrite, + spec=current_spec, + schema=self.table_metadata.schema(), + evolved_source_ids=evolved_source_ids, ) self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) @@ -2072,13 +2105,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single # shared instance across multiple threads. - return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) + return lambda datafile: residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), ) @staticmethod diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 0c4ea258f3..470cbc9762 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -31,6 +31,8 @@ And, EqualTo, In, + IsNull, + Or, ) from pyiceberg.expressions.visitors import bind from pyiceberg.io import PY_IO_IMPL, load_file_io @@ -1766,3 +1768,63 @@ def test_build_large_partition_predicate(table_v2: Table) -> None: ) bind(table_v2.metadata.schema(), expr, case_sensitive=True) + + +def test_build_partition_predicate_with_evolved_source_ids(table_v2: Table) -> None: + """Regression test for https://github.com/apache/iceberg-python/issues/3148. + + When a partition field is added via spec evolution, data written under older specs + may have NULL for that field. _build_partition_predicate must include + ``OR field IS NULL`` for evolved fields so that pre-evolution files are matched by + the delete predicate and are not silently left behind. + """ + from pyiceberg.transforms import IdentityTransform + + schema = Schema( + NestedField(1, "category", StringType(), required=False), + NestedField(2, "region", StringType(), required=False), + ) + spec = PartitionSpec( + PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category"), + PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="region"), + ) + + # region (source_id=2) was added via spec evolution → it is an evolved field + with table_v2.transaction() as tx: + expr = tx._build_partition_predicate( + partition_records={Record("A", "us")}, + spec=spec, + schema=schema, + evolved_source_ids={2}, + ) + + # category=A AND (region=us OR region IS NULL) + expected = And( + EqualTo("category", "A"), + Or(EqualTo("region", "us"), IsNull("region")), + ) + assert repr(expr) == repr(expected) + + +def test_build_partition_predicate_without_evolved_source_ids(table_v2: Table) -> None: + """Without evolved_source_ids, the predicate matches exact values only.""" + from pyiceberg.transforms import IdentityTransform + + schema = Schema( + NestedField(1, "category", StringType(), required=False), + NestedField(2, "region", StringType(), required=False), + ) + spec = PartitionSpec( + PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category"), + PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="region"), + ) + + with table_v2.transaction() as tx: + expr = tx._build_partition_predicate( + partition_records={Record("A", "us")}, + spec=spec, + schema=schema, + ) + + expected = And(EqualTo("category", "A"), EqualTo("region", "us")) + assert repr(expr) == repr(expected) From 47b78903ccf62fe086c798cd659d1c83bab38796 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Fri, 5 Jun 2026 13:14:11 +0530 Subject: [PATCH 4/8] fix: use per-spec predicates in dynamic_partition_overwrite for spec evolution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After partition spec evolution, data files written under older specs carry NULL for fields that didn't exist as partition fields at write time. A single delete predicate (e.g. category=A AND region=us) never matches those files because the strict-metrics evaluator sees region=NULL ≠ us. The previous attempt (OR IS NULL in the predicate) was too broad: it also deleted spec-1 null-partition files that should be preserved. Correct fix: build a per-spec delete predicate map where each historical spec gets a predicate that: - Uses exact-match for fields the spec already had (region=us for spec-1) - Also accepts NULL for fields the spec was missing (region=us OR NULL for spec-0, since spec-0 had no region partition field) These are stored on _SnapshotProducer._per_spec_predicates. _compute_deletes looks up the right predicate for each manifest's spec_id instead of using the single global predicate for metrics evaluation. The global predicate (OR of all per-spec predicates) is still used for the manifest evaluator so no manifests are skipped. The rewrite logic from Transaction.delete() is inlined into dynamic_partition_overwrite so that partial-match files are still rewritten correctly. Co-Authored-By: Claude Sonnet 4.6 --- pyiceberg/table/__init__.py | 160 +++++++++++++++++++++-------- pyiceberg/table/update/snapshot.py | 17 ++- tests/table/test_init.py | 107 ++++++++++--------- 3 files changed, 190 insertions(+), 94 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index d7e610e7fa..a88022bb4d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -355,11 +355,7 @@ def _set_ref_snapshot( return updates, requirements def _build_partition_predicate( - self, - partition_records: set[Record], - spec: PartitionSpec, - schema: Schema, - evolved_source_ids: set[int] | None = None, + self, partition_records: set[Record], spec: PartitionSpec, schema: Schema ) -> BooleanExpression: """Build a filter predicate matching any of the input partition records. @@ -367,34 +363,21 @@ def _build_partition_predicate( partition_records: A set of partition records to match spec: An optional partition spec, if none then defaults to current schema: An optional schema, if none then defaults to current - evolved_source_ids: Source IDs of partition fields that were added via spec - evolution and therefore may be absent (NULL) in data written under older - specs. When a field is in this set and the partition value is non-NULL, - the predicate also accepts NULL so that pre-evolution files are included. Returns: A predicate matching any of the input partition records. """ - spec_fields = spec.fields - partition_field_names = [schema.find_field(field.source_id).name for field in spec_fields] - if not partition_records or not partition_field_names: + partition_fields = [schema.find_field(field.source_id).name for field in spec.fields] + if not partition_records or not partition_fields: return AlwaysFalse() - nullable_source_ids: set[int] = evolved_source_ids or set() - per_record_exprs: list[BooleanExpression] = [] for partition_record in partition_records: - predicates: list[BooleanExpression] = [] - for pos, (field_name, spec_field) in enumerate(zip(partition_field_names, spec_fields, strict=True)): - value = partition_record[pos] - if value is not None: - field_pred: BooleanExpression = EqualTo(Reference(field_name), value) - if spec_field.source_id in nullable_source_ids: - # Also match NULL: pre-evolution files may have no value for this - # field because it was not in the schema when the data was written. - field_pred = Or(field_pred, IsNull(Reference(field_name))) - else: - field_pred = IsNull(Reference(field_name)) - predicates.append(field_pred) + predicates: list[BooleanExpression] = [ + EqualTo(Reference(partition_field), partition_record[pos]) + if partition_record[pos] is not None + else IsNull(Reference(partition_field)) + for pos, partition_field in enumerate(partition_fields) + ] per_record_exprs.append(And(*predicates) if len(predicates) > 1 else predicates[0]) return Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0] @@ -604,25 +587,116 @@ def dynamic_partition_overwrite( partitions_to_overwrite = {data_file.partition for data_file in data_files} - # Determine which partition fields were introduced via spec evolution. Data - # written under older specs may have NULL for those fields even when the new - # partition value is non-NULL, so the delete predicate must also match NULL. + # Build per-spec delete predicates to handle partition spec evolution correctly. + # + # When a partition field was added via spec evolution, data files written under + # older specs carry NULL for that field (because it was absent from the schema at + # write time). A single "category=A AND region=us" predicate would never match + # those files because the strict-metrics evaluator sees region=NULL ≠ "us". + # + # To fix this, we compute a per-spec predicate for every historical spec: + # - For specs that include all current partition fields → use exact-match predicate. + # - For specs that are missing some current partition fields → also accept NULL + # for the missing fields (NULL means the value was absent at write time and can + # legitimately belong to any partition of that field). + # + # These per-spec predicates are stored on the delete snapshot producer so that + # _compute_deletes uses the right predicate when evaluating each manifest file. current_spec = self.table_metadata.spec() all_specs = self.table_metadata.specs() - # A field is "evolved" if it is absent from at least one historical spec. - evolved_source_ids = { - f.source_id - for f in current_spec.fields - if not all(any(hf.source_id == f.source_id for hf in hs.fields) for hs in all_specs.values()) - } - - delete_filter = self._build_partition_predicate( - partition_records=partitions_to_overwrite, - spec=current_spec, - schema=self.table_metadata.schema(), - evolved_source_ids=evolved_source_ids, - ) - self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) + schema = self.table_metadata.schema() + + # source_ids in the current spec + current_source_ids = {f.source_id for f in current_spec.fields} + # map source_id → position in the current spec's partition record + source_id_to_pos = {f.source_id: pos for pos, f in enumerate(current_spec.fields)} + # map source_id → data-column name + source_id_to_col = {f.source_id: schema.find_field(f.source_id).name for f in current_spec.fields} + + per_spec_predicates: dict[int, BooleanExpression] = {} + for spec_id, hist_spec in all_specs.items(): + hist_source_ids = {f.source_id for f in hist_spec.fields} + # Fields present in the current spec but absent from this historical spec + missing_source_ids = current_source_ids - hist_source_ids + + per_record_exprs: list[BooleanExpression] = [] + for partition_record in partitions_to_overwrite: + predicates: list[BooleanExpression] = [] + for source_id, col_name in source_id_to_col.items(): + value = partition_record[source_id_to_pos[source_id]] + if value is not None: + field_pred: BooleanExpression = EqualTo(Reference(col_name), value) + if source_id in missing_source_ids: + # Pre-evolution files have NULL for this field; also match NULL + # so those files are included in the delete. + field_pred = Or(field_pred, IsNull(Reference(col_name))) + else: + field_pred = IsNull(Reference(col_name)) + predicates.append(field_pred) + per_record_exprs.append(And(*predicates) if len(predicates) > 1 else predicates[0]) + + per_spec_predicates[spec_id] = Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0] + + # The global predicate (used by manifest evaluators) is the union of all + # per-spec predicates; this ensures no manifests are skipped. + global_delete_filter = Or(*per_spec_predicates.values()) if per_spec_predicates else AlwaysFalse() + + # Open the delete snapshot and set per-spec predicates before committing. + # This mirrors Transaction.delete() but injects per_spec_predicates so that + # _compute_deletes uses the right predicate for each historical spec. + from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow + + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot: + delete_snapshot._per_spec_predicates = per_spec_predicates + delete_snapshot.delete_by_predicate(global_delete_filter) + + # Handle partial-match files that need to be rewritten (copy-on-write). + if delete_snapshot.rewrites_needed is True: + bound_delete_filter = bind(self.table_metadata.schema(), global_delete_filter, case_sensitive=True) + preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema()) + + file_scan = self._scan(row_filter=global_delete_filter) + if branch is not None: + file_scan = file_scan.use_ref(branch) + + rewrite_uuid = uuid.uuid4() + rewrite_counter = itertools.count(0) + replaced_files: list[tuple[DataFile, list[DataFile]]] = [] + for original_file in file_scan.plan_files(): + df_orig = ArrowScan( + table_metadata=self.table_metadata, + io=self._table.io, + projected_schema=self.table_metadata.schema(), + row_filter=AlwaysTrue(), + ).to_table(tasks=[original_file]) + filtered_df = df_orig.filter(preserve_row_filter) + if len(filtered_df) == 0: + replaced_files.append((original_file.file, [])) + elif len(df_orig) != len(filtered_df): + replaced_files.append( + ( + original_file.file, + list( + _dataframe_to_data_files( + io=self._table.io, + df=filtered_df, + table_metadata=self.table_metadata, + write_uuid=rewrite_uuid, + counter=rewrite_counter, + ) + ), + ) + ) + + if replaced_files: + with self.update_snapshot( + snapshot_properties=snapshot_properties, branch=branch + ).overwrite() as overwrite_snapshot: + overwrite_snapshot.commit_uuid = rewrite_uuid + for original_data_file, replacement_data_files in replaced_files: + overwrite_snapshot.delete_data_file(original_data_file) + for replacement_data_file in replacement_data_files: + overwrite_snapshot.append_data_file(replacement_data_file) with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files: append_files.commit_uuid = append_snapshot_commit_uuid diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 7931edacdd..eacc32d439 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -104,6 +104,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _target_branch: str | None _predicate: BooleanExpression _case_sensitive: bool + _per_spec_predicates: dict[int, BooleanExpression] def __init__( self, @@ -134,6 +135,7 @@ def __init__( ) self._predicate = AlwaysFalse() self._case_sensitive = True + self._per_spec_predicates = {} def _validate_target_branch(self, branch: str | None) -> str | None: # if branch is none, write will be written into a staging snapshot @@ -431,10 +433,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> schema = table_metadata.schema() manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval - inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( - schema, self._predicate, case_sensitive=self._case_sensitive - ).eval + + def _strict_metrics_for_spec(spec_id: int) -> Callable[[DataFile], bool]: + predicate = self._per_spec_predicates.get(spec_id, self._predicate) + return _StrictMetricsEvaluator(schema, predicate, case_sensitive=self._case_sensitive).eval + + def _inclusive_metrics_for_spec(spec_id: int) -> Callable[[DataFile], bool]: + predicate = self._per_spec_predicates.get(spec_id, self._predicate) + return _InclusiveMetricsEvaluator(schema, predicate, case_sensitive=self._case_sensitive).eval existing_manifests = [] total_deleted_entries = [] @@ -454,6 +460,9 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> existing_manifests.append(manifest_file) else: # It is relevant, let's check out the content + spec_id = manifest_file.partition_spec_id + strict_metrics_evaluator = _strict_metrics_for_spec(spec_id) + inclusive_metrics_evaluator = _inclusive_metrics_for_spec(spec_id) deleted_entries = [] existing_entries = [] for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 470cbc9762..5193bb2710 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -31,8 +31,6 @@ And, EqualTo, In, - IsNull, - Or, ) from pyiceberg.expressions.visitors import bind from pyiceberg.io import PY_IO_IMPL, load_file_io @@ -1770,61 +1768,76 @@ def test_build_large_partition_predicate(table_v2: Table) -> None: bind(table_v2.metadata.schema(), expr, case_sensitive=True) -def test_build_partition_predicate_with_evolved_source_ids(table_v2: Table) -> None: +def test_dynamic_partition_overwrite_spec_evolution(tmp_path: Any) -> None: """Regression test for https://github.com/apache/iceberg-python/issues/3148. - When a partition field is added via spec evolution, data written under older specs - may have NULL for that field. _build_partition_predicate must include - ``OR field IS NULL`` for evolved fields so that pre-evolution files are matched by - the delete predicate and are not silently left behind. + After partition spec evolution, dynamic_partition_overwrite must delete data files + written under the old spec (where the new partition field was absent / NULL) when + overwriting the matching logical partition. """ + import tempfile + + import pyarrow as pa + + from pyiceberg.catalog import load_catalog from pyiceberg.transforms import IdentityTransform + from pyiceberg.types import LongType - schema = Schema( - NestedField(1, "category", StringType(), required=False), - NestedField(2, "region", StringType(), required=False), - ) - spec = PartitionSpec( - PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category"), - PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="region"), - ) + with tempfile.TemporaryDirectory() as warehouse: + catalog = load_catalog("test", type="sql", uri=f"sqlite:///{warehouse}/catalog.db", warehouse=f"file://{warehouse}") + catalog.create_namespace("default") - # region (source_id=2) was added via spec evolution → it is an evolved field - with table_v2.transaction() as tx: - expr = tx._build_partition_predicate( - partition_records={Record("A", "us")}, - spec=spec, - schema=schema, - evolved_source_ids={2}, + schema = Schema( + NestedField(1, "category", StringType(), required=False), + NestedField(2, "region", StringType(), required=False), + NestedField(3, "value", LongType(), required=False), ) + spec_v0 = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category")) + table = catalog.create_table("default.test_spec_evo", schema=schema, partition_spec=spec_v0) - # category=A AND (region=us OR region IS NULL) - expected = And( - EqualTo("category", "A"), - Or(EqualTo("region", "us"), IsNull("region")), - ) - assert repr(expr) == repr(expected) + # Write under spec-0 (region is NULL — field exists in schema but not in partition spec) + table.append( + pa.table( + { + "category": pa.array(["A", "A", "B"], type=pa.string()), + "region": pa.array([None, None, None], type=pa.string()), + "value": pa.array([1, 2, 10], type=pa.int64()), + } + ) + ) + # Evolve to spec-1: add region as a partition field + with table.update_spec() as u: + u.add_field("region", IdentityTransform(), "region") + table = catalog.load_table("default.test_spec_evo") -def test_build_partition_predicate_without_evolved_source_ids(table_v2: Table) -> None: - """Without evolved_source_ids, the predicate matches exact values only.""" - from pyiceberg.transforms import IdentityTransform - - schema = Schema( - NestedField(1, "category", StringType(), required=False), - NestedField(2, "region", StringType(), required=False), - ) - spec = PartitionSpec( - PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category"), - PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="region"), - ) + # Write under spec-1 + table.append( + pa.table( + { + "category": pa.array(["A", "B"], type=pa.string()), + "region": pa.array(["us", "us"], type=pa.string()), + "value": pa.array([100, 200], type=pa.int64()), + } + ) + ) - with table_v2.transaction() as tx: - expr = tx._build_partition_predicate( - partition_records={Record("A", "us")}, - spec=spec, - schema=schema, + # Overwrite partition {A, us} — must also delete stale spec-0 {A} files + table.dynamic_partition_overwrite( + pa.table( + { + "category": pa.array(["A"], type=pa.string()), + "region": pa.array(["us"], type=pa.string()), + "value": pa.array([999], type=pa.int64()), + } + ) ) - expected = And(EqualTo("category", "A"), EqualTo("region", "us")) - assert repr(expr) == repr(expected) + result = table.scan().to_arrow().to_pydict() + a_values = sorted([v for c, v in zip(result["category"], result["value"], strict=True) if c == "A"]) + b_values = sorted([v for c, v in zip(result["category"], result["value"], strict=True) if c == "B"]) + + # Spec-0 rows 1,2 (category=A, region=NULL) should be gone; only 999 remains + assert a_values == [999], f"Expected [999] but got {a_values}" + # B rows from both specs should be untouched + assert b_values == [10, 200], f"Expected [10, 200] but got {b_values}" From 9dc1e84cd89581e4e5f8101e31997516e3b9db2c Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Fri, 5 Jun 2026 13:26:52 +0530 Subject: [PATCH 5/8] fix: handle single-spec case in global_delete_filter construction Or() requires exactly two arguments; when a table has only one spec, Or(*[single_pred]) raises TypeError. Guard with len check. Co-Authored-By: Claude Sonnet 4.6 --- pyiceberg/table/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index a88022bb4d..39120e9ef3 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -639,7 +639,8 @@ def dynamic_partition_overwrite( # The global predicate (used by manifest evaluators) is the union of all # per-spec predicates; this ensures no manifests are skipped. - global_delete_filter = Or(*per_spec_predicates.values()) if per_spec_predicates else AlwaysFalse() + preds = list(per_spec_predicates.values()) + global_delete_filter = Or(*preds) if len(preds) > 1 else preds[0] if preds else AlwaysFalse() # Open the delete snapshot and set per-spec predicates before committing. # This mirrors Transaction.delete() but injects per_spec_predicates so that From 8a05f4413988d0b7102bce27b66d55f71c6468ab Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Fri, 5 Jun 2026 14:14:40 +0530 Subject: [PATCH 6/8] fix dynamic overwrite regression for non-evolution specs --- pyiceberg/table/__init__.py | 216 ++++++++++++++++++------------------ 1 file changed, 111 insertions(+), 105 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 39120e9ef3..c7bb6df50c 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -586,118 +586,124 @@ def dynamic_partition_overwrite( ) partitions_to_overwrite = {data_file.partition for data_file in data_files} - - # Build per-spec delete predicates to handle partition spec evolution correctly. - # - # When a partition field was added via spec evolution, data files written under - # older specs carry NULL for that field (because it was absent from the schema at - # write time). A single "category=A AND region=us" predicate would never match - # those files because the strict-metrics evaluator sees region=NULL ≠ "us". - # - # To fix this, we compute a per-spec predicate for every historical spec: - # - For specs that include all current partition fields → use exact-match predicate. - # - For specs that are missing some current partition fields → also accept NULL - # for the missing fields (NULL means the value was absent at write time and can - # legitimately belong to any partition of that field). - # - # These per-spec predicates are stored on the delete snapshot producer so that - # _compute_deletes uses the right predicate when evaluating each manifest file. current_spec = self.table_metadata.spec() all_specs = self.table_metadata.specs() schema = self.table_metadata.schema() - # source_ids in the current spec - current_source_ids = {f.source_id for f in current_spec.fields} - # map source_id → position in the current spec's partition record - source_id_to_pos = {f.source_id: pos for pos, f in enumerate(current_spec.fields)} - # map source_id → data-column name - source_id_to_col = {f.source_id: schema.find_field(f.source_id).name for f in current_spec.fields} - - per_spec_predicates: dict[int, BooleanExpression] = {} - for spec_id, hist_spec in all_specs.items(): - hist_source_ids = {f.source_id for f in hist_spec.fields} - # Fields present in the current spec but absent from this historical spec - missing_source_ids = current_source_ids - hist_source_ids - - per_record_exprs: list[BooleanExpression] = [] - for partition_record in partitions_to_overwrite: - predicates: list[BooleanExpression] = [] - for source_id, col_name in source_id_to_col.items(): - value = partition_record[source_id_to_pos[source_id]] - if value is not None: - field_pred: BooleanExpression = EqualTo(Reference(col_name), value) - if source_id in missing_source_ids: - # Pre-evolution files have NULL for this field; also match NULL - # so those files are included in the delete. - field_pred = Or(field_pred, IsNull(Reference(col_name))) - else: - field_pred = IsNull(Reference(col_name)) - predicates.append(field_pred) - per_record_exprs.append(And(*predicates) if len(predicates) > 1 else predicates[0]) - - per_spec_predicates[spec_id] = Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0] - - # The global predicate (used by manifest evaluators) is the union of all - # per-spec predicates; this ensures no manifests are skipped. - preds = list(per_spec_predicates.values()) - global_delete_filter = Or(*preds) if len(preds) > 1 else preds[0] if preds else AlwaysFalse() - - # Open the delete snapshot and set per-spec predicates before committing. - # This mirrors Transaction.delete() but injects per_spec_predicates so that - # _compute_deletes uses the right predicate for each historical spec. - from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow - - with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot: - delete_snapshot._per_spec_predicates = per_spec_predicates - delete_snapshot.delete_by_predicate(global_delete_filter) - - # Handle partial-match files that need to be rewritten (copy-on-write). - if delete_snapshot.rewrites_needed is True: - bound_delete_filter = bind(self.table_metadata.schema(), global_delete_filter, case_sensitive=True) - preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema()) - - file_scan = self._scan(row_filter=global_delete_filter) - if branch is not None: - file_scan = file_scan.use_ref(branch) + # Keep the existing dynamic overwrite behavior for non-evolution tables. + # We only need per-spec predicate handling when some historical specs are + # missing current partition source IDs. + current_source_ids = {field.source_id for field in current_spec.fields} + has_missing_partition_fields_in_history = any( + current_source_ids - {field.source_id for field in historical_spec.fields} for historical_spec in all_specs.values() + ) - rewrite_uuid = uuid.uuid4() - rewrite_counter = itertools.count(0) - replaced_files: list[tuple[DataFile, list[DataFile]]] = [] - for original_file in file_scan.plan_files(): - df_orig = ArrowScan( - table_metadata=self.table_metadata, - io=self._table.io, - projected_schema=self.table_metadata.schema(), - row_filter=AlwaysTrue(), - ).to_table(tasks=[original_file]) - filtered_df = df_orig.filter(preserve_row_filter) - if len(filtered_df) == 0: - replaced_files.append((original_file.file, [])) - elif len(df_orig) != len(filtered_df): - replaced_files.append( - ( - original_file.file, - list( - _dataframe_to_data_files( - io=self._table.io, - df=filtered_df, - table_metadata=self.table_metadata, - write_uuid=rewrite_uuid, - counter=rewrite_counter, - ) - ), + if not has_missing_partition_fields_in_history: + delete_filter = self._build_partition_predicate( + partition_records=partitions_to_overwrite, + spec=current_spec, + schema=schema, + ) + self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) + else: + # Build per-spec delete predicates to handle partition spec evolution correctly. + # + # When a partition field was added via spec evolution, data files written under + # older specs carry NULL for that field (because it was absent from the schema at + # write time). A single "category=A AND region=us" predicate would never match + # those files because the strict-metrics evaluator sees region=NULL != "us". + # + # To fix this, we compute a per-spec predicate for every historical spec: + # - For specs that include all current partition fields -> use exact-match predicate. + # - For specs that are missing some current partition fields -> also accept NULL + # for the missing fields. + # + # These per-spec predicates are stored on the delete snapshot producer so that + # _compute_deletes uses the right predicate when evaluating each manifest file. + source_id_to_pos = {field.source_id: pos for pos, field in enumerate(current_spec.fields)} + source_id_to_col = {field.source_id: schema.find_field(field.source_id).name for field in current_spec.fields} + + per_spec_predicates: dict[int, BooleanExpression] = {} + for spec_id, hist_spec in all_specs.items(): + hist_source_ids = {field.source_id for field in hist_spec.fields} + missing_source_ids = current_source_ids - hist_source_ids + + per_record_exprs: list[BooleanExpression] = [] + for partition_record in partitions_to_overwrite: + predicates: list[BooleanExpression] = [] + for source_id, col_name in source_id_to_col.items(): + value = partition_record[source_id_to_pos[source_id]] + if value is not None: + field_pred: BooleanExpression = EqualTo(Reference(col_name), value) + if source_id in missing_source_ids: + field_pred = Or(field_pred, IsNull(Reference(col_name))) + else: + field_pred = IsNull(Reference(col_name)) + predicates.append(field_pred) + + per_record_exprs.append(And(*predicates) if len(predicates) > 1 else predicates[0]) + + per_spec_predicates[spec_id] = Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0] + + preds = list(per_spec_predicates.values()) + global_delete_filter = Or(*preds) if len(preds) > 1 else preds[0] if preds else AlwaysFalse() + + # Open the delete snapshot and set per-spec predicates before committing. + # This mirrors Transaction.delete() but injects per_spec_predicates so that + # _compute_deletes uses the right predicate for each historical spec. + from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow + + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot: + delete_snapshot._per_spec_predicates = per_spec_predicates + delete_snapshot.delete_by_predicate(global_delete_filter) + + # Handle partial-match files that need to be rewritten (copy-on-write). + if delete_snapshot.rewrites_needed is True: + bound_delete_filter = bind(self.table_metadata.schema(), global_delete_filter, case_sensitive=True) + preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema()) + + file_scan = self._scan(row_filter=global_delete_filter) + if branch is not None: + file_scan = file_scan.use_ref(branch) + + rewrite_uuid = uuid.uuid4() + rewrite_counter = itertools.count(0) + replaced_files: list[tuple[DataFile, list[DataFile]]] = [] + for original_file in file_scan.plan_files(): + df_orig = ArrowScan( + table_metadata=self.table_metadata, + io=self._table.io, + projected_schema=self.table_metadata.schema(), + row_filter=AlwaysTrue(), + ).to_table(tasks=[original_file]) + filtered_df = df_orig.filter(preserve_row_filter) + if len(filtered_df) == 0: + replaced_files.append((original_file.file, [])) + elif len(df_orig) != len(filtered_df): + replaced_files.append( + ( + original_file.file, + list( + _dataframe_to_data_files( + io=self._table.io, + df=filtered_df, + table_metadata=self.table_metadata, + write_uuid=rewrite_uuid, + counter=rewrite_counter, + ) + ), + ) ) - ) - if replaced_files: - with self.update_snapshot( - snapshot_properties=snapshot_properties, branch=branch - ).overwrite() as overwrite_snapshot: - overwrite_snapshot.commit_uuid = rewrite_uuid - for original_data_file, replacement_data_files in replaced_files: - overwrite_snapshot.delete_data_file(original_data_file) - for replacement_data_file in replacement_data_files: - overwrite_snapshot.append_data_file(replacement_data_file) + if replaced_files: + with self.update_snapshot( + snapshot_properties=snapshot_properties, branch=branch + ).overwrite() as overwrite_snapshot: + overwrite_snapshot.commit_uuid = rewrite_uuid + for original_data_file, replacement_data_files in replaced_files: + overwrite_snapshot.delete_data_file(original_data_file) + for replacement_data_file in replacement_data_files: + overwrite_snapshot.append_data_file(replacement_data_file) with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files: append_files.commit_uuid = append_snapshot_commit_uuid From 40661d07c1b621c4421b458c0da95de9779ab54f Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Fri, 5 Jun 2026 19:09:49 +0530 Subject: [PATCH 7/8] fix dynamic overwrite null-row regression after spec evolution --- pyiceberg/table/__init__.py | 14 ++++++++------ pyiceberg/table/update/snapshot.py | 17 ++++++----------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c7bb6df50c..536933d759 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -622,6 +622,11 @@ def dynamic_partition_overwrite( # _compute_deletes uses the right predicate when evaluating each manifest file. source_id_to_pos = {field.source_id: pos for pos, field in enumerate(current_spec.fields)} source_id_to_col = {field.source_id: schema.find_field(field.source_id).name for field in current_spec.fields} + exact_delete_filter = self._build_partition_predicate( + partition_records=partitions_to_overwrite, + spec=current_spec, + schema=schema, + ) per_spec_predicates: dict[int, BooleanExpression] = {} for spec_id, hist_spec in all_specs.items(): @@ -645,9 +650,6 @@ def dynamic_partition_overwrite( per_spec_predicates[spec_id] = Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0] - preds = list(per_spec_predicates.values()) - global_delete_filter = Or(*preds) if len(preds) > 1 else preds[0] if preds else AlwaysFalse() - # Open the delete snapshot and set per-spec predicates before committing. # This mirrors Transaction.delete() but injects per_spec_predicates so that # _compute_deletes uses the right predicate for each historical spec. @@ -655,14 +657,14 @@ def dynamic_partition_overwrite( with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot: delete_snapshot._per_spec_predicates = per_spec_predicates - delete_snapshot.delete_by_predicate(global_delete_filter) + delete_snapshot.delete_by_predicate(exact_delete_filter) # Handle partial-match files that need to be rewritten (copy-on-write). if delete_snapshot.rewrites_needed is True: - bound_delete_filter = bind(self.table_metadata.schema(), global_delete_filter, case_sensitive=True) + bound_delete_filter = bind(self.table_metadata.schema(), exact_delete_filter, case_sensitive=True) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema()) - file_scan = self._scan(row_filter=global_delete_filter) + file_scan = self._scan(row_filter=exact_delete_filter) if branch is not None: file_scan = file_scan.use_ref(branch) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index eacc32d439..b7fa154fec 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -362,7 +362,8 @@ def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = T def _build_partition_projection(self, spec_id: int) -> BooleanExpression: project = inclusive_projection(self.schema(), self.spec(spec_id), self._case_sensitive) - return project(self._predicate) + predicate = self._per_spec_predicates.get(spec_id, self._predicate) + return project(predicate) @cached_property def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: @@ -434,13 +435,10 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - def _strict_metrics_for_spec(spec_id: int) -> Callable[[DataFile], bool]: - predicate = self._per_spec_predicates.get(spec_id, self._predicate) - return _StrictMetricsEvaluator(schema, predicate, case_sensitive=self._case_sensitive).eval - - def _inclusive_metrics_for_spec(spec_id: int) -> Callable[[DataFile], bool]: - predicate = self._per_spec_predicates.get(spec_id, self._predicate) - return _InclusiveMetricsEvaluator(schema, predicate, case_sensitive=self._case_sensitive).eval + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval + inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( + schema, self._predicate, case_sensitive=self._case_sensitive + ).eval existing_manifests = [] total_deleted_entries = [] @@ -460,9 +458,6 @@ def _inclusive_metrics_for_spec(spec_id: int) -> Callable[[DataFile], bool]: existing_manifests.append(manifest_file) else: # It is relevant, let's check out the content - spec_id = manifest_file.partition_spec_id - strict_metrics_evaluator = _strict_metrics_for_spec(spec_id) - inclusive_metrics_evaluator = _inclusive_metrics_for_spec(spec_id) deleted_entries = [] existing_entries = [] for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): From 0567a6ef2724b97564a7f99ebc073b99f5df546c Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Fri, 5 Jun 2026 19:34:51 +0530 Subject: [PATCH 8/8] fix spec-evolution overwrite predicate matching --- pyiceberg/table/__init__.py | 3 ++- pyiceberg/table/update/snapshot.py | 14 ++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 536933d759..c1e222bb0b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -632,6 +632,7 @@ def dynamic_partition_overwrite( for spec_id, hist_spec in all_specs.items(): hist_source_ids = {field.source_id for field in hist_spec.fields} missing_source_ids = current_source_ids - hist_source_ids + has_overlap_with_current = bool(hist_source_ids & current_source_ids) per_record_exprs: list[BooleanExpression] = [] for partition_record in partitions_to_overwrite: @@ -640,7 +641,7 @@ def dynamic_partition_overwrite( value = partition_record[source_id_to_pos[source_id]] if value is not None: field_pred: BooleanExpression = EqualTo(Reference(col_name), value) - if source_id in missing_source_ids: + if source_id in missing_source_ids and has_overlap_with_current: field_pred = Or(field_pred, IsNull(Reference(col_name))) else: field_pred = IsNull(Reference(col_name)) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index b7fa154fec..7172a6fa3e 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -435,10 +435,13 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval - inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( - schema, self._predicate, case_sensitive=self._case_sensitive - ).eval + def _strict_metrics_for_spec(spec_id: int) -> Callable[[DataFile], bool]: + predicate = self._per_spec_predicates.get(spec_id, self._predicate) + return _StrictMetricsEvaluator(schema, predicate, case_sensitive=self._case_sensitive).eval + + def _inclusive_metrics_for_spec(spec_id: int) -> Callable[[DataFile], bool]: + predicate = self._per_spec_predicates.get(spec_id, self._predicate) + return _InclusiveMetricsEvaluator(schema, predicate, case_sensitive=self._case_sensitive).eval existing_manifests = [] total_deleted_entries = [] @@ -458,6 +461,9 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> existing_manifests.append(manifest_file) else: # It is relevant, let's check out the content + spec_id = manifest_file.partition_spec_id + strict_metrics_evaluator = _strict_metrics_for_spec(spec_id) + inclusive_metrics_evaluator = _inclusive_metrics_for_spec(spec_id) deleted_entries = [] existing_entries = [] for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):