Skip to content

Commit f07760a

Browse files
qianl15kraftp
andauthored
Fix Deactivate (#327) (#328)
Co-authored-by: Peter Kraft <[email protected]>
1 parent 106fc2c commit f07760a

File tree

6 files changed

+72
-14
lines changed

6 files changed

+72
-14
lines changed

dbos/_admin_server.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,11 @@ def do_GET(self) -> None:
6666
elif self.path == _deactivate_path:
6767
if not AdminRequestHandler.is_deactivated:
6868
dbos_logger.info(
69-
f"Deactivating DBOS executor {GlobalParams.executor_id} with version {GlobalParams.app_version}. This executor will complete existing workflows but will not start new workflows."
69+
f"Deactivating DBOS executor {GlobalParams.executor_id} with version {GlobalParams.app_version}. This executor will complete existing workflows but will not create new workflows."
7070
)
7171
AdminRequestHandler.is_deactivated = True
72-
# Stop all scheduled workflows, queues, and kafka loops
73-
for event in self.dbos.stop_events:
72+
# Stop all event receivers (scheduler and Kafka threads)
73+
for event in self.dbos.poller_stop_events:
7474
event.set()
7575
self.send_response(200)
7676
self._end_headers()

dbos/_core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ def _init_workflow(
296296

297297
if workflow_deadline_epoch_ms is not None:
298298
evt = threading.Event()
299-
dbos.stop_events.append(evt)
299+
dbos.background_thread_stop_events.append(evt)
300300

301301
def timeout_func() -> None:
302302
try:

dbos/_dbos.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def register_poller(
197197
self, evt: threading.Event, func: Callable[..., Any], *args: Any, **kwargs: Any
198198
) -> None:
199199
if self.dbos and self.dbos._launched:
200-
self.dbos.stop_events.append(evt)
200+
self.dbos.poller_stop_events.append(evt)
201201
self.dbos._executor.submit(func, *args, **kwargs)
202202
else:
203203
self.pollers.append((evt, func, args, kwargs))
@@ -330,7 +330,10 @@ def __init__(
330330
self._registry: DBOSRegistry = _get_or_create_dbos_registry()
331331
self._registry.dbos = self
332332
self._admin_server_field: Optional[AdminServer] = None
333-
self.stop_events: List[threading.Event] = []
333+
# Stop internal background threads (queue thread, timeout threads, etc.)
334+
self.background_thread_stop_events: List[threading.Event] = []
335+
# Stop pollers (event receivers) that can create new workflows (scheduler, Kafka)
336+
self.poller_stop_events: List[threading.Event] = []
334337
self.fastapi: Optional["FastAPI"] = fastapi
335338
self.flask: Optional["Flask"] = flask
336339
self._executor_field: Optional[ThreadPoolExecutor] = None
@@ -502,7 +505,7 @@ def _launch(self, *, debug_mode: bool = False) -> None:
502505

503506
# Start the queue thread
504507
evt = threading.Event()
505-
self.stop_events.append(evt)
508+
self.background_thread_stop_events.append(evt)
506509
bg_queue_thread = threading.Thread(
507510
target=queue_thread, args=(evt, self), daemon=True
508511
)
@@ -515,7 +518,7 @@ def _launch(self, *, debug_mode: bool = False) -> None:
515518
dbos_domain = os.environ.get("DBOS_DOMAIN", "cloud.dbos.dev")
516519
self.conductor_url = f"wss://{dbos_domain}/conductor/v1alpha1"
517520
evt = threading.Event()
518-
self.stop_events.append(evt)
521+
self.background_thread_stop_events.append(evt)
519522
self.conductor_websocket = ConductorWebsocket(
520523
self,
521524
conductor_url=self.conductor_url,
@@ -527,7 +530,7 @@ def _launch(self, *, debug_mode: bool = False) -> None:
527530

528531
# Grab any pollers that were deferred and start them
529532
for evt, func, args, kwargs in self._registry.pollers:
530-
self.stop_events.append(evt)
533+
self.poller_stop_events.append(evt)
531534
poller_thread = threading.Thread(
532535
target=func, args=args, kwargs=kwargs, daemon=True
533536
)
@@ -583,7 +586,9 @@ def _reset_system_database(self) -> None:
583586

584587
def _destroy(self) -> None:
585588
self._initialized = False
586-
for event in self.stop_events:
589+
for event in self.poller_stop_events:
590+
event.set()
591+
for event in self.background_thread_stop_events:
587592
event.set()
588593
self._background_event_loop.stop()
589594
if self._sys_db_field is not None:

dbos/_recovery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def startup_recovery_thread(
2929
) -> None:
3030
"""Attempt to recover local pending workflows on startup using a background thread."""
3131
stop_event = threading.Event()
32-
dbos.stop_events.append(stop_event)
32+
dbos.background_thread_stop_events.append(stop_event)
3333
while not stop_event.is_set() and len(pending_workflows) > 0:
3434
try:
3535
for pending_workflow in list(pending_workflows):

tests/test_admin_server.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import threading
44
import time
55
import uuid
6+
from datetime import datetime
67

78
import pytest
89
import requests
@@ -65,8 +66,59 @@ def test_admin_endpoints(dbos: DBOS) -> None:
6566
response = requests.get("http://localhost:3001/deactivate", timeout=5)
6667
assert response.status_code == 200
6768

68-
for event in dbos.stop_events:
69-
assert event.is_set(), "Event is not set!"
69+
for event in dbos.poller_stop_events:
70+
assert event.is_set()
71+
72+
73+
def test_deactivate(dbos: DBOS, config: ConfigFile) -> None:
74+
wf_counter: int = 0
75+
76+
queue = Queue("example-queue")
77+
78+
@DBOS.scheduled("* * * * * *")
79+
@DBOS.workflow()
80+
def test_workflow(scheduled: datetime, actual: datetime) -> None:
81+
nonlocal wf_counter
82+
wf_counter += 1
83+
84+
@DBOS.workflow()
85+
def regular_workflow() -> int:
86+
return 5
87+
88+
# Let the scheduled workflow run
89+
time.sleep(2)
90+
val = wf_counter
91+
assert val > 0
92+
# Deactivate--scheduled workflow should stop
93+
response = requests.get("http://localhost:3001/deactivate", timeout=5)
94+
assert response.status_code == 200
95+
for event in dbos.poller_stop_events:
96+
assert event.is_set()
97+
# Verify the scheduled workflow does not run anymore
98+
time.sleep(3)
99+
assert wf_counter <= val + 1
100+
# Enqueue a workflow, verify it still runs
101+
assert queue.enqueue(regular_workflow).get_result() == 5
102+
103+
# Test deferred event receivers
104+
DBOS.destroy(destroy_registry=True)
105+
dbos = DBOS(config=config)
106+
107+
@DBOS.scheduled("* * * * * *")
108+
@DBOS.workflow()
109+
def deferred_workflow(scheduled: datetime, actual: datetime) -> None:
110+
nonlocal wf_counter
111+
wf_counter += 1
112+
113+
DBOS.launch()
114+
assert len(dbos.poller_stop_events) > 0
115+
for event in dbos.poller_stop_events:
116+
assert not event.is_set()
117+
# Deactivate--scheduled workflow should stop
118+
response = requests.get("http://localhost:3001/deactivate", timeout=5)
119+
assert response.status_code == 200
120+
for event in dbos.poller_stop_events:
121+
assert event.is_set()
70122

71123

72124
def test_admin_recovery(config: ConfigFile) -> None:

tests/test_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,8 @@ def test_transaction() -> None:
202202
time.sleep(1)
203203

204204
# Stop the scheduled workflow
205-
dbos.stop_events[0].set()
205+
for evt in dbos.poller_stop_events:
206+
evt.set()
206207

207208
dbos._sys_db.update_workflow_status(
208209
{

0 commit comments

Comments
 (0)