Skip to content

Commit 67d1d9e

Browse files
authored
Improve Errors For Unregistered Functions and Classes (#429)
1 parent 9d66c8c commit 67d1d9e

File tree

7 files changed

+82
-21
lines changed

7 files changed

+82
-21
lines changed

dbos/_context.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,3 +753,34 @@ def __exit__(
753753
assert ctx.assumed_role == self.assume_role
754754
ctx.assumed_role = self.prior_role
755755
return False # Did not handle
756+
757+
758+
class UseLogAttributes:
759+
"""Temporarily set context attributes for logging"""
760+
761+
def __init__(self, *, workflow_id: str = "") -> None:
762+
self.workflow_id = workflow_id
763+
self.created_ctx = False
764+
765+
def __enter__(self) -> UseLogAttributes:
766+
ctx = get_local_dbos_context()
767+
if ctx is None:
768+
self.created_ctx = True
769+
_set_local_dbos_context(DBOSContext())
770+
ctx = assert_current_dbos_context()
771+
self.saved_workflow_id = ctx.workflow_id
772+
ctx.workflow_id = self.workflow_id
773+
return self
774+
775+
def __exit__(
776+
self,
777+
exc_type: Optional[Type[BaseException]],
778+
exc_value: Optional[BaseException],
779+
traceback: Optional[TracebackType],
780+
) -> Literal[False]:
781+
ctx = assert_current_dbos_context()
782+
ctx.workflow_id = self.saved_workflow_id
783+
# Clean up the basic context if we created it
784+
if self.created_ctx:
785+
_clear_local_dbos_context()
786+
return False # Did not handle

dbos/_core.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import sys
66
import threading
77
import time
8-
import traceback
98
from concurrent.futures import Future
109
from functools import wraps
1110
from typing import (
@@ -66,7 +65,6 @@
6665
get_dbos_func_name,
6766
get_func_info,
6867
get_or_create_func_info,
69-
get_temp_workflow_type,
7068
set_dbos_func_name,
7169
set_func_info,
7270
set_temp_workflow_type,
@@ -452,7 +450,7 @@ def execute_workflow_by_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[An
452450
if not wf_func:
453451
raise DBOSWorkflowFunctionNotFoundError(
454452
workflow_id,
455-
f"Cannot execute workflow because {status['name']} is not a registered workflow function",
453+
f"{status['name']} is not a registered workflow function",
456454
)
457455
with DBOSContextEnsure():
458456
# If this function belongs to a configured class, add that class instance as its first argument
@@ -463,7 +461,7 @@ def execute_workflow_by_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[An
463461
if iname not in dbos._registry.instance_info_map:
464462
raise DBOSWorkflowFunctionNotFoundError(
465463
workflow_id,
466-
f"Cannot execute workflow because instance '{iname}' is not registered",
464+
f"configured class instance '{iname}' is not registered",
467465
)
468466
class_instance = dbos._registry.instance_info_map[iname]
469467
inputs["args"] = (class_instance,) + inputs["args"]
@@ -473,7 +471,7 @@ def execute_workflow_by_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[An
473471
if class_name not in dbos._registry.class_info_map:
474472
raise DBOSWorkflowFunctionNotFoundError(
475473
workflow_id,
476-
f"Cannot execute workflow because class '{class_name}' is not registered",
474+
f"class '{class_name}' is not registered",
477475
)
478476
class_object = dbos._registry.class_info_map[class_name]
479477
inputs["args"] = (class_object,) + inputs["args"]
@@ -534,7 +532,7 @@ def start_workflow(
534532
if fi is None:
535533
raise DBOSWorkflowFunctionNotFoundError(
536534
"<NONE>",
537-
f"start_workflow: function {func.__name__} is not registered",
535+
f"{func.__name__} is not a registered workflow function",
538536
)
539537

540538
func = cast("Workflow[P, R]", func.__orig_func) # type: ignore
@@ -630,7 +628,7 @@ async def start_workflow_async(
630628
if fi is None:
631629
raise DBOSWorkflowFunctionNotFoundError(
632630
"<NONE>",
633-
f"start_workflow: function {func.__name__} is not registered",
631+
f"{func.__name__} is not a registered workflow function",
634632
)
635633

636634
func = cast("Workflow[P, R]", func.__orig_func) # type: ignore

dbos/_dbos.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,10 @@ def register_poller(
211211
def register_instance(self, inst: object) -> None:
212212
config_name = getattr(inst, "config_name")
213213
class_name = _class_fqn(inst.__class__)
214+
if self.dbos and self.dbos._launched:
215+
dbos_logger.warning(
216+
f"Configured instance {config_name} of class {class_name} was registered after DBOS was launched. This may cause errors during workflow recovery. All configured instances should be instantiated before DBOS is launched."
217+
)
214218
fn = f"{class_name}/{config_name}"
215219
if fn in self.instance_info_map:
216220
if self.instance_info_map[fn] is not inst:

dbos/_error.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class DBOSWorkflowFunctionNotFoundError(DBOSException):
106106

107107
def __init__(self, workflow_id: str, message: Optional[str] = None):
108108
super().__init__(
109-
f"Workflow function not found for workflow ID {workflow_id}: {message}",
109+
f"Could not execute workflow {workflow_id}: {message}",
110110
dbos_error_code=DBOSErrorCode.WorkflowFunctionNotFound.value,
111111
)
112112

dbos/_recovery.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import time
33
from typing import TYPE_CHECKING, Any, List
44

5+
from dbos._context import UseLogAttributes
56
from dbos._utils import GlobalParams
67

78
from ._core import execute_workflow_by_id
@@ -29,17 +30,19 @@ def startup_recovery_thread(
2930
stop_event = threading.Event()
3031
dbos.background_thread_stop_events.append(stop_event)
3132
while not stop_event.is_set() and len(pending_workflows) > 0:
32-
try:
33-
for pending_workflow in list(pending_workflows):
33+
for pending_workflow in list(pending_workflows):
34+
try:
3435
_recover_workflow(dbos, pending_workflow)
3536
pending_workflows.remove(pending_workflow)
36-
except DBOSWorkflowFunctionNotFoundError:
37-
time.sleep(1)
38-
except Exception as e:
39-
dbos.logger.error(
40-
f"Exception encountered when recovering workflows:", exc_info=e
41-
)
42-
raise
37+
except DBOSWorkflowFunctionNotFoundError:
38+
time.sleep(1)
39+
except Exception as e:
40+
with UseLogAttributes(workflow_id=pending_workflow.workflow_uuid):
41+
dbos.logger.error(
42+
f"Exception encountered when recovering workflow {pending_workflow.workflow_uuid}:",
43+
exc_info=e,
44+
)
45+
raise
4346

4447

4548
def recover_pending_workflows(
@@ -56,9 +59,11 @@ def recover_pending_workflows(
5659
handle = _recover_workflow(dbos, pending_workflow)
5760
workflow_handles.append(handle)
5861
except Exception as e:
59-
dbos.logger.error(
60-
f"Exception encountered when recovering workflows:", exc_info=e
61-
)
62+
with UseLogAttributes(workflow_id=pending_workflow.workflow_uuid):
63+
dbos.logger.error(
64+
f"Exception encountered when recovering workflow {pending_workflow.workflow_uuid}:",
65+
exc_info=e,
66+
)
6267
raise
6368
dbos.logger.info(
6469
f"Recovering {len(pending_workflows)} workflows for executor {executor_id} from version {GlobalParams.app_version}"

dbos/_registrations.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def get_dbos_func_name(f: Any) -> str:
1313
if hasattr(f, "dbos_function_name"):
1414
return str(getattr(f, "dbos_function_name"))
1515
raise DBOSWorkflowFunctionNotFoundError(
16-
"<NONE>", f"function {f.__name__} is not registered"
16+
"<NONE>", f"{f.__name__} is not a registered workflow function"
1717
)
1818

1919

tests/test_failures.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
from sqlalchemy.exc import InvalidRequestError, OperationalError
99

1010
from dbos import DBOS, Queue, SetWorkflowID
11+
from dbos._dbos_config import DBOSConfig
1112
from dbos._error import (
1213
DBOSAwaitedWorkflowCancelledError,
1314
DBOSMaxStepRetriesExceeded,
1415
DBOSNotAuthorizedError,
1516
DBOSQueueDeduplicatedError,
1617
DBOSUnexpectedStepError,
18+
DBOSWorkflowFunctionNotFoundError,
1719
MaxRecoveryAttemptsExceededError,
1820
)
1921
from dbos._registrations import DEFAULT_MAX_RECOVERY_ATTEMPTS
@@ -499,3 +501,24 @@ def __init__(self, one: int, two: int) -> None:
499501
assert output is None
500502
assert isinstance(exception, str)
501503
assert "Message: 1, 2" in exception
504+
505+
506+
def test_unregistered_workflow(dbos: DBOS, config: DBOSConfig) -> None:
507+
508+
@DBOS.workflow()
509+
def workflow() -> None:
510+
return
511+
512+
wfid = str(uuid.uuid4())
513+
with SetWorkflowID(wfid):
514+
workflow()
515+
516+
dbos._sys_db.update_workflow_outcome(wfid, "PENDING")
517+
518+
DBOS.destroy(destroy_registry=True)
519+
config["executor_id"] = str(uuid.uuid4())
520+
DBOS(config=config)
521+
DBOS.launch()
522+
523+
with pytest.raises(DBOSWorkflowFunctionNotFoundError):
524+
DBOS._recover_pending_workflows()

0 commit comments

Comments
 (0)