Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ on:
pull_request:
paths-ignore:
- '*.md'
push:
paths-ignore:
- '*.md'

permissions:
actions: read
Expand All @@ -26,10 +23,8 @@ jobs:
- id: setup-uv
uses: astral-sh/setup-uv@v7
with:
enable-cache: true
cache-suffix: "3.12"
version: "latest"
python-version: "3.12"
python-version: "3.13"
- name: Install deps
run: uv sync --all-extras
- name: Run lint check
Expand Down
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ repos:

- id: mypy
name: Validate types with MyPy
entry: uv run mypy
entry: uv run mypy --show-traceback .
language: system
pass_filenames: false
types: [ python ]
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ classifiers = [
"Programming Language :: Python :: 3.13",
]
dependencies = [
"redis>=7.0.0,<8", # TODO: fix issues in tests with 7.1.0
"redis>=8.0.0,<9",
"taskiq>=0.12.0",
]

Expand All @@ -42,7 +42,7 @@ dev = [
]
lint = [
"black>=25.11.0",
"mypy>=1.19.0",
"mypy>=2.0.0",
"ruff>=0.14.7",
]
test = [
Expand Down
36 changes: 18 additions & 18 deletions taskiq_redis/list_schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def _get_previous_time_schedules(self) -> list[bytes]:
).replace(second=0, microsecond=0) - datetime.timedelta(
minutes=1,
)
schedules = []
schedules: list[bytes] = []
async with Redis(connection_pool=self._connection_pool) as redis:
time_keys: list[str] = []
# We need to get all the time keys and check if the time is less than
Expand All @@ -133,28 +133,28 @@ async def _get_previous_time_schedules(self) -> list[bytes]:
if key_time and key_time <= minute_before:
time_keys.append(key.decode())
for key in time_keys:
schedules.extend(await redis.lrange(key, 0, -1)) # type: ignore[misc]
schedules.extend(await redis.lrange(key, 0, -1)) # type: ignore[arg-type]

return schedules

async def delete_schedule(self, schedule_id: str) -> None:
"""Delete a schedule from the source."""
async with Redis(connection_pool=self._connection_pool) as redis:
schedule = await redis.getdel(self._get_data_key(schedule_id))
if schedule is not None:
raw_schedule = await redis.getdel(self._get_data_key(schedule_id))
if raw_schedule is not None:
logger.debug("Deleting schedule %s", schedule_id)
schedule = model_validate(
ScheduledTask,
self._serializer.loadb(schedule),
self._serializer.loadb(raw_schedule), # type: ignore[arg-type]
)
# We need to remove the schedule from the cron or time list.
if schedule.cron is not None:
await redis.lrem(self._get_cron_key(), 0, schedule_id) # type: ignore[misc]
await redis.lrem(self._get_cron_key(), 0, schedule_id)
elif schedule.time is not None:
time_key = self._get_time_key(schedule.time)
await redis.lrem(time_key, 0, schedule_id) # type: ignore[misc]
await redis.lrem(time_key, 0, schedule_id)
elif schedule.interval:
await redis.lrem(self._get_interval_key(), 0, schedule_id) # type: ignore[misc]
await redis.lrem(self._get_interval_key(), 0, schedule_id)

async def add_schedule(self, schedule: "ScheduledTask") -> None:
"""Add a schedule to the source."""
Expand All @@ -168,14 +168,14 @@ async def add_schedule(self, schedule: "ScheduledTask") -> None:
# This is an optimization, so we can get all the schedules
# for the current time much faster.
if schedule.cron is not None:
await redis.rpush(self._get_cron_key(), schedule.schedule_id) # type: ignore[misc]
await redis.rpush(self._get_cron_key(), schedule.schedule_id)
elif schedule.time is not None:
await redis.rpush( # type: ignore[misc]
await redis.rpush(
self._get_time_key(schedule.time),
schedule.schedule_id,
)
elif schedule.interval:
await redis.rpush( # type: ignore[misc]
await redis.rpush(
self._get_interval_key(),
schedule.schedule_id,
)
Expand Down Expand Up @@ -204,16 +204,16 @@ async def get_schedules(self) -> list["ScheduledTask"]:
timed = await self._get_previous_time_schedules()
self._is_first_run = False
async with Redis(connection_pool=self._connection_pool) as redis:
buffer = []
crons = await redis.lrange(self._get_cron_key(), 0, -1) # type: ignore[misc]
buffer: list[bytes] = []
crons = await redis.lrange(self._get_cron_key(), 0, -1)
logger.debug("Got %d cron schedules", len(crons))
if crons:
buffer.extend(crons)
intervals = await redis.lrange(self._get_interval_key(), 0, -1) # type: ignore[misc]
buffer.extend(crons) # type: ignore[arg-type]
intervals = await redis.lrange(self._get_interval_key(), 0, -1)
logger.debug("Got %d interval schedules", len(intervals))
if intervals:
buffer.extend(intervals)
timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore[misc]
buffer.extend(intervals) # type: ignore[arg-type]
timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore[arg-type]
logger.debug("Got %d timed schedules", len(timed))
if timed:
buffer.extend(timed)
Expand All @@ -229,7 +229,7 @@ async def get_schedules(self) -> list["ScheduledTask"]:
buffer = buffer[self._buffer_size :]

return [
model_validate(ScheduledTask, self._serializer.loadb(schedule))
model_validate(ScheduledTask, self._serializer.loadb(schedule)) # type: ignore[arg-type]
for schedule in schedules
if schedule
]
Expand Down
12 changes: 6 additions & 6 deletions taskiq_redis/redis_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def get_result(

taskiq_result = model_validate(
TaskiqResult[_ReturnType],
self.serializer.loadb(result_value),
self.serializer.loadb(result_value), # type: ignore[arg-type]
)

if not with_logs:
Expand Down Expand Up @@ -208,7 +208,7 @@ async def get_progress(

return model_validate(
TaskProgress[_ReturnType],
self.serializer.loadb(result_value),
self.serializer.loadb(result_value), # type: ignore[arg-type]
)


Expand Down Expand Up @@ -333,7 +333,7 @@ async def get_result(

taskiq_result: TaskiqResult[_ReturnType] = model_validate(
TaskiqResult[_ReturnType],
self.serializer.loadb(result_value),
self.serializer.loadb(result_value), # type: ignore[arg-type]
)

if not with_logs:
Expand Down Expand Up @@ -384,7 +384,7 @@ async def get_progress(

return model_validate(
TaskProgress[_ReturnType],
self.serializer.loadb(result_value),
self.serializer.loadb(result_value), # type: ignore[arg-type]
)


Expand Down Expand Up @@ -519,7 +519,7 @@ async def get_result(

taskiq_result = model_validate(
TaskiqResult[_ReturnType],
self.serializer.loadb(result_value),
self.serializer.loadb(result_value), # type: ignore[arg-type]
)

if not with_logs:
Expand Down Expand Up @@ -571,7 +571,7 @@ async def get_progress(

return model_validate(
TaskProgress[_ReturnType],
self.serializer.loadb(result_value),
self.serializer.loadb(result_value), # type: ignore[arg-type]
)

async def shutdown(self) -> None:
Expand Down
19 changes: 11 additions & 8 deletions taskiq_redis/redis_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async def kick(self, message: BrokerMessage) -> None:
"""
queue_name = message.labels.get("queue_name") or self.queue_name
async with Redis(connection_pool=self.connection_pool) as redis_conn:
await redis_conn.lpush(queue_name, message.message) # type: ignore
await redis_conn.lpush(queue_name, message.message)

async def listen(self) -> AsyncGenerator[bytes, None]:
"""
Expand All @@ -129,9 +129,10 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
while True:
try:
async with Redis(connection_pool=self.connection_pool) as redis_conn:
yield (await redis_conn.brpop(self.queue_name))[ # type: ignore
redis_brpop_data_position
]
brpop_result = await redis_conn.brpop(self.queue_name)
if brpop_result is None:
continue
yield brpop_result[redis_brpop_data_position] # type: ignore[misc]
except ConnectionError as exc:
logger.warning("Redis connection error: %s", exc)
continue
Expand Down Expand Up @@ -283,12 +284,14 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
noack=False,
count=self.count,
)
for stream, msg_list in fetched:
for msg_id, msg in msg_list:
if not fetched:
continue
for stream, msg_list in fetched: # type: ignore[str-unpack]
for msg_id, msg in msg_list: # type: ignore[str-unpack,union-attr]
logger.debug("Received message: %s", msg)
yield AckableMessage(
data=msg[b"data"],
ack=self._ack_generator(id=msg_id, queue_name=stream),
data=msg[b"data"], # type: ignore[arg-type,index]
ack=self._ack_generator(id=msg_id, queue_name=stream), # type: ignore[arg-type]
)
logger.debug("Starting fetching unacknowledged messages")
for stream in [self.queue_name, *self.additional_streams.keys()]:
Expand Down
18 changes: 11 additions & 7 deletions taskiq_redis/redis_cluster_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async def kick(self, message: BrokerMessage) -> None:
:param message: message to append.
"""
queue_name = message.labels.get("queue_name") or self.queue_name
await self.redis.lpush(queue_name, message.message) # type: ignore
await self.redis.lpush(queue_name, message.message)

async def listen(self) -> AsyncGenerator[bytes, None]:
"""
Expand All @@ -70,8 +70,10 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
"""
redis_brpop_data_position = 1
while True:
value = await self.redis.brpop([self.queue_name]) # type: ignore
yield value[redis_brpop_data_position]
value = await self.redis.brpop([self.queue_name])
if value is None:
continue
yield value[redis_brpop_data_position] # type: ignore[misc]


class RedisStreamClusterBroker(BaseRedisClusterBroker):
Expand Down Expand Up @@ -195,10 +197,12 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
block=self.block,
noack=False,
)
for stream, msg_list in fetched:
for msg_id, msg in msg_list:
if not fetched:
continue
for stream, msg_list in fetched: # type: ignore[str-unpack]
for msg_id, msg in msg_list: # type: ignore[str-unpack,union-attr]
logger.debug("Received message: %s", msg)
yield AckableMessage(
data=msg[b"data"],
ack=self._ack_generator(id=msg_id, queue_name=stream),
data=msg[b"data"], # type: ignore[arg-type,index]
ack=self._ack_generator(id=msg_id, queue_name=stream), # type: ignore[arg-type]
)
19 changes: 11 additions & 8 deletions taskiq_redis/redis_sentinel_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async def kick(self, message: BrokerMessage) -> None:
"""
queue_name = message.labels.get("queue_name") or self.queue_name
async with self._acquire_master_conn() as redis_conn:
await redis_conn.lpush(queue_name, message.message) # type: ignore
await redis_conn.lpush(queue_name, message.message)

async def listen(self) -> AsyncGenerator[bytes, None]:
"""
Expand All @@ -119,9 +119,10 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
redis_brpop_data_position = 1
async with self._acquire_master_conn() as redis_conn:
while True:
yield (await redis_conn.brpop(self.queue_name))[ # type: ignore
redis_brpop_data_position
]
brpop_result = await redis_conn.brpop(self.queue_name)
if brpop_result is None:
continue
yield brpop_result[redis_brpop_data_position] # type: ignore[misc]


class RedisStreamSentinelBroker(BaseSentinelBroker):
Expand Down Expand Up @@ -252,10 +253,12 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
block=self.block,
noack=False,
)
for stream, msg_list in fetched:
for msg_id, msg in msg_list:
if not fetched:
continue
for stream, msg_list in fetched: # type: ignore[str-unpack]
for msg_id, msg in msg_list: # type: ignore[str-unpack,union-attr]
logger.debug("Received message: %s", msg)
yield AckableMessage(
data=msg[b"data"],
ack=self._ack_generator(id=msg_id, queue_name=stream),
data=msg[b"data"], # type: ignore[arg-type,index]
ack=self._ack_generator(id=msg_id, queue_name=stream), # type: ignore[arg-type]
)
6 changes: 3 additions & 3 deletions taskiq_redis/schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async def get_schedules(self) -> list[ScheduledTask]:
if buffer:
schedules.extend(await redis.mget(buffer))
return [
model_validate(ScheduledTask, self.serializer.loadb(schedule))
model_validate(ScheduledTask, self.serializer.loadb(schedule)) # type: ignore[arg-type]
for schedule in schedules
if schedule
]
Expand Down Expand Up @@ -179,7 +179,7 @@ async def get_schedules(self) -> list[ScheduledTask]:
raw_schedule = await self.redis.get(key)
parsed_schedule = model_validate(
ScheduledTask,
self.serializer.loadb(raw_schedule),
self.serializer.loadb(raw_schedule), # type: ignore[arg-type]
)
schedules.append(parsed_schedule)
return schedules
Expand Down Expand Up @@ -277,7 +277,7 @@ async def get_schedules(self) -> list[ScheduledTask]:
if buffer:
schedules.extend(await redis.mget(buffer))
return [
model_validate(ScheduledTask, self.serializer.loadb(schedule))
model_validate(ScheduledTask, self.serializer.loadb(schedule)) # type: ignore[arg-type]
for schedule in schedules
if schedule
]
Expand Down
Loading
Loading