From 5d3d51472cb0033b5313daf9d1b9d3e45d72d45c Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Mon, 22 Jun 2026 23:39:21 +0200 Subject: [PATCH] Fixed race-condition for streambroker. Signed-off-by: Pavel Kirilin --- taskiq_redis/redis_broker.py | 45 +++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 2480802..b1ad461 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -295,28 +295,31 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: ) logger.debug("Starting fetching unacknowledged messages") for stream in [self.queue_name, *self.additional_streams.keys()]: - lock = redis_conn.lock( + pipe = redis_conn.pipeline() + lock = pipe.lock( f"autoclaim:{self.consumer_group_name}:{stream}", timeout=self.unacknowledged_lock_timeout, ) - if await lock.locked(): - continue - async with lock: - pending = await redis_conn.xautoclaim( - name=stream, - groupname=self.consumer_group_name, - consumername=self.consumer_name, - min_idle_time=self.idle_timeout, - count=self.unacknowledged_batch_size, - ) - logger.debug( - "Found %d pending messages in stream %s", - len(pending[1]), - stream, + await lock.acquire() + await pipe.xautoclaim( + name=stream, + groupname=self.consumer_group_name, + consumername=self.consumer_name, + min_idle_time=self.idle_timeout, + count=self.unacknowledged_batch_size, + ) + await lock.release() + results = await pipe.execute() + pending = results[1] + + logger.debug( + "Found %d pending messages in stream %s", + len(pending[1]), + stream, + ) + for msg_id, msg in pending[1]: + logger.debug("Received message: %s", msg) + yield AckableMessage( + data=msg[b"data"], + ack=self._ack_generator(id=msg_id, queue_name=stream), ) - for msg_id, msg in pending[1]: - logger.debug("Received message: %s", msg) - yield AckableMessage( - data=msg[b"data"], - ack=self._ack_generator(id=msg_id, queue_name=stream), - )