Skip to content

Commit 1887532

Browse files
authored
Better Scheduler (#484)
Update the scheduler to enqueue a task only after a short random interval has passed, and only if the task is not already enqueued. This prevents the problem in a distributed setting of hundreds of workers attempting to enqueue a scheduled task simultaneously (which is safe but not performant).
1 parent 4651b40 commit 1887532

File tree

2 files changed

+31
-21
lines changed

2 files changed

+31
-21
lines changed

dbos/_scheduler.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import random
12
import threading
23
import traceback
34
from datetime import datetime, timezone
@@ -15,28 +16,40 @@
1516

1617
ScheduledWorkflow = Callable[[datetime, datetime], None]
1718

18-
scheduler_queue: Queue
19-
2019

2120
def scheduler_loop(
2221
func: ScheduledWorkflow, cron: str, stop_event: threading.Event
2322
) -> None:
23+
from dbos._dbos import _get_dbos_instance
24+
25+
dbos = _get_dbos_instance()
26+
scheduler_queue = dbos._registry.get_internal_queue()
2427
try:
2528
iter = croniter(cron, datetime.now(timezone.utc), second_at_beginning=True)
26-
except Exception as e:
29+
except Exception:
2730
dbos_logger.error(
2831
f'Cannot run scheduled function {get_dbos_func_name(func)}. Invalid crontab "{cron}"'
2932
)
33+
raise
3034
while not stop_event.is_set():
31-
nextExecTime = iter.get_next(datetime)
32-
sleepTime = nextExecTime - datetime.now(timezone.utc)
33-
if stop_event.wait(timeout=sleepTime.total_seconds()):
35+
next_exec_time = iter.get_next(datetime)
36+
sleep_time = (next_exec_time - datetime.now(timezone.utc)).total_seconds()
37+
sleep_time = max(0, sleep_time)
38+
# To prevent a "thundering herd" problem in a distributed setting,
39+
# apply jitter of up to 10% the sleep time, capped at 10 seconds
40+
max_jitter = min(sleep_time / 10, 10)
41+
jitter = random.uniform(0, max_jitter)
42+
if stop_event.wait(timeout=sleep_time + jitter):
3443
return
3544
try:
36-
with SetWorkflowID(
37-
f"sched-{get_dbos_func_name(func)}-{nextExecTime.isoformat()}"
38-
):
39-
scheduler_queue.enqueue(func, nextExecTime, datetime.now(timezone.utc))
45+
workflowID = (
46+
f"sched-{get_dbos_func_name(func)}-{next_exec_time.isoformat()}"
47+
)
48+
if not dbos._sys_db.get_workflow_status(workflowID):
49+
with SetWorkflowID(workflowID):
50+
scheduler_queue.enqueue(
51+
func, next_exec_time, datetime.now(timezone.utc)
52+
)
4053
except Exception:
4154
dbos_logger.warning(
4255
f"Exception encountered in scheduler thread: {traceback.format_exc()})"
@@ -49,13 +62,10 @@ def scheduled(
4962
def decorator(func: ScheduledWorkflow) -> ScheduledWorkflow:
5063
try:
5164
croniter(cron, datetime.now(timezone.utc), second_at_beginning=True)
52-
except Exception as e:
65+
except Exception:
5366
raise ValueError(
5467
f'Invalid crontab "{cron}" for scheduled function function {get_dbos_func_name(func)}.'
5568
)
56-
57-
global scheduler_queue
58-
scheduler_queue = dbosreg.get_internal_queue()
5969
stop_event = threading.Event()
6070
dbosreg.register_poller(stop_event, scheduler_loop, func, cron, stop_event)
6171
return func

tests/test_scheduler.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def test_workflow(scheduled: datetime, actual: datetime) -> None:
102102
wf_counter += 1
103103

104104
time.sleep(5)
105-
assert wf_counter > 2 and wf_counter <= 5
105+
assert wf_counter > 1 and wf_counter <= 5
106106

107107

108108
def test_appdb_downtime(dbos: DBOS, skip_with_sqlite: None) -> None:
@@ -123,7 +123,7 @@ def test_workflow(scheduled: datetime, actual: datetime) -> None:
123123
time.sleep(2)
124124
assert dbos._app_db
125125
simulate_db_restart(dbos._app_db.engine, 2)
126-
time.sleep(2)
126+
time.sleep(3)
127127
assert wf_counter > 2
128128

129129

@@ -138,7 +138,7 @@ def test_workflow(scheduled: datetime, actual: datetime) -> None:
138138

139139
time.sleep(2)
140140
simulate_db_restart(dbos._sys_db.engine, 2)
141-
time.sleep(2)
141+
time.sleep(3)
142142
# We know there should be at least 2 occurrences from the 4 seconds when the DB was up.
143143
# There could be more than 4, depending on the pace the machine...
144144
assert wf_counter >= 2
@@ -154,7 +154,7 @@ def test_transaction(scheduled: datetime, actual: datetime) -> None:
154154
txn_counter += 1
155155

156156
time.sleep(5)
157-
assert txn_counter > 2 and txn_counter <= 5
157+
assert txn_counter > 1 and txn_counter <= 5
158158

159159

160160
def test_scheduled_step(dbos: DBOS) -> None:
@@ -205,8 +205,8 @@ def test_transaction() -> None:
205205
nonlocal txn_counter
206206
txn_counter += 1
207207

208-
time.sleep(3)
209-
assert wf_counter >= 1 and wf_counter <= 3
208+
time.sleep(4)
209+
assert wf_counter >= 1 and wf_counter <= 4
210210
max_tries = 10
211211
for i in range(max_tries):
212212
try:
@@ -223,7 +223,7 @@ def test_transaction() -> None:
223223
evt.set()
224224

225225
# Wait for workflows to finish
226-
time.sleep(2)
226+
time.sleep(3)
227227

228228
dbos._sys_db.update_workflow_outcome(workflow_id, "PENDING")
229229

0 commit comments

Comments
 (0)