Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
_ReadResumptionStrategy,
)

from google.cloud.storage.exceptions import DataCorruption
from ._utils import raise_if_no_fast_crc32c

_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100
Expand Down Expand Up @@ -219,8 +220,6 @@ def __init__(
)
generation = kwargs.pop("generation_number")

raise_if_no_fast_crc32c()

self.client = client
self.bucket_name = bucket_name
self.object_name = object_name
Expand All @@ -232,6 +231,8 @@ def __init__(
self._multiplexer: Optional[_StreamMultiplexer] = None
self.persisted_size: Optional[int] = None # updated after opening the stream
self._open_retries: int = 0
self.is_finalized: bool = False
self.full_obj_server_crc32c: Optional[int] = None

async def __aenter__(self):
"""Opens the underlying bidi-gRPC connection to read from the object."""
Expand Down Expand Up @@ -327,6 +328,8 @@ async def _do_open():
self.read_handle = self.read_obj_str.read_handle
if self.read_obj_str.persisted_size is not None:
self.persisted_size = self.read_obj_str.persisted_size
self.is_finalized = self.read_obj_str.is_finalized
self.full_obj_server_crc32c = self.read_obj_str.full_obj_server_crc32c

self._is_stream_open = True

Expand Down Expand Up @@ -363,6 +366,8 @@ async def factory():
self.generation = stream.generation_number
if stream.read_handle:
self.read_handle = stream.read_handle
self.is_finalized = stream.is_finalized
self.full_obj_server_crc32c = stream.full_obj_server_crc32c

self.read_obj_str = stream
self._is_stream_open = True
Expand All @@ -377,6 +382,7 @@ async def download_ranges(
lock: asyncio.Lock = None,
retry_policy: Optional[AsyncRetry] = None,
metadata: Optional[List[Tuple[str, str]]] = None,
enable_checksum: bool = True,
) -> None:
"""Downloads multiple byte ranges from the object into the buffers
provided by user with automatic retries.
Expand Down Expand Up @@ -412,6 +418,9 @@ async def download_ranges(
"Invalid input - length of read_ranges cannot be more than 1000"
)

if enable_checksum:
raise_if_no_fast_crc32c()

if not self._is_stream_open:
raise ValueError("Underlying bidi-gRPC stream is not open")

Expand All @@ -422,16 +431,29 @@ async def download_ranges(
download_states = {}
for read_range in read_ranges:
read_id = generate_random_56_bit_integer()
# Unpack tuple into self-documenting variable names to improve readability.
offset, length, user_buffer = read_range

# Heuristic to detect full object reads:
# - Implicit full object read: start offset is 0 and length is 0 (read all).
# - Explicit full object read: start offset is 0 and length matches the exact persisted size.
is_full_object_read = (
(offset == 0 and length == 0) or
(self.persisted_size is not None and offset == 0 and length == self.persisted_size)
)
Comment thread
chandra-siri marked this conversation as resolved.
download_states[read_id] = _DownloadState(
initial_offset=read_range[0],
initial_length=read_range[1],
user_buffer=read_range[2],
initial_offset=offset,
initial_length=length,
user_buffer=user_buffer,
is_full_object_read=is_full_object_read,
)

initial_state = {
"download_states": download_states,
"read_handle": self.read_handle,
"routing_token": None,
"enable_checksum": enable_checksum,
"full_obj_server_crc32c": self.full_obj_server_crc32c,
}

read_ids = set(download_states.keys())
Expand Down Expand Up @@ -519,12 +541,18 @@ async def generator():
strategy, send_and_recv_via_multiplexer
)

await retry_manager.execute(initial_state, retry_policy)
try:
await retry_manager.execute(initial_state, retry_policy)
except DataCorruption:
if self.is_stream_open:
await self.close()
raise

if initial_state.get("read_handle"):
self.read_handle = initial_state["read_handle"]
finally:
self._multiplexer.unregister(read_ids)
if self._multiplexer is not None:
self._multiplexer.unregister(read_ids)

async def close(self):
"""
Expand Down
78 changes: 78 additions & 0 deletions packages/google-cloud-storage/tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ObjectContexts,
ObjectCustomContextPayload,
)
from google.cloud.storage.exceptions import DataCorruption

pytestmark = pytest.mark.skipif(
os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
Expand Down Expand Up @@ -961,3 +962,80 @@ async def _run():
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

event_loop.run_until_complete(_run())


@pytest.mark.parametrize(
    "read_start, read_length, enable_checksum",
    [
        (0, 0, True),
        (0, 1024 * 1024, True),
        (0, 0, False),
    ],
)
def test_mrd_checksum_validation(
    storage_client,
    blobs_to_delete,
    event_loop,
    grpc_client_direct,
    read_start,
    read_length,
    enable_checksum,
):
    """Download a finalized 1MB object in full and compare it byte-for-byte.

    The object is written with ``AsyncAppendableObjectWriter`` and closed
    with ``finalize_on_close=True``. Each parametrized case then requests
    the whole object, either as ``(0, 0)`` or as ``(0, 1024 * 1024)``
    (offset 0 with a length equal to the object size), with
    ``enable_checksum`` switched on or off, and the downloaded bytes must
    equal what was uploaded.
    """
    object_name = f"test_mrd_chksum-{uuid.uuid4()}"
    object_size = 1024 * 1024  # 1MB

    async def _scenario():
        payload = os.urandom(object_size)

        # Upload the payload and finalize the object on close.
        writer = AsyncAppendableObjectWriter(
            grpc_client_direct, _ZONAL_BUCKET, object_name
        )
        await writer.open()
        await writer.append(payload)
        await writer.close(finalize_on_close=True)

        sink = BytesIO()
        async with AsyncMultiRangeDownloader(
            grpc_client_direct, _ZONAL_BUCKET, object_name
        ) as mrd:
            requested = [(read_start, read_length, sink)]
            await mrd.download_ranges(requested, enable_checksum=enable_checksum)
            assert sink.getvalue() == payload

        # cleanup: drop the writer first, then queue the blob for deletion
        # by the ``blobs_to_delete`` fixture.
        del writer
        gc.collect()
        bucket = storage_client.bucket(_ZONAL_BUCKET)
        blobs_to_delete.append(bucket.blob(object_name))

    event_loop.run_until_complete(_scenario())


def test_mrd_checksum_unfinalized_appendable_skipped(
    storage_client, blobs_to_delete, event_loop, grpc_client_direct
):
    """Read an unfinalized appendable object with checksums enabled.

    The writer appends ``_BYTES_TO_UPLOAD`` and flushes, but is not closed
    until after the download. A ``(0, 0)`` read with
    ``enable_checksum=True`` is expected to complete without raising and to
    return exactly the flushed bytes.
    """
    object_name = f"test_mrd_chksum_unfin-{uuid.uuid4()}"

    async def _scenario():
        writer = AsyncAppendableObjectWriter(
            grpc_client_direct, _ZONAL_BUCKET, object_name
        )
        await writer.open()
        await writer.append(_BYTES_TO_UPLOAD)
        # Flush only: the object stays unfinalized while it is being read.
        await writer.flush()

        sink = BytesIO()
        async with AsyncMultiRangeDownloader(
            grpc_client_direct, _ZONAL_BUCKET, object_name
        ) as mrd:
            # The object is unfinalized, so this call must not raise.
            await mrd.download_ranges([(0, 0, sink)], enable_checksum=True)
            assert sink.getvalue() == _BYTES_TO_UPLOAD

        # cleanup: close and drop the writer, then queue the blob for
        # deletion by the ``blobs_to_delete`` fixture.
        await writer.close()
        del writer
        gc.collect()
        bucket = storage_client.bucket(_ZONAL_BUCKET)
        blobs_to_delete.append(bucket.blob(object_name))

    event_loop.run_until_complete(_scenario())
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,14 @@ async def test_downloading_without_opening_should_throw_error(self):
assert not mrd.is_stream_open

@mock.patch("google.cloud.storage.asyncio._utils.google_crc32c")
def test_init_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c):
@pytest.mark.asyncio
async def test_download_ranges_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c):
mock_google_crc32c.implementation = "python"
mock_client = mock.MagicMock()
mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")

with pytest.raises(exceptions.FailedPrecondition) as exc_info:
AsyncMultiRangeDownloader(mock_client, "bucket", "object")
await mrd.download_ranges([(0, 10, BytesIO())])

assert "The google-crc32c package is not installed with C support" in str(
exc_info.value
Expand Down Expand Up @@ -579,3 +581,117 @@ async def staged_recv():

# Assert
mock_logger.info.assert_any_call("Resuming download (attempt 2) for 1 ranges.")

@mock.patch(
    "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@pytest.mark.asyncio
async def test_open_populates_checksum_properties(self, mock_cls_async_read_object_stream):
    """``open()`` copies the finalized flag and server CRC32C from the stream.

    Before opening, the downloader reports ``is_finalized is False`` and
    ``full_obj_server_crc32c is None``. After ``open()`` both attributes
    must mirror the values exposed by the (mocked) read-object stream.
    """
    # Arrange
    mock_client = mock.MagicMock(grpc_client=mock.AsyncMock())
    mock_stream = mock_cls_async_read_object_stream.return_value
    mock_stream.open = AsyncMock()
    mock_stream.configure_mock(
        generation_number=123,
        persisted_size=100,
        read_handle=b"h",
        is_finalized=True,
        full_obj_server_crc32c=999,
    )

    mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")
    # Defaults prior to opening the stream.
    assert mrd.is_finalized is False
    assert mrd.full_obj_server_crc32c is None

    # Act
    await mrd.open()

    # Assert
    assert mrd.is_finalized is True
    assert mrd.full_obj_server_crc32c == 999

@mock.patch(
    "google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy"
)
@mock.patch(
    "google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager"
)
@mock.patch(
    "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@pytest.mark.asyncio
async def test_download_ranges_configures_full_object_read_state(
    self, mock_cls_async_read_object_stream, mock_retry_manager_cls, mock_strategy_cls
):
    """``download_ranges`` flags full-object reads in the initial retry state.

    The retry manager is patched so ``execute`` is an ``AsyncMock``, which
    lets the test inspect the ``initial_state`` dict passed to it. Three
    ranges are requested against a stream reporting ``persisted_size=100``:
    ``(0, 0)`` and ``(0, 100)`` must be marked as full-object reads and
    ``(10, 20)`` must not. The state must also carry ``enable_checksum``
    and the server CRC32C taken from the stream.
    """
    # Arrange
    mock_client = mock.MagicMock(grpc_client=mock.AsyncMock())
    mock_stream = mock_cls_async_read_object_stream.return_value
    mock_stream.open = AsyncMock()
    mock_stream.configure_mock(
        persisted_size=100,
        is_finalized=True,
        full_obj_server_crc32c=999,
    )

    mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o")

    mock_retry_manager = mock_retry_manager_cls.return_value
    mock_retry_manager.execute = AsyncMock()

    # Act
    # Implicit full read (0, 0) and explicit full read (0, persisted_size=100)
    ranges = [(0, 0, BytesIO()), (0, 100, BytesIO()), (10, 20, BytesIO())]
    await mrd.download_ranges(ranges, enable_checksum=True)

    # Assert
    mock_retry_manager.execute.assert_called_once()
    initial_state = mock_retry_manager.execute.call_args[0][0]

    download_states = initial_state["download_states"]
    assert len(download_states) == 3

    # States are checked in the same order the ranges were passed.
    expected_full_flags = (True, True, False)
    for state, expect_full in zip(download_states.values(), expected_full_flags):
        assert state.is_full_object_read is expect_full
        if expect_full:
            # Full-object reads are expected to carry a rolling checksum.
            assert state.rolling_checksum is not None
        else:
            assert state.rolling_checksum is None

    # State values for enable_checksum and crc32c
    assert initial_state["enable_checksum"] is True
    assert initial_state["full_obj_server_crc32c"] == 999

@mock.patch(
    "google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy"
)
@mock.patch(
    "google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager"
)
@mock.patch(
    "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@pytest.mark.asyncio
async def test_download_ranges_closes_on_datacorruption(
    self, mock_cls_async_read_object_stream, mock_retry_manager_cls, mock_strategy_cls
):
    """A ``DataCorruption`` from the retry manager closes the downloader.

    ``execute`` on the patched retry manager raises ``DataCorruption``. The
    exception must propagate out of ``download_ranges`` and the
    downloader's ``close`` (replaced here by an ``AsyncMock``) must have
    been called exactly once.
    """
    # Arrange
    mock_client = mock.MagicMock(grpc_client=mock.AsyncMock())
    mock_stream = mock_cls_async_read_object_stream.return_value
    mock_stream.open = AsyncMock()

    mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o")
    # Replace close() so the call can be observed without touching a stream.
    mrd.close = AsyncMock()

    corruption = DataCorruption(None, "corrupted")
    mock_retry_manager = mock_retry_manager_cls.return_value
    mock_retry_manager.execute = AsyncMock(side_effect=corruption)

    # Act & Assert
    full_read = [(0, 0, BytesIO())]
    with pytest.raises(DataCorruption):
        await mrd.download_ranges(full_read)

    mrd.close.assert_called_once()