Skip to content

Commit 029997c

Browse files
authored
Use daemon threads for background threads (#132)
Otherwise, DBOS programs would hang forever when the main thread exits, because the background threads were using the thread pool and running.
1 parent 9bec7cb commit 029997c

File tree

1 file changed

+29
-6
lines changed

1 file changed

+29
-6
lines changed

dbos/dbos.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ def __init__(
277277
self.fastapi: Optional["FastAPI"] = fastapi
278278
self.flask: Optional["Flask"] = flask
279279
self._executor_field: Optional[ThreadPoolExecutor] = None
280+
self._background_threads: List[threading.Thread] = []
280281

281282
# If using FastAPI, set up middleware and lifecycle events
282283
if self.fastapi is not None:
@@ -358,20 +359,38 @@ def _launch(self) -> None:
358359
self._executor.submit(_startup_recovery_thread, self, workflow_ids)
359360

360361
# Listen to notifications
361-
self._executor.submit(self._sys_db._notification_listener)
362+
notification_listener_thread = threading.Thread(
363+
target=self._sys_db._notification_listener,
364+
daemon=True,
365+
)
366+
notification_listener_thread.start()
367+
self._background_threads.append(notification_listener_thread)
362368

363369
# Start flush workflow buffers thread
364-
self._executor.submit(self._sys_db.flush_workflow_buffers)
370+
flush_workflow_buffers_thread = threading.Thread(
371+
target=self._sys_db.flush_workflow_buffers,
372+
daemon=True,
373+
)
374+
flush_workflow_buffers_thread.start()
375+
self._background_threads.append(flush_workflow_buffers_thread)
365376

366377
# Start the queue thread
367378
evt = threading.Event()
368379
self.stop_events.append(evt)
369-
self._executor.submit(queue_thread, evt, self)
380+
bg_queue_thread = threading.Thread(
381+
target=queue_thread, args=(evt, self), daemon=True
382+
)
383+
bg_queue_thread.start()
384+
self._background_threads.append(bg_queue_thread)
370385

371386
# Grab any pollers that were deferred and start them
372387
for evt, func, args, kwargs in self._registry.pollers:
373388
self.stop_events.append(evt)
374-
self._executor.submit(func, *args, **kwargs)
389+
poller_thread = threading.Thread(
390+
target=func, args=args, kwargs=kwargs, daemon=True
391+
)
392+
poller_thread.start()
393+
self._background_threads.append(poller_thread)
375394
self._registry.pollers = []
376395

377396
dbos_logger.info("DBOS launched")
@@ -403,6 +422,8 @@ def _destroy(self) -> None:
403422
if self._executor_field is not None:
404423
self._executor_field.shutdown(cancel_futures=True)
405424
self._executor_field = None
425+
for bg_thread in self._background_threads:
426+
bg_thread.join()
406427

407428
@classmethod
408429
def register_instance(cls, inst: object) -> None:
@@ -832,7 +853,7 @@ def __init__(self, config_name: str) -> None:
832853

833854
# Apps that import DBOS probably don't exit. If they do, let's see if
834855
# it looks like startup was abandoned or a call was forgotten...
835-
def log_exit_info() -> None:
856+
def dbos_exit_hook() -> None:
836857
if _dbos_global_registry is None:
837858
# Probably used as or for a support module
838859
return
@@ -846,7 +867,9 @@ def log_exit_info() -> None:
846867
print("DBOS exiting; DBOS exists but launch() was not called")
847868
dbos_logger.warning("DBOS exiting; DBOS exists but launch() was not called")
848869
return
870+
# If we get here, we're exiting normally
871+
_dbos_global_instance.destroy()
849872

850873

851874
# Register the exit hook
852-
atexit.register(log_exit_info)
875+
atexit.register(dbos_exit_hook)

0 commit comments

Comments
 (0)