Skip to content

Commit 1315cef

Browse files
committed
Fixed rediscluster schedule source.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent dce4dac commit 1315cef

File tree

1 file changed

+7
-12
lines changed

1 file changed

+7
-12
lines changed

taskiq_redis/schedule_source.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -156,19 +156,14 @@ async def get_schedules(self) -> List[ScheduledTask]:
156156
:return: list of schedules.
157157
"""
158158
schedules = []
159-
buffer = []
160159
async for key in self.redis.scan_iter(f"{self.prefix}:*"): # type: ignore[attr-defined]
161-
buffer.append(key)
162-
if len(buffer) >= self.buffer_size:
163-
schedules.extend(await self.redis.mget(buffer)) # type: ignore[attr-defined]
164-
buffer = []
165-
if buffer:
166-
schedules.extend(await self.redis.mget(buffer)) # type: ignore[attr-defined]
167-
return [
168-
model_validate(ScheduledTask, self.serializer.loadb(schedule))
169-
for schedule in schedules
170-
if schedule
171-
]
160+
raw_schedule = await self.redis.get(key)
161+
parsed_schedule = model_validate(
162+
ScheduledTask,
163+
self.serializer.loadb(raw_schedule),
164+
)
165+
schedules.append(parsed_schedule)
166+
return schedules
172167

173168
async def post_send(self, task: ScheduledTask) -> None:
174169
"""Delete a task after it's completed."""

0 commit comments

Comments
 (0)