Skip to content

Commit ddc59a8

Browse files
authored
remove explicit notion of recovery (#198)
This PR simplifies the logic for putting a workflow in the dead letter queue. Specifically, it changes the meaning of the `recovery_attempt` column in the `dbos.workflow_status` table to `attempts`*. The default of the column is now 1. `update_workflow_status` just verifies that (number of attempts) <= max_retries before placing a workflow in the DLQ. [*] This PR does not change the column name, to facilitate backward compatibility.
1 parent 023c748 commit ddc59a8

File tree

8 files changed

+75
-155
lines changed

8 files changed

+75
-155
lines changed

dbos/_context.py

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ def __init__(self) -> None:
6363
self.parent_workflow_fid: int = -1
6464
self.workflow_id: str = ""
6565
self.function_id: int = -1
66-
self.in_recovery: bool = False
6766

6867
self.curr_step_function_id: int = -1
6968
self.curr_tx_function_id: int = -1
@@ -82,7 +81,6 @@ def create_child(self) -> DBOSContext:
8281
rv.is_within_set_workflow_id_block = self.is_within_set_workflow_id_block
8382
rv.parent_workflow_id = self.workflow_id
8483
rv.parent_workflow_fid = self.function_id
85-
rv.in_recovery = self.in_recovery
8684
rv.authenticated_user = self.authenticated_user
8785
rv.authenticated_roles = (
8886
self.authenticated_roles[:]
@@ -335,34 +333,6 @@ def __exit__(
335333
return False # Did not handle
336334

337335

338-
class SetWorkflowRecovery:
339-
def __init__(self) -> None:
340-
self.created_ctx = False
341-
342-
def __enter__(self) -> SetWorkflowRecovery:
343-
# Code to create a basic context
344-
ctx = get_local_dbos_context()
345-
if ctx is None:
346-
self.created_ctx = True
347-
_set_local_dbos_context(DBOSContext())
348-
assert_current_dbos_context().in_recovery = True
349-
350-
return self
351-
352-
def __exit__(
353-
self,
354-
exc_type: Optional[Type[BaseException]],
355-
exc_value: Optional[BaseException],
356-
traceback: Optional[TracebackType],
357-
) -> Literal[False]:
358-
assert assert_current_dbos_context().in_recovery == True
359-
assert_current_dbos_context().in_recovery = False
360-
# Code to clean up the basic context if we created it
361-
if self.created_ctx:
362-
_clear_local_dbos_context()
363-
return False # Did not handle
364-
365-
366336
class EnterDBOSWorkflow(AbstractContextManager[DBOSContext, Literal[False]]):
367337
def __init__(self, attributes: TracedAttributes) -> None:
368338
self.created_ctx = False

dbos/_core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def _init_workflow(
186186
# We also have to do this for single-step workflows because of the foreign key constraint on the operation outputs table
187187
# TODO: Make this transactional (and with the queue step below)
188188
wf_status = dbos._sys_db.update_workflow_status(
189-
status, False, ctx.in_recovery, max_recovery_attempts=max_recovery_attempts
189+
status, False, max_recovery_attempts=max_recovery_attempts
190190
)
191191
# TODO: Modify the inputs if they were changed by `update_workflow_inputs`
192192
dbos._sys_db.update_workflow_inputs(wfid, _serialization.serialize_args(inputs))

dbos/_dbos.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ def recover_pending_workflows(
801801
def cancel_workflow(cls, workflow_id: str) -> None:
802802
"""Cancel a workflow by ID."""
803803
_get_dbos_instance()._sys_db.set_workflow_status(
804-
workflow_id, WorkflowStatusString.CANCELLED, False
804+
workflow_id, WorkflowStatusString.CANCELLED
805805
)
806806

807807
@classmethod

dbos/_recovery.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import traceback
55
from typing import TYPE_CHECKING, Any, List
66

7-
from ._context import SetWorkflowRecovery
87
from ._core import execute_workflow_by_id
98
from ._error import DBOSWorkflowFunctionNotFoundError
109

@@ -19,8 +18,7 @@ def startup_recovery_thread(dbos: "DBOS", workflow_ids: List[str]) -> None:
1918
while not stop_event.is_set() and len(workflow_ids) > 0:
2019
try:
2120
for workflowID in list(workflow_ids):
22-
with SetWorkflowRecovery():
23-
execute_workflow_by_id(dbos, workflowID)
21+
execute_workflow_by_id(dbos, workflowID)
2422
workflow_ids.remove(workflowID)
2523
except DBOSWorkflowFunctionNotFoundError:
2624
time.sleep(1)
@@ -45,8 +43,7 @@ def recover_pending_workflows(
4543
dbos.logger.debug(f"Pending workflows: {workflow_ids}")
4644

4745
for workflowID in workflow_ids:
48-
with SetWorkflowRecovery():
49-
handle = execute_workflow_by_id(dbos, workflowID)
46+
handle = execute_workflow_by_id(dbos, workflowID)
5047
workflow_handles.append(handle)
5148

5249
dbos.logger.info("Recovered pending workflows")

dbos/_sys_db.py

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,10 @@ def update_workflow_status(
247247
self,
248248
status: WorkflowStatusInternal,
249249
replace: bool = True,
250-
in_recovery: bool = False,
251250
*,
252251
conn: Optional[sa.Connection] = None,
253252
max_recovery_attempts: int = DEFAULT_MAX_RECOVERY_ATTEMPTS,
253+
is_status_flush: bool = False,
254254
) -> WorkflowStatuses:
255255
wf_status: WorkflowStatuses = status["status"]
256256

@@ -270,6 +270,9 @@ def update_workflow_status(
270270
authenticated_roles=status["authenticated_roles"],
271271
assumed_role=status["assumed_role"],
272272
queue_name=status["queue_name"],
273+
recovery_attempts=(
274+
1 if wf_status != WorkflowStatusString.ENQUEUED.value else 0
275+
),
273276
)
274277
if replace:
275278
cmd = cmd.on_conflict_do_update(
@@ -278,24 +281,25 @@ def update_workflow_status(
278281
status=status["status"],
279282
output=status["output"],
280283
error=status["error"],
281-
),
282-
)
283-
elif in_recovery:
284-
cmd = cmd.on_conflict_do_update(
285-
index_elements=["workflow_uuid"],
286-
set_=dict(
287-
recovery_attempts=SystemSchema.workflow_status.c.recovery_attempts
288-
+ 1,
284+
recovery_attempts=(
285+
SystemSchema.workflow_status.c.recovery_attempts + 1
286+
if not is_status_flush
287+
else SystemSchema.workflow_status.c.recovery_attempts
288+
),
289289
),
290290
)
291291
else:
292-
# A blank update so that we can return the existing status
293292
cmd = cmd.on_conflict_do_update(
294293
index_elements=["workflow_uuid"],
295294
set_=dict(
296-
recovery_attempts=SystemSchema.workflow_status.c.recovery_attempts
295+
recovery_attempts=(
296+
SystemSchema.workflow_status.c.recovery_attempts + 1
297+
if not is_status_flush
298+
else SystemSchema.workflow_status.c.recovery_attempts
299+
),
297300
),
298301
)
302+
299303
cmd = cmd.returning(SystemSchema.workflow_status.c.recovery_attempts, SystemSchema.workflow_status.c.status, SystemSchema.workflow_status.c.name, SystemSchema.workflow_status.c.class_name, SystemSchema.workflow_status.c.config_name, SystemSchema.workflow_status.c.queue_name) # type: ignore
300304

301305
if conn is not None:
@@ -325,7 +329,10 @@ def update_workflow_status(
325329
if err_msg is not None:
326330
raise DBOSConflictingWorkflowError(status["workflow_uuid"], err_msg)
327331

328-
if in_recovery and recovery_attempts > max_recovery_attempts:
332+
# recovery_attempt means "attempts" (we kept the name for backward compatibility). It's default value is 1.
333+
# Every time we init the status, we increment `recovery_attempts` by 1.
334+
# Thus, when this number becomes equal to `maxRetries + 1`, we should mark the workflow as `RETRIES_EXCEEDED`.
335+
if recovery_attempts > max_recovery_attempts + 1:
329336
with self.engine.begin() as c:
330337
c.execute(
331338
sa.delete(SystemSchema.workflow_queue).where(
@@ -362,7 +369,6 @@ def set_workflow_status(
362369
self,
363370
workflow_uuid: str,
364371
status: WorkflowStatusString,
365-
reset_recovery_attempts: bool,
366372
) -> None:
367373
with self.engine.begin() as c:
368374
stmt = (
@@ -374,17 +380,6 @@ def set_workflow_status(
374380
)
375381
c.execute(stmt)
376382

377-
if reset_recovery_attempts:
378-
with self.engine.begin() as c:
379-
stmt = (
380-
sa.update(SystemSchema.workflow_status)
381-
.where(
382-
SystemSchema.workflow_status.c.workflow_uuid == workflow_uuid
383-
)
384-
.values(recovery_attempts=reset_recovery_attempts)
385-
)
386-
c.execute(stmt)
387-
388383
def get_workflow_status(
389384
self, workflow_uuid: str
390385
) -> Optional[WorkflowStatusInternal]:
@@ -1062,7 +1057,7 @@ def _flush_workflow_status_buffer(self) -> None:
10621057
continue
10631058
exported_status[wf_id] = status
10641059
try:
1065-
self.update_workflow_status(status, conn=c)
1060+
self.update_workflow_status(status, conn=c, is_status_flush=True)
10661061
exported += 1
10671062
except Exception as e:
10681063
dbos_logger.error(f"Error while flushing status buffer: {e}")

dbos/_workflow_commands.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def _cancel_workflow(config: ConfigFile, uuid: str) -> None:
116116

117117
try:
118118
sys_db = SystemDatabase(config)
119-
sys_db.set_workflow_status(uuid, WorkflowStatusString.CANCELLED, False)
119+
sys_db.set_workflow_status(uuid, WorkflowStatusString.CANCELLED)
120120
return
121121

122122
except Exception as e:

0 commit comments

Comments
 (0)