Skip to content

Commit b064b3f

Browse files
authored
Reproducer and test for #139 - py scheduler not handling DB outage (#144)
1 parent d7cbb1d commit b064b3f

File tree

2 files changed

+126
-1
lines changed

2 files changed

+126
-1
lines changed

dbos/_scheduler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ def scheduler_loop(
3131
if stop_event.wait(timeout=sleepTime.total_seconds()):
3232
return
3333
with SetWorkflowID(f"sched-{func.__qualname__}-{nextExecTime.isoformat()}"):
34-
scheduler_queue.enqueue(func, nextExecTime, datetime.now(timezone.utc))
34+
try:
35+
scheduler_queue.enqueue(func, nextExecTime, datetime.now(timezone.utc))
36+
except Exception as e:
37+
dbos_logger.warning(f"Error scheduling workflow: ", e)
3538

3639

3740
def scheduled(

tests/test_scheduler.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,95 @@
22
from datetime import datetime
33

44
import pytest
5+
from sqlalchemy import create_engine, text
6+
from sqlalchemy.engine import Engine
57

68
# Public API
79
from dbos import DBOS
810

911

12+
def simulate_db_restart(engine: Engine, downtime: float) -> None:
13+
# Get DB name
14+
with engine.connect() as connection:
15+
current_db = connection.execute(text("SELECT current_database()")).scalar()
16+
17+
# Goal here is to disable connections to the DB for a while.
18+
# Need a temp DB to do that and recover connectivity...
19+
temp_db_name = "temp_database_for_maintenance"
20+
21+
# Retrieve the URL of the current engine
22+
main_db_url = engine.url
23+
24+
# Modify the URL to point to the temporary database
25+
temp_db_url = main_db_url.set(database=temp_db_name)
26+
27+
try:
28+
with engine.connect().execution_options(
29+
isolation_level="AUTOCOMMIT"
30+
) as connection:
31+
# Create a temporary database
32+
connection.execute(text(f"CREATE DATABASE {temp_db_name};"))
33+
print(f"Temporary database '{temp_db_name}' created.")
34+
except Exception as e:
35+
print("Could not create temp db: ", e)
36+
37+
temp_engine = create_engine(temp_db_url)
38+
with temp_engine.connect().execution_options(
39+
isolation_level="AUTOCOMMIT"
40+
) as temp_connection:
41+
try:
42+
# Disable new connections to the database
43+
temp_connection.execute(
44+
text(f"ALTER DATABASE {current_db} WITH ALLOW_CONNECTIONS false;")
45+
)
46+
47+
# Terminate all connections except the current one
48+
temp_connection.execute(
49+
text(
50+
f"""
51+
SELECT pg_terminate_backend(pid)
52+
FROM pg_stat_activity
53+
WHERE pid <> pg_backend_pid()
54+
AND datname = '{current_db}';
55+
"""
56+
)
57+
)
58+
except Exception as e:
59+
print(f"Could not disable db {current_db}: ", e)
60+
61+
time.sleep(downtime)
62+
63+
# Re-enable new connections
64+
try:
65+
temp_connection.execute(
66+
text(f"ALTER DATABASE {current_db} WITH ALLOW_CONNECTIONS true;")
67+
)
68+
except Exception as e:
69+
print(f"Could not reenable db {current_db}: ", e)
70+
temp_engine.dispose()
71+
72+
try:
73+
with engine.connect().execution_options(
74+
isolation_level="AUTOCOMMIT"
75+
) as connection:
76+
# Clean up the temp DB
77+
connection.execute(
78+
text(
79+
f"""
80+
SELECT pg_terminate_backend(pid)
81+
FROM pg_stat_activity
82+
WHERE pid <> pg_backend_pid()
83+
AND datname = '{temp_db_name}';
84+
"""
85+
)
86+
)
87+
88+
# Drop temporary database
89+
connection.execute(text(f"DROP DATABASE {temp_db_name};"))
90+
except Exception as e:
91+
print(f"Could not clean up temp db {temp_db_name}: ", e)
92+
93+
1094
def test_scheduled_workflow(dbos: DBOS) -> None:
1195
wf_counter: int = 0
1296

@@ -20,6 +104,44 @@ def test_workflow(scheduled: datetime, actual: datetime) -> None:
20104
assert wf_counter > 2 and wf_counter <= 4
21105

22106

107+
def test_appdb_downtime(dbos: DBOS) -> None:
108+
wf_counter: int = 0
109+
110+
@DBOS.transaction()
111+
def test_transaction(var2: str) -> str:
112+
rows = DBOS.sql_session.execute(text("SELECT 1")).fetchall()
113+
return "ran"
114+
115+
@DBOS.scheduled("* * * * * *")
116+
@DBOS.workflow()
117+
def test_workflow(scheduled: datetime, actual: datetime) -> None:
118+
nonlocal wf_counter
119+
test_transaction("x")
120+
wf_counter += 1
121+
122+
time.sleep(2)
123+
simulate_db_restart(dbos._app_db.engine, 2)
124+
time.sleep(2)
125+
assert wf_counter > 2
126+
127+
128+
def test_sysdb_downtime(dbos: DBOS) -> None:
129+
wf_counter: int = 0
130+
131+
@DBOS.scheduled("* * * * * *")
132+
@DBOS.workflow()
133+
def test_workflow(scheduled: datetime, actual: datetime) -> None:
134+
nonlocal wf_counter
135+
wf_counter += 1
136+
137+
time.sleep(2)
138+
simulate_db_restart(dbos._sys_db.engine, 2)
139+
time.sleep(2)
140+
# We know there should be at least 3 occurrences from the 4 seconds when the DB was up.
141+
# There could be more than 4, depending on the pace the machine...
142+
assert wf_counter > 2
143+
144+
23145
def test_scheduled_transaction(dbos: DBOS) -> None:
24146
txn_counter: int = 0
25147

0 commit comments

Comments
 (0)