From 4101ef44b4165aee98ed1b42709a22ba52c5cca2 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Mon, 25 May 2026 16:32:41 +0200 Subject: [PATCH] fix: flush StreamedLogAsync tail when stop() cancels the task --- src/apify_client/_streamed_log.py | 11 ++-- tests/unit/test_logging.py | 102 ++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 5 deletions(-) diff --git a/src/apify_client/_streamed_log.py b/src/apify_client/_streamed_log.py index f57ba074..84300287 100644 --- a/src/apify_client/_streamed_log.py +++ b/src/apify_client/_streamed_log.py @@ -214,8 +214,9 @@ async def _stream_log(self) -> None: async with self._log_client.stream(raw=True) as log_stream: if not log_stream: return - async for data in log_stream.aiter_bytes(): - self._process_new_data(data) - - # If the stream is finished, then the last part will be also processed. - self._log_buffer_content(include_last_part=True) + try: + async for data in log_stream.aiter_bytes(): + self._process_new_data(data) + finally: + # Flush the last buffered part even if the task is cancelled by `stop()`. + self._log_buffer_content(include_last_part=True) diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py index 62139e67..fd980f87 100644 --- a/tests/unit/test_logging.py +++ b/tests/unit/test_logging.py @@ -3,6 +3,7 @@ import asyncio import json import logging +import threading import time from datetime import datetime, timedelta from typing import TYPE_CHECKING @@ -716,3 +717,104 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception( elapsed = time.monotonic() - start assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s' + + +_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line' +_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line' + + +def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None: + """Register the minimal run and actor endpoints required by `get_streamed_log`.""" + httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json( + { + 'data': { + 'id': _MOCKED_RUN_ID, + 'actId': _MOCKED_ACTOR_ID, + 'userId': 'test_user_id', + 'startedAt': '2019-11-30T07:34:24.202Z', + 'finishedAt': '2019-12-12T09:30:12.202Z', + 'status': 'RUNNING', + 'statusMessage': 'Running', + 'isStatusMessageTerminal': False, + 'meta': {'origin': 'WEB'}, + 'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1}, + 'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048}, + 'buildId': 'test_build_id', + 'generalAccess': 'RESTRICTED', + 'defaultKeyValueStoreId': 'test_kvs_id', + 'defaultDatasetId': 'test_dataset_id', + 'defaultRequestQueueId': 'test_rq_id', + 'buildNumber': '0.0.1', + 'containerUrl': 'https://test.runs.apify.net', + } + } + ) + httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json( + { + 'data': { + 'id': _MOCKED_ACTOR_ID, + 'userId': 'test_user_id', + 'name': _MOCKED_ACTOR_NAME, + 'username': 'test_user', + 'isPublic': False, + 'createdAt': '2019-07-08T11:27:57.401Z', + 'modifiedAt': '2019-07-08T14:01:05.546Z', + 'stats': { + 'totalBuilds': 0, + 'totalRuns': 0, + 'totalUsers': 0, + 'totalUsers7Days': 0, + 'totalUsers30Days': 0, + 'totalUsers90Days': 0, + 'totalMetamorphs': 0, + 'lastRunStartedAt': '2019-07-08T14:01:05.546Z', + }, + 'versions': [], + 'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048}, + 'deploymentKey': 'test_key', + } + } + ) + + +@pytest.mark.usefixtures('propagate_stream_logs') +async def test_streamed_log_async_stop_flushes_buffered_tail( + caplog: LogCaptureFixture, + httpserver: HTTPServer, +) -> None: + """Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`.""" + stop_emitting = threading.Event() + + def _tail_handler(_request: Request) -> Response: + def generate_logs() -> Iterator[bytes]: + yield f'{_TAIL_FIRST_MESSAGE}\n'.encode() + # Second marker has no trailing newline/next-marker, so it stays in the buffer. + yield _TAIL_SECOND_MESSAGE.encode() + # Block until the test tears the server down (or stop releases us). + stop_emitting.wait(timeout=30) + + return Response(response=generate_logs(), status=200, mimetype='application/octet-stream') + + httpserver.expect_request( + f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true' + ).respond_with_handler(_tail_handler) + _register_run_and_actor_endpoints(httpserver) + + api_url = httpserver.url_for('/').removesuffix('/') + run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID) + streamed_log = await run_client.get_streamed_log() + + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + + try: + with caplog.at_level(logging.DEBUG, logger=logger_name): + async with streamed_log: + # Wait long enough for both chunks to arrive and be processed. + await asyncio.sleep(1) + # Context exit calls stop(), which cancels the task mid-stream. + finally: + stop_emitting.set() + + messages = [record.message for record in caplog.records] + assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}' + assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}'