From b4a7a849aa0814f27d75399288aa3aa84c0d7875 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 26 May 2026 20:54:24 +0000 Subject: [PATCH] feat(storage): integrate full-object checksum in AsyncMultiRangeDownloader --- .../asyncio/async_multi_range_downloader.py | 42 +++++- .../tests/system/test_zonal.py | 78 ++++++++++++ .../test_async_multi_range_downloader.py | 120 +++++++++++++++++- 3 files changed, 231 insertions(+), 9 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py index ac0844519e2d..fa680e95dba2 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py @@ -45,6 +45,7 @@ _ReadResumptionStrategy, ) +from google.cloud.storage.exceptions import DataCorruption from ._utils import raise_if_no_fast_crc32c _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100 @@ -219,8 +220,6 @@ def __init__( ) generation = kwargs.pop("generation_number") - raise_if_no_fast_crc32c() - self.client = client self.bucket_name = bucket_name self.object_name = object_name @@ -232,6 +231,8 @@ def __init__( self._multiplexer: Optional[_StreamMultiplexer] = None self.persisted_size: Optional[int] = None # updated after opening the stream self._open_retries: int = 0 + self.is_finalized: bool = False + self.full_obj_server_crc32c: Optional[int] = None async def __aenter__(self): """Opens the underlying bidi-gRPC connection to read from the object.""" @@ -327,6 +328,8 @@ async def _do_open(): self.read_handle = self.read_obj_str.read_handle if self.read_obj_str.persisted_size is not None: self.persisted_size = self.read_obj_str.persisted_size + self.is_finalized = self.read_obj_str.is_finalized + self.full_obj_server_crc32c = self.read_obj_str.full_obj_server_crc32c self._is_stream_open = True @@ -363,6 +366,8 @@ async def factory(): self.generation = stream.generation_number if stream.read_handle: self.read_handle = stream.read_handle + self.is_finalized = stream.is_finalized + self.full_obj_server_crc32c = stream.full_obj_server_crc32c self.read_obj_str = stream self._is_stream_open = True @@ -377,6 +382,7 @@ async def download_ranges( lock: asyncio.Lock = None, retry_policy: Optional[AsyncRetry] = None, metadata: Optional[List[Tuple[str, str]]] = None, + enable_checksum: bool = True, ) -> None: """Downloads multiple byte ranges from the object into the buffers provided by user with automatic retries. @@ -412,6 +418,9 @@ async def download_ranges( "Invalid input - length of read_ranges cannot be more than 1000" ) + if enable_checksum: + raise_if_no_fast_crc32c() + if not self._is_stream_open: raise ValueError("Underlying bidi-gRPC stream is not open") @@ -422,16 +431,29 @@ async def download_ranges( download_states = {} for read_range in read_ranges: read_id = generate_random_56_bit_integer() + # Unpack tuple into self-documenting variable names to improve readability. + offset, length, user_buffer = read_range + + # Heuristic to detect full object reads: + # - Implicit full object read: start offset is 0 and length is 0 (read all). + # - Explicit full object read: start offset is 0 and length matches the exact persisted size. + is_full_object_read = ( + (offset == 0 and length == 0) or + (self.persisted_size is not None and offset == 0 and length == self.persisted_size) + ) download_states[read_id] = _DownloadState( - initial_offset=read_range[0], - initial_length=read_range[1], - user_buffer=read_range[2], + initial_offset=offset, + initial_length=length, + user_buffer=user_buffer, + is_full_object_read=is_full_object_read, ) initial_state = { "download_states": download_states, "read_handle": self.read_handle, "routing_token": None, + "enable_checksum": enable_checksum, + "full_obj_server_crc32c": self.full_obj_server_crc32c, } read_ids = set(download_states.keys()) @@ -519,12 +541,18 @@ async def generator(): strategy, send_and_recv_via_multiplexer ) - await retry_manager.execute(initial_state, retry_policy) + try: + await retry_manager.execute(initial_state, retry_policy) + except DataCorruption: + if self.is_stream_open: + await self.close() + raise if initial_state.get("read_handle"): self.read_handle = initial_state["read_handle"] finally: - self._multiplexer.unregister(read_ids) + if self._multiplexer is not None: + self._multiplexer.unregister(read_ids) async def close(self): """ diff --git a/packages/google-cloud-storage/tests/system/test_zonal.py b/packages/google-cloud-storage/tests/system/test_zonal.py index 20e172a1adee..b11f584192fe 100644 --- a/packages/google-cloud-storage/tests/system/test_zonal.py +++ b/packages/google-cloud-storage/tests/system/test_zonal.py @@ -26,6 +26,7 @@ ObjectContexts, ObjectCustomContextPayload, ) +from google.cloud.storage.exceptions import DataCorruption pytestmark = pytest.mark.skipif( os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True", @@ -961,3 +962,80 @@ async def _run(): blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) event_loop.run_until_complete(_run()) + + +@pytest.mark.parametrize( + "read_start, read_length, enable_checksum", + [ + (0, 0, True), + (0, 1024 * 1024, True), + (0, 0, False), + ], +) +def test_mrd_checksum_validation( + storage_client, blobs_to_delete, event_loop, grpc_client_direct, read_start, read_length, enable_checksum +): + """ + Tests full downloads with specified offset, length, and enable_checksum toggle on finalized objects. + """ + object_size = 1024 * 1024 # 1MB + object_name = f"test_mrd_chksum-{uuid.uuid4()}" + + async def _run(): + object_data = os.urandom(object_size) + + writer = AsyncAppendableObjectWriter( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) + await writer.open() + await writer.append(object_data) + await writer.close(finalize_on_close=True) + + async with AsyncMultiRangeDownloader( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) as mrd: + buffer = BytesIO() + await mrd.download_ranges([(read_start, read_length, buffer)], enable_checksum=enable_checksum) + assert buffer.getvalue() == object_data + + # cleanup + del writer + gc.collect() + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + + event_loop.run_until_complete(_run()) + + +def test_mrd_checksum_unfinalized_appendable_skipped( + storage_client, blobs_to_delete, event_loop, grpc_client_direct +): + """ + Verifies that live, unfinalized appendable objects skip the full-object checksum check + naturally without raising any exceptions. + """ + object_name = f"test_mrd_chksum_unfin-{uuid.uuid4()}" + + async def _run(): + writer = AsyncAppendableObjectWriter( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.flush() # Flushed but not finalized! + + # Download the unfinalized appendable object with enable_checksum=True + async with AsyncMultiRangeDownloader( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) as mrd: + buffer = BytesIO() + # Since it's unfinalized, it should skip the checksum check without raising + await mrd.download_ranges([(0, 0, buffer)], enable_checksum=True) + assert buffer.getvalue() == _BYTES_TO_UPLOAD + + # cleanup + await writer.close() + del writer + gc.collect() + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + + event_loop.run_until_complete(_run()) diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py index 24a632b68131..047c749b94c4 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py @@ -308,12 +308,14 @@ async def test_downloading_without_opening_should_throw_error(self): assert not mrd.is_stream_open @mock.patch("google.cloud.storage.asyncio._utils.google_crc32c") - def test_init_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c): + @pytest.mark.asyncio + async def test_download_ranges_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c): mock_google_crc32c.implementation = "python" mock_client = mock.MagicMock() + mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object") with pytest.raises(exceptions.FailedPrecondition) as exc_info: - AsyncMultiRangeDownloader(mock_client, "bucket", "object") + await mrd.download_ranges([(0, 10, BytesIO())]) assert "The google-crc32c package is not installed with C support" in str( exc_info.value @@ -579,3 +581,117 @@ async def staged_recv(): # Assert mock_logger.info.assert_any_call("Resuming download (attempt 2) for 1 ranges.") + + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_open_populates_checksum_properties(self, mock_cls_async_read_object_stream): + # Arrange + mock_client = mock.MagicMock() + mock_client.grpc_client = mock.AsyncMock() + mock_stream = mock_cls_async_read_object_stream.return_value + mock_stream.open = AsyncMock() + mock_stream.generation_number = 123 + mock_stream.persisted_size = 100 + mock_stream.read_handle = b"h" + mock_stream.is_finalized = True + mock_stream.full_obj_server_crc32c = 999 + + mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object") + assert mrd.is_finalized is False + assert mrd.full_obj_server_crc32c is None + + # Act + await mrd.open() + + # Assert + assert mrd.is_finalized is True + assert mrd.full_obj_server_crc32c == 999 + + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_download_ranges_configures_full_object_read_state( + self, mock_cls_async_read_object_stream, mock_retry_manager_cls, mock_strategy_cls + ): + # Arrange + mock_client = mock.MagicMock() + mock_client.grpc_client = mock.AsyncMock() + mock_stream = mock_cls_async_read_object_stream.return_value + mock_stream.open = AsyncMock() + mock_stream.persisted_size = 100 + mock_stream.is_finalized = True + mock_stream.full_obj_server_crc32c = 999 + + mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o") + + mock_retry_manager = mock_retry_manager_cls.return_value + mock_retry_manager.execute = AsyncMock() + + # Act + # Implicit full read (0, 0) and explicit full read (0, persisted_size=100) + ranges = [(0, 0, BytesIO()), (0, 100, BytesIO()), (10, 20, BytesIO())] + await mrd.download_ranges(ranges, enable_checksum=True) + + # Assert + mock_retry_manager.execute.assert_called_once() + initial_state = mock_retry_manager.execute.call_args[0][0] + + download_states = initial_state["download_states"] + assert len(download_states) == 3 + + states_list = list(download_states.values()) + # First state: (0, 0) -> is_full_object_read is True + assert states_list[0].is_full_object_read is True + assert states_list[0].rolling_checksum is not None + + # Second state: (0, 100) -> is_full_object_read is True + assert states_list[1].is_full_object_read is True + assert states_list[1].rolling_checksum is not None + + # Third state: (10, 20) -> is_full_object_read is False + assert states_list[2].is_full_object_read is False + assert states_list[2].rolling_checksum is None + + # State values for enable_checksum and crc32c + assert initial_state["enable_checksum"] is True + assert initial_state["full_obj_server_crc32c"] == 999 + + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_download_ranges_closes_on_datacorruption( + self, mock_cls_async_read_object_stream, mock_retry_manager_cls, mock_strategy_cls + ): + # Arrange + mock_client = mock.MagicMock() + mock_client.grpc_client = mock.AsyncMock() + mock_stream = mock_cls_async_read_object_stream.return_value + mock_stream.open = AsyncMock() + + mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o") + mrd.close = AsyncMock() + + mock_retry_manager = mock_retry_manager_cls.return_value + mock_retry_manager.execute = AsyncMock(side_effect=DataCorruption(None, "corrupted")) + + # Act & Assert + with pytest.raises(DataCorruption): + await mrd.download_ranges([(0, 0, BytesIO())]) + + mrd.close.assert_called_once()