Skip to content

Commit 346e5fc

Browse files
authored
Fix Queue Limiter (#125)
Also includes a quick fix for #123
1 parent 1081866 commit 346e5fc

File tree

3 files changed

+88
-1
lines changed

3 files changed

+88
-1
lines changed

dbos/scheduler/scheduler.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from datetime import datetime, timezone
33
from typing import TYPE_CHECKING, Callable
44

5+
from dbos.logger import dbos_logger
56
from dbos.queue import Queue
67

78
if TYPE_CHECKING:
@@ -18,7 +19,12 @@
1819
def scheduler_loop(
1920
func: ScheduledWorkflow, cron: str, stop_event: threading.Event
2021
) -> None:
21-
iter = croniter(cron, datetime.now(timezone.utc), second_at_beginning=True)
22+
try:
23+
iter = croniter(cron, datetime.now(timezone.utc), second_at_beginning=True)
24+
except Exception as e:
25+
dbos_logger.error(
26+
f'Cannot run scheduled function {func.__name__}. Invalid crontab "{cron}"'
27+
)
2228
while not stop_event.is_set():
2329
nextExecTime = iter.get_next(datetime)
2430
sleepTime = nextExecTime - datetime.now(timezone.utc)

dbos/system_database.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,6 +1100,7 @@ def start_queued_workflows(self, queue: "Queue") -> List[str]:
11001100
query = (
11011101
sa.select(sa.func.count())
11021102
.select_from(SystemSchema.workflow_queue)
1103+
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
11031104
.where(
11041105
SystemSchema.workflow_queue.c.started_at_epoch_ms.isnot(None)
11051106
)
@@ -1167,6 +1168,7 @@ def start_queued_workflows(self, queue: "Queue") -> List[str]:
11671168
c.execute(
11681169
sa.delete(SystemSchema.workflow_queue)
11691170
.where(SystemSchema.workflow_queue.c.completed_at_epoch_ms != None)
1171+
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
11701172
.where(
11711173
SystemSchema.workflow_queue.c.started_at_epoch_ms
11721174
< start_time_ms - limiter_period_ms

tests/test_queue.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,3 +241,82 @@ def test_workflow(var1: str, var2: str) -> float:
241241

242242
# Verify all queue entries eventually get cleaned up.
243243
assert queue_entries_are_cleaned_up(dbos)
244+
245+
246+
def test_multiple_queues(dbos: DBOS) -> None:
    """Exercise a concurrency-limited queue and a rate-limited queue side by side.

    The test first parks one workflow on ``test_concurrency_queue`` (created
    with a limit of 1) and enqueues a second workflow behind it. It then pushes
    three "waves" of workflows through ``test_limit_queue`` and checks their
    timing. While the first workflow is still parked, the second workflow on
    the concurrency queue must not have run; once it is released both finish.
    """

    wf_counter = 0
    flag = False
    workflow_event = threading.Event()
    main_thread_event = threading.Event()

    @DBOS.workflow()
    def workflow_one() -> None:
        nonlocal wf_counter
        wf_counter += 1
        main_thread_event.set()
        # Stay blocked until the test body sets workflow_event, so this
        # workflow keeps occupying the concurrency queue in the meantime.
        workflow_event.wait()

    @DBOS.workflow()
    def workflow_two() -> None:
        nonlocal flag
        flag = True

    concurrency_queue = Queue("test_concurrency_queue", 1)
    handle1 = concurrency_queue.enqueue(workflow_one)
    assert handle1.get_status().queue_name == "test_concurrency_queue"
    handle2 = concurrency_queue.enqueue(workflow_two)

    @DBOS.workflow()
    def limited_workflow(var1: str, var2: str) -> float:
        assert var1 == "abc" and var2 == "123"
        # The return value is the wall-clock time at which the workflow ran;
        # the timing assertions below are computed from these values.
        return time.time()

    limit = 5
    period = 2
    limiter_queue = Queue(
        "test_limit_queue", limiter={"limit": limit, "period": period}
    )

    handles: list[WorkflowHandle[float]] = []
    times: list[float] = []

    # Launch a number of tasks equal to three times the limit.
    # This should lead to three "waves" of the limit tasks being
    # executed simultaneously, followed by a wait of the period,
    # followed by the next wave.
    num_waves = 3
    for _ in range(limit * num_waves):
        h = limiter_queue.enqueue(limited_workflow, "abc", "123")
        handles.append(h)
    for h in handles:
        times.append(h.get_result())

    # Verify that each "wave" of tasks started at the ~same time.
    for wave in range(num_waves):
        for i in range(wave * limit, (wave + 1) * limit - 1):
            assert times[i + 1] - times[i] < 0.2

    # Verify that the gap between "waves" is ~equal to the period
    for wave in range(num_waves - 1):
        assert times[limit * (wave + 1)] - times[limit * wave] > period - 0.2
        assert times[limit * (wave + 1)] - times[limit * wave] < period + 0.2

    # Verify all workflows get the SUCCESS status eventually
    dbos._sys_db.wait_for_buffer_flush()
    for h in handles:
        assert h.get_status().status == WorkflowStatusString.SUCCESS.value

    # Verify that during all this time, the second task
    # was not launched on the concurrency-limited queue.
    # Then, finish the first task and verify the second
    # task runs on schedule.
    assert not flag
    workflow_event.set()
    # Identity comparison is the idiomatic check for None.
    assert handle1.get_result() is None
    assert handle2.get_result() is None
    assert flag
    assert wf_counter == 1

    # Verify all queue entries eventually get cleaned up.
    assert queue_entries_are_cleaned_up(dbos)

0 commit comments

Comments
 (0)