File tree Expand file tree Collapse file tree 2 files changed +17
-5
lines changed Expand file tree Collapse file tree 2 files changed +17
-5
lines changed Original file line number Diff line number Diff line change @@ -117,7 +117,6 @@ def __init__(
117
117
self ,
118
118
url : str ,
119
119
prefix : str = "schedule" ,
120
- buffer_size : int = 50 ,
121
120
serializer : Optional [TaskiqSerializer ] = None ,
122
121
** connection_kwargs : Any ,
123
122
) -> None :
@@ -126,7 +125,6 @@ def __init__(
126
125
url ,
127
126
** connection_kwargs ,
128
127
)
129
- self .buffer_size = buffer_size
130
128
if serializer is None :
131
129
serializer = PickleSerializer ()
132
130
self .serializer = serializer
@@ -157,7 +155,7 @@ async def get_schedules(self) -> List[ScheduledTask]:
157
155
"""
158
156
schedules = []
159
157
async for key in self .redis .scan_iter (f"{ self .prefix } :*" ): # type: ignore[attr-defined]
160
- raw_schedule = await self .redis .get (key )
158
+ raw_schedule = await self .redis .get (key ) # type: ignore[attr-defined]
161
159
parsed_schedule = model_validate (
162
160
ScheduledTask ,
163
161
self .serializer .loadb (raw_schedule ),
Original file line number Diff line number Diff line change @@ -196,17 +196,31 @@ async def test_cluster_post_run_time(redis_cluster_url: str) -> None:
196
196
197
197
198
198
@pytest .mark .anyio
199
- async def test_cluster_buffer (redis_cluster_url : str ) -> None :
199
+ async def test_cluster_get_schedules (redis_cluster_url : str ) -> None :
200
+ """
201
+ Test of a redis cluster source.
202
+
203
+ This test checks that if the schedules are located on different nodes,
204
+ the source will still be able to get them all.
205
+
206
+ To simulate this we set a specific shard key for each schedule.
207
+ The shard keys are from this gist:
208
+
209
+ https://gist.githubusercontent.com/dvirsky/93f43277317f629bb06e858946416f7e/raw/b0438faf6f5a0020c12a0730f6cd6ac4bdc4b171/crc16_slottable.h
210
+
211
+ """
200
212
prefix = uuid .uuid4 ().hex
201
- source = RedisClusterScheduleSource (redis_cluster_url , prefix = prefix , buffer_size = 1 )
213
+ source = RedisClusterScheduleSource (redis_cluster_url , prefix = prefix )
202
214
schedule1 = ScheduledTask (
215
+ schedule_id = r"id-{06S}" ,
203
216
task_name = "test_task1" ,
204
217
labels = {},
205
218
args = [],
206
219
kwargs = {},
207
220
cron = "* * * * *" ,
208
221
)
209
222
schedule2 = ScheduledTask (
223
+ schedule_id = r"id-{4Rs}" ,
210
224
task_name = "test_task2" ,
211
225
labels = {},
212
226
args = [],
You can’t perform that action at this time.
0 commit comments