From 7539a2e5079ac9620e8af8a86e5f790358f71fec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Sat, 6 Jun 2026 00:56:50 +0200 Subject: [PATCH] fix(scheduling): offload sync @scheduled tasks off the event loop + bump v26.06.17 Surfaced by an audit while validating implement-scheduling (skill clean: tasks fire repeatedly, errors isolated, graceful shutdown, cron next-run correct via croniter). TaskScheduler._invoke ran the scheduled method inline on the event loop, so a SYNC @scheduled task with a blocking body (I/O, time.sleep) stalled the whole application for its duration. Sync methods are now offloaded via asyncio.to_thread; async methods are still awaited on the loop. Verified: a sync task with time.sleep(0.4) previously froze a 20ms heartbeat to 1 tick over 0.2s; now it keeps ticking. Regression test tests/scheduling/test_sync_offload.py (confirmed it fails without the fix: assert 1 >= 4). Gates: mypy --strict (607), ruff + format, full suite 3684 passed. --- CHANGELOG.md | 19 ++++++ README.md | 2 +- pyproject.toml | 2 +- src/pyfly/__init__.py | 2 +- src/pyfly/scheduling/task_scheduler.py | 14 ++++- tests/scheduling/test_sync_offload.py | 83 ++++++++++++++++++++++++++ uv.lock | 2 +- 7 files changed, 117 insertions(+), 7 deletions(-) create mode 100644 tests/scheduling/test_sync_offload.py diff --git a/CHANGELOG.md b/CHANGELOG.md index e9853671..83957612 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,25 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## v26.06.17 (2026-06-06) + +### Fixed + +- **A synchronous `@scheduled` task no longer blocks the event loop.** + `TaskScheduler._invoke` ran the scheduled method inline on the loop, so a sync + task with a blocking body (I/O, `time.sleep`, a blocking DB/HTTP call) stalled + the entire application for its duration — every request and every other task. + Sync methods are now offloaded to a worker thread via `asyncio.to_thread`; + async methods are still awaited on the loop. (Confirmed: a sync task with + `time.sleep(0.4)` previously froze a 20 ms heartbeat to a single tick over + 0.2 s; it now keeps ticking.) + +This surfaced in an audit while validating the `implement-scheduling` skill (which +validated clean — tasks fire repeatedly, errors are isolated, shutdown is +graceful, and the cron next-run is correct via croniter). + +--- + ## v26.06.16 (2026-06-05) ### Fixed diff --git a/README.md b/README.md index e18c48a6..65ae4275 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Firefly Framework Python 3.12+ License: Apache 2.0 - Version: 26.06.16 + Version: 26.06.17 Type Checked: mypy strict Code Style: Ruff Async First diff --git a/pyproject.toml b/pyproject.toml index 053c7428..cfddbfd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pyfly" # CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4); # git tag, GitHub release and human-readable display use leading-zero form # (v26.05.04) to match the Java/.NET/Go siblings. -version = "26.6.16" +version = "26.6.17" description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more." readme = "README.md" license = "Apache-2.0" diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py index 08514259..5455c4d3 100644 --- a/src/pyfly/__init__.py +++ b/src/pyfly/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. """PyFly — Enterprise Python Framework.""" -__version__ = "26.06.16" +__version__ = "26.06.17" diff --git a/src/pyfly/scheduling/task_scheduler.py b/src/pyfly/scheduling/task_scheduler.py index ee2bd7cc..d1606e3a 100644 --- a/src/pyfly/scheduling/task_scheduler.py +++ b/src/pyfly/scheduling/task_scheduler.py @@ -204,13 +204,21 @@ async def _run_fixed_delay_loop( async def _invoke(bean: Any, method: Callable[..., Any]) -> None: """Invoke a scheduled method, handling both sync and async methods. + An async method is awaited on the event loop; a **synchronous** method is + offloaded to a worker thread (``asyncio.to_thread``) so a blocking body + (I/O, ``time.sleep``) does not stall the loop — and therefore the whole + application — for the duration of the task. + Exceptions are logged (not propagated) so a failing iteration of a cron / fixed-rate job — whose task is not awaited by the loop — is still reported through the framework logger instead of vanishing (audit #186). """ try: - result = method() - if inspect.isawaitable(result): - await result + if inspect.iscoroutinefunction(method): + await method() + else: + result = await asyncio.to_thread(method) + if inspect.isawaitable(result): # rare: sync method returning an awaitable + await result except Exception: logger.exception("scheduled task '%s' failed", getattr(method, "__name__", method)) diff --git a/tests/scheduling/test_sync_offload.py b/tests/scheduling/test_sync_offload.py new file mode 100644 index 00000000..f696e8b1 --- /dev/null +++ b/tests/scheduling/test_sync_offload.py @@ -0,0 +1,83 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Regression: a synchronous @scheduled task is offloaded to a thread so it does +not block the event loop (v26.06.17). + +Previously TaskScheduler._invoke ran ``method()`` inline on the loop, so a blocking +sync scheduled task (I/O, ``time.sleep``) stalled the entire application for its +duration. Sync methods are now run via ``asyncio.to_thread``. +""" + +from __future__ import annotations + +import asyncio +import time +from datetime import timedelta + +import pytest + +from pyfly.scheduling import TaskScheduler, scheduled + + +class _Jobs: + def __init__(self) -> None: + self.ticks = 0 + + @scheduled(fixed_rate=timedelta(seconds=0.02)) + async def heartbeat(self) -> None: + self.ticks += 1 + + @scheduled(fixed_rate=timedelta(seconds=5)) # fires once at t~0 within our window + def blocking(self) -> None: + time.sleep(0.4) # synchronous blocking body + + +@pytest.mark.asyncio +async def test_sync_task_does_not_block_event_loop() -> None: + jobs = _Jobs() + scheduler = TaskScheduler() + assert scheduler.discover([jobs]) == 2 + await scheduler.start() + try: + # The blocking sync task (0.4s) fires immediately. If it ran inline on the + # loop, the 20ms async heartbeat could not tick during the first 0.2s. + await asyncio.sleep(0.2) + early_ticks = jobs.ticks + finally: + await scheduler.stop() + + assert early_ticks >= 4, f"event loop blocked by sync task: only {early_ticks} heartbeat ticks in 0.2s" + + +class _SyncJob: + def __init__(self) -> None: + self.count = 0 + + @scheduled(fixed_rate=timedelta(seconds=0.02)) + def tick(self) -> None: + self.count += 1 + + +@pytest.mark.asyncio +async def test_sync_task_still_executes() -> None: + job = _SyncJob() + scheduler = TaskScheduler() + scheduler.discover([job]) + await scheduler.start() + try: + await asyncio.sleep(0.2) + finally: + await scheduler.stop() + + assert job.count >= 3 # offloaded to a thread, but still runs repeatedly diff --git a/uv.lock b/uv.lock index 71f374c1..109335cc 100644 --- a/uv.lock +++ b/uv.lock @@ -1967,7 +1967,7 @@ wheels = [ [[package]] name = "pyfly" -version = "26.6.16" +version = "26.6.17" source = { editable = "." } dependencies = [ { name = "pydantic" },