diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 95ceaa539f..fcb1465c68 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -22,7 +22,7 @@ import re import uuid from abc import ABC, abstractmethod -from collections.abc import Callable +from collections.abc import Callable, Iterator from dataclasses import dataclass from enum import Enum from typing import ( @@ -607,42 +607,42 @@ def drop_namespace(self, namespace: str | Identifier) -> None: """ @abstractmethod - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: """List views under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of view identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. diff --git a/pyiceberg/catalog/bigquery_metastore.py b/pyiceberg/catalog/bigquery_metastore.py index 938ac6992f..c9c46a6d1c 100644 --- a/pyiceberg/catalog/bigquery_metastore.py +++ b/pyiceberg/catalog/bigquery_metastore.py @@ -17,6 +17,7 @@ from __future__ import annotations import json +from collections.abc import Iterator from typing import TYPE_CHECKING, Any from google.api_core.exceptions import NotFound @@ -252,7 +253,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: database_name = self.identifier_to_database(namespace) iceberg_tables: list[Identifier] = [] try: @@ -264,10 +265,10 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: iceberg_tables.append((database_name, bq_table_list_item.table_id)) except NotFound: raise NoSuchNamespaceError(f"Namespace (dataset) '{database_name}' not found.") from None - return iceberg_tables + return iter(iceberg_tables) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: # Since this catalog only supports one-level namespaces, it always returns an empty list unless # passed an empty namespace to list all namespaces within the catalog. if namespace: @@ -275,7 +276,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: # List top-level datasets datasets_iterator = self.client.list_datasets() - return [(dataset.dataset_id,) for dataset in datasets_iterator] + return iter([(dataset.dataset_id,) for dataset in datasets_iterator]) @override def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table: @@ -314,7 +315,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o return self.load_table(identifier=identifier) @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 74c0be6c9a..0a07c51771 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. import uuid +from collections.abc import Iterator from time import time from typing import ( TYPE_CHECKING, @@ -396,8 +397,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None: database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) table_identifiers = self.list_tables(namespace=database_name) - if len(table_identifiers) > 0: + try: + next(table_identifiers) raise NamespaceNotEmptyError(f"Database {database_name} is not empty") + except StopIteration: + pass try: self._delete_dynamo_item( @@ -409,14 +413,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List Iceberg tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. """ database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) @@ -451,20 +455,20 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: table_identifiers.append(self.identifier_to_tuple(identifier_col)) - return table_identifiers + return iter(table_identifiers) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List top-level namespaces from the catalog. We do not support hierarchical namespace. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. """ # Hierarchical namespace is not supported. Return an empty list if namespace: - return [] + return iter([]) paginator = self.dynamodb.get_paginator("query") @@ -494,7 +498,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: namespace_col = _dict[DYNAMODB_COL_NAMESPACE] database_identifiers.append(self.identifier_to_tuple(namespace_col)) - return database_identifiers + return iter(database_identifiers) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: @@ -565,7 +569,7 @@ def create_view( raise NotImplementedError @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 12b36efc5c..e06716d82d 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -17,6 +17,7 @@ import logging +from collections.abc import Iterator from typing import ( TYPE_CHECKING, Any, @@ -860,14 +861,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None: self.glue.delete_database(Name=database_name) @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List Iceberg tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid. @@ -889,18 +890,18 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e - return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)] + return iter([(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)]) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. """ # Hierarchical namespace is not supported. Return an empty list if namespace: - return [] + return iter([]) database_list: list[DatabaseTypeDef] = [] next_token: str | None = None @@ -912,7 +913,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: if not next_token: break - return [self.identifier_to_tuple(database["Name"]) for database in database_list] + return iter([self.identifier_to_tuple(database["Name"]) for database in database_list]) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: @@ -982,7 +983,7 @@ def create_view( raise NotImplementedError @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 181f9d4661..4e34426c5a 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -18,6 +18,7 @@ import logging import socket import time +from collections.abc import Iterator from types import TracebackType from typing import ( TYPE_CHECKING, @@ -479,7 +480,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o return self._convert_hive_into_iceberg(hive_table) @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override @@ -760,7 +761,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List Iceberg tables under the given namespace in the catalog. When the database doesn't exist, it will just return an empty list. @@ -769,34 +770,36 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: namespace: Database to list. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid. """ database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) with self._client as open_client: - return [ - (database_name, table.tableName) - for table in open_client.get_table_objects_by_name( - dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name) - ) - if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG - ] + return iter( + [ + (database_name, table.tableName) + for table in open_client.get_table_objects_by_name( + dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name) + ) + if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG + ] + ) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. """ # Hierarchical namespace is not supported. Return an empty list if namespace: - return [] + return iter([]) with self._client as open_client: - return list(map(self.identifier_to_tuple, open_client.get_all_databases())) + return iter(list(map(self.identifier_to_tuple, open_client.get_all_databases()))) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index aeb3c72843..f1afae7a67 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +from collections.abc import Iterator from typing import ( TYPE_CHECKING, ) @@ -124,11 +125,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NotImplementedError @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: raise NotImplementedError @override @@ -142,7 +143,7 @@ def update_namespace_properties( raise NotImplementedError @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index d085c6fd87..8b42e775b3 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -17,6 +17,7 @@ from __future__ import annotations from collections import deque +from collections.abc import Iterator from enum import Enum from typing import ( TYPE_CHECKING, @@ -1037,8 +1038,16 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @retry(**_RETRY_ARGS) + def _fetch_tables_page(self, url: str, params: dict[str, str]) -> ListTablesResponse: + response = self._session.get(url, params=params) + try: + response.raise_for_status() + except HTTPError as exc: + _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + return ListTablesResponse.model_validate_json(response.text) + @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: self._check_endpoint(Capability.V1_LIST_TABLES) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = self._encode_namespace_path(namespace_tuple) @@ -1051,27 +1060,18 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: raise ValueError(f"{PAGE_SIZE} must be a positive integer") params["pageSize"] = str(page_size) - tables: list[Identifier] = [] page_token: str | None = None while True: if page_token: params["pageToken"] = page_token - response = self._session.get(url, params=params) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - - parsed = ListTablesResponse.model_validate_json(response.text) - tables.extend([(*table.namespace, table.name) for table in parsed.identifiers]) + parsed = self._fetch_tables_page(url, params) + yield from [(*table.namespace, table.name) for table in parsed.identifiers] if not parsed.next_page_token: break page_token = parsed.next_page_token - return tables - @retry(**_RETRY_ARGS) @override def load_table(self, identifier: str | Identifier) -> Table: @@ -1150,10 +1150,18 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm return table_request @retry(**_RETRY_ARGS) + def _fetch_views_page(self, url: str, params: dict[str, str]) -> ListViewsResponse: + response = self._session.get(url, params=params) + try: + response.raise_for_status() + except HTTPError as exc: + _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + return ListViewsResponse.model_validate_json(response.text) + @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: if Capability.V1_LIST_VIEWS not in self._supported_endpoints: - return [] + return namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = self._encode_namespace_path(namespace_tuple) url = self.url(Endpoints.list_views, namespace=namespace_concat) @@ -1165,28 +1173,18 @@ def list_views(self, namespace: str | Identifier) -> list[Identifier]: raise ValueError(f"{PAGE_SIZE} must be a positive integer") params["pageSize"] = str(page_size) - views: list[Identifier] = [] page_token: str | None = None while True: if page_token: params["pageToken"] = page_token - - response = self._session.get(url, params=params) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - - parsed = ListViewsResponse.model_validate_json(response.text) - views.extend([(*view.namespace, view.name) for view in parsed.identifiers]) + parsed = self._fetch_views_page(url, params) + yield from [(*view.namespace, view.name) for view in parsed.identifiers] if not parsed.next_page_token: break page_token = parsed.next_page_token - return views - @retry(**_RETRY_ARGS) @override def load_view(self, identifier: str | Identifier) -> View: @@ -1275,8 +1273,16 @@ def drop_namespace(self, namespace: str | Identifier) -> None: _handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}) @retry(**_RETRY_ARGS) + def _fetch_namespaces_page(self, params: dict[str, str]) -> ListNamespaceResponse: + response = self._session.get(self.url(Endpoints.list_namespaces), params=params) + try: + response.raise_for_status() + except HTTPError as exc: + _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + return ListNamespaceResponse.model_validate_json(response.text) + @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: self._check_endpoint(Capability.V1_LIST_NAMESPACES) namespace_tuple = self.identifier_to_tuple(namespace) @@ -1287,7 +1293,6 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: raise ValueError(f"{PAGE_SIZE} must be a positive integer") params["pageSize"] = str(page_size) - namespaces: list[Identifier] = [] page_token: str | None = None while True: @@ -1295,22 +1300,13 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: params["parent"] = self._encode_namespace_path(namespace_tuple) if page_token: params["pageToken"] = page_token - response = self._session.get(self.url(Endpoints.list_namespaces), params=params) - - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - - parsed = ListNamespaceResponse.model_validate_json(response.text) - namespaces.extend(parsed.namespaces) + parsed = self._fetch_namespaces_page(params) + yield from parsed.namespaces if not parsed.next_page_token: break page_token = parsed.next_page_token - return namespaces - @retry(**_RETRY_ARGS) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 87446bd58b..0112dcb677 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -17,6 +17,7 @@ from __future__ import annotations +from collections.abc import Iterator from typing import ( TYPE_CHECKING, ) @@ -586,8 +587,12 @@ def drop_namespace(self, namespace: str | Identifier) -> None: raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") namespace_str = Catalog.namespace_to_string(namespace) - if tables := self.list_tables(namespace): - raise NamespaceNotEmptyError(f"Namespace {namespace_str} is not empty. {len(tables)} tables exist.") + tables_iter = self.list_tables(namespace) + try: + next(tables_iter) + raise NamespaceNotEmptyError(f"Namespace {namespace_str} is not empty.") + except StopIteration: + pass with Session(self.engine) as session: session.execute( @@ -599,14 +604,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None: session.commit() @override - def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]: """List tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: an iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. @@ -618,17 +623,17 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: stmt = select(IcebergTables).where(IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace) with Session(self.engine) as session: result = session.scalars(stmt) - return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result] + return iter([(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result]) @override - def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. @@ -660,7 +665,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: } ) - return namespaces + return iter(namespaces) @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: @@ -755,7 +760,7 @@ def create_view( raise NotImplementedError @override - def list_views(self, namespace: str | Identifier) -> list[Identifier]: + def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]: raise NotImplementedError @override diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index 3feed9fb21..a151e09d44 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -15,8 +15,9 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=broad-except,redefined-builtin,redefined-outer-name +import itertools import logging -from collections.abc import Callable +from collections.abc import Callable, Iterator from functools import wraps from typing import ( Any, @@ -33,6 +34,7 @@ from pyiceberg.io import WAREHOUSE from pyiceberg.table import TableProperties from pyiceberg.table.refs import SnapshotRef, SnapshotRefType +from pyiceberg.typedef import Identifier from pyiceberg.utils.properties import property_as_int @@ -130,14 +132,19 @@ def _catalog_and_output(ctx: Context) -> tuple[Catalog, Output]: def list(ctx: Context, parent: str | None) -> None: # pylint: disable=redefined-builtin """List tables or namespaces.""" catalog, output = _catalog_and_output(ctx) + identifiers: Iterator[Identifier] - identifiers = [] if parent: - # Do we have tables under parent namespace? - identifiers = catalog.list_tables(parent) - if not identifiers: - # List hierarchical namespaces if parent, root namespaces otherwise. - identifiers = catalog.list_namespaces(parent or ()) + # Do we have tables under parent namespace? Peek at first element. + tables = catalog.list_tables(parent) + try: + first = next(tables) + identifiers = itertools.chain([first], tables) + except StopIteration: + # No tables found; list hierarchical namespaces instead. + identifiers = catalog.list_namespaces(parent) + else: + identifiers = catalog.list_namespaces(()) output.identifiers(identifiers) diff --git a/pyiceberg/cli/output.py b/pyiceberg/cli/output.py index 332221008c..5584bd22a5 100644 --- a/pyiceberg/cli/output.py +++ b/pyiceberg/cli/output.py @@ -16,6 +16,7 @@ # under the License. import json from abc import ABC, abstractmethod +from collections.abc import Iterable from typing import ( Any, ) @@ -40,7 +41,7 @@ class Output(ABC): def exception(self, ex: Exception) -> None: ... @abstractmethod - def identifiers(self, identifiers: list[Identifier]) -> None: ... + def identifiers(self, identifiers: Iterable[Identifier]) -> None: ... @abstractmethod def describe_table(self, table: Table) -> None: ... @@ -88,7 +89,7 @@ def exception(self, ex: Exception) -> None: else: Console(stderr=True).print(ex) - def identifiers(self, identifiers: list[Identifier]) -> None: + def identifiers(self, identifiers: Iterable[Identifier]) -> None: table = self._table for identifier in identifiers: table.add_row(".".join(identifier)) @@ -199,7 +200,7 @@ def _out(self, d: Any) -> None: def exception(self, ex: Exception) -> None: self._out({"type": ex.__class__.__name__, "message": str(ex)}) - def identifiers(self, identifiers: list[Identifier]) -> None: + def identifiers(self, identifiers: Iterable[Identifier]) -> None: self._out([".".join(identifier) for identifier in identifiers]) def describe_table(self, table: Table) -> None: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 64ad10050d..c1e222bb0b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -586,10 +586,127 @@ def dynamic_partition_overwrite( ) partitions_to_overwrite = {data_file.partition for data_file in data_files} - delete_filter = self._build_partition_predicate( - partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema() + current_spec = self.table_metadata.spec() + all_specs = self.table_metadata.specs() + schema = self.table_metadata.schema() + + # Keep the existing dynamic overwrite behavior for non-evolution tables. + # We only need per-spec predicate handling when some historical specs are + # missing current partition source IDs. + current_source_ids = {field.source_id for field in current_spec.fields} + has_missing_partition_fields_in_history = any( + current_source_ids - {field.source_id for field in historical_spec.fields} for historical_spec in all_specs.values() ) - self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) + + if not has_missing_partition_fields_in_history: + delete_filter = self._build_partition_predicate( + partition_records=partitions_to_overwrite, + spec=current_spec, + schema=schema, + ) + self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) + else: + # Build per-spec delete predicates to handle partition spec evolution correctly. + # + # When a partition field was added via spec evolution, data files written under + # older specs carry NULL for that field (because it was absent from the schema at + # write time). A single "category=A AND region=us" predicate would never match + # those files because the strict-metrics evaluator sees region=NULL != "us". + # + # To fix this, we compute a per-spec predicate for every historical spec: + # - For specs that include all current partition fields -> use exact-match predicate. + # - For specs that are missing some current partition fields -> also accept NULL + # for the missing fields. + # + # These per-spec predicates are stored on the delete snapshot producer so that + # _compute_deletes uses the right predicate when evaluating each manifest file. + source_id_to_pos = {field.source_id: pos for pos, field in enumerate(current_spec.fields)} + source_id_to_col = {field.source_id: schema.find_field(field.source_id).name for field in current_spec.fields} + exact_delete_filter = self._build_partition_predicate( + partition_records=partitions_to_overwrite, + spec=current_spec, + schema=schema, + ) + + per_spec_predicates: dict[int, BooleanExpression] = {} + for spec_id, hist_spec in all_specs.items(): + hist_source_ids = {field.source_id for field in hist_spec.fields} + missing_source_ids = current_source_ids - hist_source_ids + has_overlap_with_current = bool(hist_source_ids & current_source_ids) + + per_record_exprs: list[BooleanExpression] = [] + for partition_record in partitions_to_overwrite: + predicates: list[BooleanExpression] = [] + for source_id, col_name in source_id_to_col.items(): + value = partition_record[source_id_to_pos[source_id]] + if value is not None: + field_pred: BooleanExpression = EqualTo(Reference(col_name), value) + if source_id in missing_source_ids and has_overlap_with_current: + field_pred = Or(field_pred, IsNull(Reference(col_name))) + else: + field_pred = IsNull(Reference(col_name)) + predicates.append(field_pred) + + per_record_exprs.append(And(*predicates) if len(predicates) > 1 else predicates[0]) + + per_spec_predicates[spec_id] = Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0] + + # Open the delete snapshot and set per-spec predicates before committing. + # This mirrors Transaction.delete() but injects per_spec_predicates so that + # _compute_deletes uses the right predicate for each historical spec. + from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow + + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot: + delete_snapshot._per_spec_predicates = per_spec_predicates + delete_snapshot.delete_by_predicate(exact_delete_filter) + + # Handle partial-match files that need to be rewritten (copy-on-write). + if delete_snapshot.rewrites_needed is True: + bound_delete_filter = bind(self.table_metadata.schema(), exact_delete_filter, case_sensitive=True) + preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter, self.table_metadata.schema()) + + file_scan = self._scan(row_filter=exact_delete_filter) + if branch is not None: + file_scan = file_scan.use_ref(branch) + + rewrite_uuid = uuid.uuid4() + rewrite_counter = itertools.count(0) + replaced_files: list[tuple[DataFile, list[DataFile]]] = [] + for original_file in file_scan.plan_files(): + df_orig = ArrowScan( + table_metadata=self.table_metadata, + io=self._table.io, + projected_schema=self.table_metadata.schema(), + row_filter=AlwaysTrue(), + ).to_table(tasks=[original_file]) + filtered_df = df_orig.filter(preserve_row_filter) + if len(filtered_df) == 0: + replaced_files.append((original_file.file, [])) + elif len(df_orig) != len(filtered_df): + replaced_files.append( + ( + original_file.file, + list( + _dataframe_to_data_files( + io=self._table.io, + df=filtered_df, + table_metadata=self.table_metadata, + write_uuid=rewrite_uuid, + counter=rewrite_counter, + ) + ), + ) + ) + + if replaced_files: + with self.update_snapshot( + snapshot_properties=snapshot_properties, branch=branch + ).overwrite() as overwrite_snapshot: + overwrite_snapshot.commit_uuid = rewrite_uuid + for original_data_file, replacement_data_files in replaced_files: + overwrite_snapshot.delete_data_file(original_data_file) + for replacement_data_file in replacement_data_files: + overwrite_snapshot.append_data_file(replacement_data_file) with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files: append_files.commit_uuid = append_snapshot_commit_uuid @@ -2072,13 +2189,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single # shared instance across multiple threads. - return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) + return lambda datafile: residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), ) @staticmethod diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 7931edacdd..7172a6fa3e 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -104,6 +104,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _target_branch: str | None _predicate: BooleanExpression _case_sensitive: bool + _per_spec_predicates: dict[int, BooleanExpression] def __init__( self, @@ -134,6 +135,7 @@ def __init__( ) self._predicate = AlwaysFalse() self._case_sensitive = True + self._per_spec_predicates = {} def _validate_target_branch(self, branch: str | None) -> str | None: # if branch is none, write will be written into a staging snapshot @@ -360,7 +362,8 @@ def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = T def _build_partition_projection(self, spec_id: int) -> BooleanExpression: project = inclusive_projection(self.schema(), self.spec(spec_id), self._case_sensitive) - return project(self._predicate) + predicate = self._per_spec_predicates.get(spec_id, self._predicate) + return project(predicate) @cached_property def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: @@ -431,10 +434,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> schema = table_metadata.schema() manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval - inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( - schema, self._predicate, case_sensitive=self._case_sensitive - ).eval + + def _strict_metrics_for_spec(spec_id: int) -> Callable[[DataFile], bool]: + predicate = self._per_spec_predicates.get(spec_id, self._predicate) + return _StrictMetricsEvaluator(schema, predicate, case_sensitive=self._case_sensitive).eval + + def _inclusive_metrics_for_spec(spec_id: int) -> Callable[[DataFile], bool]: + predicate = self._per_spec_predicates.get(spec_id, self._predicate) + return _InclusiveMetricsEvaluator(schema, predicate, case_sensitive=self._case_sensitive).eval existing_manifests = [] total_deleted_entries = [] @@ -454,6 +461,9 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> existing_manifests.append(manifest_file) else: # It is relevant, let's check out the content + spec_id = manifest_file.partition_spec_id + strict_metrics_evaluator = _strict_metrics_for_spec(spec_id) + inclusive_metrics_evaluator = _inclusive_metrics_for_spec(spec_id) deleted_entries = [] existing_entries = [] for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): diff --git a/tests/catalog/integration_test_dynamodb.py b/tests/catalog/integration_test_dynamodb.py index 6ae14bca06..87988f9237 100644 --- a/tests/catalog/integration_test_dynamodb.py +++ b/tests/catalog/integration_test_dynamodb.py @@ -119,7 +119,7 @@ def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, databas test_catalog.create_namespace(database_name) for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - identifier_list = test_catalog.list_tables(database_name) + identifier_list = list(test_catalog.list_tables(database_name)) assert len(identifier_list) == LIST_TEST_NUMBER for table_name in table_list: assert (database_name, table_name) in identifier_list @@ -207,10 +207,10 @@ def test_create_namespace_with_comment_and_location(test_catalog: Catalog, datab def test_list_namespaces(test_catalog: Catalog, database_list: list[str]) -> None: for database_name in database_list: test_catalog.create_namespace(database_name) - db_list = test_catalog.list_namespaces() + db_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in db_list - assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0 + assert len(list(test_catalog.list_namespaces(list(database_list)[0]))) == 0 def test_drop_namespace(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None: diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index c429770268..c73035b48e 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -227,7 +227,7 @@ def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, databas test_catalog.create_namespace(database_name) for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - identifier_list = test_catalog.list_tables(database_name) + identifier_list = list(test_catalog.list_tables(database_name)) assert len(identifier_list) == LIST_TEST_NUMBER for table_name in table_list: assert (database_name, table_name) in identifier_list @@ -316,10 +316,10 @@ def test_create_namespace_with_comment_and_location(test_catalog: Catalog, datab def test_list_namespaces(test_catalog: Catalog, database_list: list[str]) -> None: for database_name in database_list: test_catalog.create_namespace(database_name) - db_list = test_catalog.list_namespaces() + db_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in db_list - assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0 + assert len(list(test_catalog.list_namespaces(list(database_list)[0]))) == 0 def test_drop_namespace(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None: diff --git a/tests/catalog/test_bigquery_metastore.py b/tests/catalog/test_bigquery_metastore.py index c8c7584262..bfca9ec081 100644 --- a/tests/catalog/test_bigquery_metastore.py +++ b/tests/catalog/test_bigquery_metastore.py @@ -151,7 +151,7 @@ def test_list_tables(mocker: MockFixture, gcp_dataset_name: str) -> None: catalog_name = "test_catalog" test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.bigquery.project-id": "my-project"}) - tables = test_catalog.list_tables(gcp_dataset_name) + tables = list(test_catalog.list_tables(gcp_dataset_name)) # Assert that all tables returned by client.list_tables are listed assert len(tables) == 2 @@ -173,7 +173,7 @@ def test_list_namespaces(mocker: MockFixture) -> None: catalog_name = "test_catalog" test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.bigquery.project-id": "my-project"}) - namespaces = test_catalog.list_namespaces() + namespaces = list(test_catalog.list_namespaces()) assert len(namespaces) == 2 assert ("dataset1",) in namespaces assert ("dataset2",) in namespaces diff --git a/tests/catalog/test_catalog_behaviors.py b/tests/catalog/test_catalog_behaviors.py index b859e2d541..bf92ef10e6 100644 --- a/tests/catalog/test_catalog_behaviors.py +++ b/tests/catalog/test_catalog_behaviors.py @@ -517,11 +517,11 @@ def test_list_tables( catalog.create_namespace(namespace_2) catalog.create_table(test_table_identifier, table_schema_nested) catalog.create_table(another_table_identifier, table_schema_nested) - identifier_list = catalog.list_tables(namespace_1) + identifier_list = list(catalog.list_tables(namespace_1)) assert len(identifier_list) == 1 assert test_table_identifier in identifier_list - identifier_list = catalog.list_tables(namespace_2) + identifier_list = list(catalog.list_tables(namespace_2)) assert len(identifier_list) == 1 assert another_table_identifier in identifier_list @@ -532,8 +532,8 @@ def test_list_tables_under_a_namespace(catalog: Catalog, table_schema_nested: Sc catalog.create_table(test_table_identifier, table_schema_nested) new_namespace = ("new", "namespace") catalog.create_namespace(new_namespace) - all_tables = catalog.list_tables(namespace=namespace) - new_namespace_tables = catalog.list_tables(new_namespace) + all_tables = list(catalog.list_tables(namespace=namespace)) + new_namespace_tables = list(catalog.list_tables(new_namespace)) assert all_tables assert test_table_identifier in all_tables assert new_namespace_tables == [] @@ -541,7 +541,7 @@ def test_list_tables_under_a_namespace(catalog: Catalog, table_schema_nested: Sc def test_list_tables_when_missing_namespace(catalog: Catalog, test_namespace: Identifier) -> None: with pytest.raises(NoSuchNamespaceError): - catalog.list_tables(test_namespace) + list(catalog.list_tables(test_namespace)) # Commit table tests @@ -1002,7 +1002,7 @@ def test_create_namespace_with_comment_and_location(catalog: Catalog, test_names "location": test_location, } catalog.create_namespace(namespace=test_namespace, properties=test_properties) - loaded_database_list = catalog.list_namespaces() + loaded_database_list = list(catalog.list_namespaces()) assert Catalog.identifier_to_tuple(test_namespace)[:1] in loaded_database_list properties = catalog.load_namespace_properties(test_namespace) assert properties["comment"] == "this is a test description" @@ -1088,17 +1088,17 @@ def test_list_namespaces(catalog: Catalog) -> None: if not catalog.namespace_exists(namespace): catalog.create_namespace(namespace) - ns_list = catalog.list_namespaces() + ns_list = list(catalog.list_namespaces()) for ns in [("db",), ("db%",), ("db2",)]: assert ns in ns_list - ns_list = catalog.list_namespaces("db") + ns_list = list(catalog.list_namespaces("db")) assert sorted(ns_list) == [("db", "ns1"), ("db", "ns2")] - ns_list = catalog.list_namespaces("db.ns1") + ns_list = list(catalog.list_namespaces("db.ns1")) assert sorted(ns_list) == [("db", "ns1", "ns2")] - ns_list = catalog.list_namespaces("db.ns1.ns2") + ns_list = list(catalog.list_namespaces("db.ns1.ns2")) assert len(ns_list) == 0 @@ -1108,14 +1108,14 @@ def test_list_namespaces_fuzzy_match(catalog: Catalog) -> None: if not catalog.namespace_exists(namespace): catalog.create_namespace(namespace) - assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")] + assert list(catalog.list_namespaces("db.ns1")) == [("db", "ns1", "ns2")] - assert catalog.list_namespaces("db_.ns1") == [("db_", "ns1", "ns2")] + assert list(catalog.list_namespaces("db_.ns1")) == [("db_", "ns1", "ns2")] def test_list_non_existing_namespaces(catalog: Catalog) -> None: with pytest.raises(NoSuchNamespaceError): - catalog.list_namespaces("does_not_exist") + list(catalog.list_namespaces("does_not_exist")) # Update namespace properties tests diff --git a/tests/catalog/test_dynamodb.py b/tests/catalog/test_dynamodb.py index 5933e7d472..4bfac46b79 100644 --- a/tests/catalog/test_dynamodb.py +++ b/tests/catalog/test_dynamodb.py @@ -398,7 +398,7 @@ def test_list_tables( test_catalog.create_namespace(namespace=database_name) for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - loaded_table_list = test_catalog.list_tables(database_name) + loaded_table_list = list(test_catalog.list_tables(database_name)) for table_name in table_list: assert (database_name, table_name) in loaded_table_list @@ -408,7 +408,7 @@ def test_list_namespaces(_bucket_initialize: None, database_list: list[str]) -> test_catalog = DynamoDbCatalog("test_ddb_catalog") for database_name in database_list: test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in loaded_database_list @@ -417,7 +417,7 @@ def test_list_namespaces(_bucket_initialize: None, database_list: list[str]) -> def test_create_namespace_no_properties(_bucket_initialize: None, database_name: str) -> None: test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -433,7 +433,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, da } test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name, properties=test_properties) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -445,7 +445,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, da def test_create_duplicated_namespace(_bucket_initialize: None, database_name: str) -> None: test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list with pytest.raises(NamespaceAlreadyExistsError): @@ -456,11 +456,11 @@ def test_create_duplicated_namespace(_bucket_initialize: None, database_name: st def test_drop_namespace(_bucket_initialize: None, database_name: str) -> None: test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list test_catalog.drop_namespace(database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 0 @@ -472,7 +472,7 @@ def test_drop_non_empty_namespace( test_catalog = DynamoDbCatalog("test_ddb_catalog", **{"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) test_catalog.create_table(identifier, table_schema_nested) - assert len(test_catalog.list_tables(database_name)) == 1 + assert len(list(test_catalog.list_tables(database_name))) == 1 with pytest.raises(NamespaceNotEmptyError): test_catalog.drop_namespace(database_name) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index c8da49a87e..b2e1796906 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -516,7 +516,7 @@ def test_list_tables( for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - loaded_table_list = test_catalog.list_tables(database_name) + loaded_table_list = list(test_catalog.list_tables(database_name)) assert (database_name, non_iceberg_table_name) not in loaded_table_list assert (database_name, non_table_type_table_name) not in loaded_table_list @@ -529,7 +529,7 @@ def test_list_namespaces(_bucket_initialize: None, moto_endpoint_url: str, datab test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) for database_name in database_list: test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in loaded_database_list @@ -538,7 +538,7 @@ def test_list_namespaces(_bucket_initialize: None, moto_endpoint_url: str, datab def test_create_namespace_no_properties(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -554,7 +554,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, mo } test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name, properties=test_properties) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -566,7 +566,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, mo def test_create_duplicated_namespace(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list with pytest.raises(NamespaceAlreadyExistsError): @@ -577,11 +577,11 @@ def test_create_duplicated_namespace(_bucket_initialize: None, moto_endpoint_url def test_drop_namespace(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list test_catalog.drop_namespace(database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 0 @@ -593,7 +593,7 @@ def test_drop_non_empty_namespace( test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"}) test_catalog.create_namespace(namespace=database_name) test_catalog.create_table(identifier, table_schema_nested) - assert len(test_catalog.list_tables(database_name)) == 1 + assert len(list(test_catalog.list_tables(database_name))) == 1 with pytest.raises(NamespaceNotEmptyError): test_catalog.drop_namespace(database_name) diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 09bb5ab920..1b2ee7f744 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -1043,7 +1043,7 @@ def test_list_tables(hive_table: HiveTable) -> None: catalog._client.__enter__().get_all_tables.return_value = ["table1", "table2", "table3", "table4"] catalog._client.__enter__().get_table_objects_by_name.return_value = [tbl1, tbl2, tbl3, tbl4] - got_tables = catalog.list_tables("database") + got_tables = list(catalog.list_tables("database")) assert got_tables == [("database", "table1"), ("database", "table2")] catalog._client.__enter__().get_all_tables.assert_called_with(db_name="database") catalog._client.__enter__().get_table_objects_by_name.assert_called_with( @@ -1057,7 +1057,7 @@ def test_list_namespaces() -> None: catalog._client = MagicMock() catalog._client.__enter__().get_all_databases.return_value = ["namespace1", "namespace2"] - assert catalog.list_namespaces() == [("namespace1",), ("namespace2",)] + assert list(catalog.list_namespaces()) == [("namespace1",), ("namespace2",)] catalog._client.__enter__().get_all_databases.assert_called() diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 1eb9f26a56..b4e352436c 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -478,7 +478,7 @@ def test_list_tables_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) == [("examples", "fooshare")] + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) == [("examples", "fooshare")] def test_list_tables_paginated_200(rest_mock: Mocker) -> None: @@ -520,7 +520,7 @@ def test_list_tables_paginated_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) assert result == [ ("examples", "table1"), ("examples", "table2"), @@ -557,7 +557,7 @@ def test_list_tables_paginated_200_none_next_page_token(rest_mock: Mocker) -> No request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) assert result == [ ("examples", "table1"), ("examples", "table2"), @@ -579,7 +579,7 @@ def test_list_tables_page_size(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_tables(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_tables(namespace)) assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/examples/tables?pageSize=100" assert result == [ @@ -597,9 +597,9 @@ def test_list_tables_200_sigv4(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_tables(namespace) == [ - ("examples", "fooshare") - ] + assert list( + RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_tables(namespace) + ) == [("examples", "fooshare")] assert rest_mock.called @@ -734,7 +734,7 @@ def test_list_tables_404(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) with pytest.raises(NoSuchNamespaceError) as e: - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) assert "Namespace does not exist" in str(e.value) @@ -747,7 +747,7 @@ def test_list_views_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [("examples", "fooshare")] + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) == [("examples", "fooshare")] def test_list_views_paginated_200(rest_mock: Mocker) -> None: @@ -789,7 +789,7 @@ def test_list_views_paginated_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) assert result == [ ("examples", "view1"), ("examples", "view2"), @@ -826,7 +826,7 @@ def test_list_views_paginated_200_none_next_page_token(rest_mock: Mocker) -> Non request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) assert result == [ ("examples", "view1"), ("examples", "view2"), @@ -848,7 +848,7 @@ def test_list_views_page_size(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_views(namespace) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_views(namespace)) assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/examples/views?pageSize=100" assert result == [ @@ -872,7 +872,7 @@ def test_list_views_invalid_page_size(rest_mock: Mocker) -> None: ) with pytest.raises(ValueError) as e: - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "0"}).list_views(namespace) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "0"}).list_views(namespace)) assert str(e.value) == "rest-page-size must be a positive integer" @@ -885,9 +885,9 @@ def test_list_views_200_sigv4(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_views(namespace) == [ - ("examples", "fooshare") - ] + assert list( + RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_views(namespace) + ) == [("examples", "fooshare")] assert rest_mock.called @@ -906,7 +906,7 @@ def test_list_views_404(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) with pytest.raises(NoSuchNamespaceError) as e: - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) assert "Namespace does not exist" in str(e.value) @@ -953,7 +953,7 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None: status_code=200, request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() == [ + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces()) == [ ("default",), ("examples",), ("fokko",), @@ -968,7 +968,7 @@ def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None: status_code=200, request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",)) == [ + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",))) == [ ("accounting", "tax"), ] @@ -1004,7 +1004,7 @@ def test_list_namespaces_paginated_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces()) assert result == [ ("ns1",), ("ns2",), @@ -1035,7 +1035,7 @@ def test_list_namespaces_with_parent_paginated_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",)) + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",))) assert result == [ ("accounting", "tax"), ("accounting", "payroll"), @@ -1064,7 +1064,7 @@ def test_list_namespaces_paginated_200_none_next_page_token(rest_mock: Mocker) - request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces()) assert result == [ ("ns1",), ("ns2",), @@ -1082,7 +1082,7 @@ def test_list_namespaces_page_size(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - result = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_namespaces() + result = list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{PAGE_SIZE: "100"}).list_namespaces()) assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces?pageSize=100" assert result == [ @@ -1106,7 +1106,7 @@ def test_list_namespace_with_parent_404(rest_mock: Mocker) -> None: ) with pytest.raises(NoSuchNamespaceError): - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("some_namespace",)) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("some_namespace",))) @pytest.mark.filterwarnings( @@ -1161,7 +1161,7 @@ def test_list_namespaces_token_expired_success_on_retries(rest_mock: Mocker, sta # which results in the token being refreshed twice when the RestCatalog is initialized. assert tokens.call_count == 2 - assert catalog.list_namespaces() == [ + assert list(catalog.list_namespaces()) == [ ("default",), ("examples",), ("fokko",), @@ -1170,7 +1170,7 @@ def test_list_namespaces_token_expired_success_on_retries(rest_mock: Mocker, sta assert namespaces.call_count == 2 assert tokens.call_count == 3 - assert catalog.list_namespaces() == [ + assert list(catalog.list_namespaces()) == [ ("default",), ("examples",), ("fokko",), diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 4188ad83db..fa0867770c 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -189,7 +189,7 @@ def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, databas test_catalog.create_namespace(database_name) for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - identifier_list = test_catalog.list_tables(database_name) + identifier_list = list(test_catalog.list_tables(database_name)) assert len(identifier_list) == len(table_list) for table_name in table_list: assert (database_name, table_name) in identifier_list @@ -463,10 +463,10 @@ def test_create_namespace_with_comment(test_catalog: Catalog, database_name: str def test_list_namespaces(test_catalog: Catalog, database_list: list[str]) -> None: for database_name in database_list: test_catalog.create_namespace(database_name) - db_list = test_catalog.list_namespaces() + db_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in db_list - assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0 + assert len(list(test_catalog.list_namespaces(list(database_list)[0]))) == 0 @pytest.mark.integration diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 0c4ea258f3..5193bb2710 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1766,3 +1766,78 @@ def test_build_large_partition_predicate(table_v2: Table) -> None: ) bind(table_v2.metadata.schema(), expr, case_sensitive=True) + + +def test_dynamic_partition_overwrite_spec_evolution(tmp_path: Any) -> None: + """Regression test for https://github.com/apache/iceberg-python/issues/3148. + + After partition spec evolution, dynamic_partition_overwrite must delete data files + written under the old spec (where the new partition field was absent / NULL) when + overwriting the matching logical partition. + """ + import tempfile + + import pyarrow as pa + + from pyiceberg.catalog import load_catalog + from pyiceberg.transforms import IdentityTransform + from pyiceberg.types import LongType + + with tempfile.TemporaryDirectory() as warehouse: + catalog = load_catalog("test", type="sql", uri=f"sqlite:///{warehouse}/catalog.db", warehouse=f"file://{warehouse}") + catalog.create_namespace("default") + + schema = Schema( + NestedField(1, "category", StringType(), required=False), + NestedField(2, "region", StringType(), required=False), + NestedField(3, "value", LongType(), required=False), + ) + spec_v0 = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category")) + table = catalog.create_table("default.test_spec_evo", schema=schema, partition_spec=spec_v0) + + # Write under spec-0 (region is NULL — field exists in schema but not in partition spec) + table.append( + pa.table( + { + "category": pa.array(["A", "A", "B"], type=pa.string()), + "region": pa.array([None, None, None], type=pa.string()), + "value": pa.array([1, 2, 10], type=pa.int64()), + } + ) + ) + + # Evolve to spec-1: add region as a partition field + with table.update_spec() as u: + u.add_field("region", IdentityTransform(), "region") + table = catalog.load_table("default.test_spec_evo") + + # Write under spec-1 + table.append( + pa.table( + { + "category": pa.array(["A", "B"], type=pa.string()), + "region": pa.array(["us", "us"], type=pa.string()), + "value": pa.array([100, 200], type=pa.int64()), + } + ) + ) + + # Overwrite partition {A, us} — must also delete stale spec-0 {A} files + table.dynamic_partition_overwrite( + pa.table( + { + "category": pa.array(["A"], type=pa.string()), + "region": pa.array(["us"], type=pa.string()), + "value": pa.array([999], type=pa.int64()), + } + ) + ) + + result = table.scan().to_arrow().to_pydict() + a_values = sorted([v for c, v in zip(result["category"], result["value"], strict=True) if c == "A"]) + b_values = sorted([v for c, v in zip(result["category"], result["value"], strict=True) if c == "B"]) + + # Spec-0 rows 1,2 (category=A, region=NULL) should be gone; only 999 remains + assert a_values == [999], f"Expected [999] but got {a_values}" + # B rows from both specs should be untouched + assert b_values == [10, 200], f"Expected [10, 200] but got {b_values}"