Skip to content

Commit 97562ec

Browse files
authored
Update Workflow Status (#201)
Refactor `update_workflow_status`, eliminating its incomprehensible flags by splitting it into two separate functions. - `insert_workflow_status` inserts a new workflow status, is called when starting a workflow, and contains code for tracking workflow attempts and for validating the workflow was called properly. - `update_workflow_status` updates the status of an existing workflow, is called when terminating a workflow to record its final status and output, and contains no extraneous code. This does not change behavior or semantics.
1 parent ddc59a8 commit 97562ec

File tree

2 files changed

+79
-54
lines changed

2 files changed

+79
-54
lines changed

dbos/_core.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ def _init_workflow(
185185
# Synchronously record the status and inputs for workflows and single-step workflows
186186
# We also have to do this for single-step workflows because of the foreign key constraint on the operation outputs table
187187
# TODO: Make this transactional (and with the queue step below)
188-
wf_status = dbos._sys_db.update_workflow_status(
189-
status, False, max_recovery_attempts=max_recovery_attempts
188+
wf_status = dbos._sys_db.insert_workflow_status(
189+
status, max_recovery_attempts=max_recovery_attempts
190190
)
191191
# TODO: Modify the inputs if they were changed by `update_workflow_inputs`
192192
dbos._sys_db.update_workflow_inputs(wfid, _serialization.serialize_args(inputs))

dbos/_sys_db.py

Lines changed: 77 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -243,70 +243,50 @@ def wait_for_buffer_flush(self) -> None:
243243
dbos_logger.debug("Waiting for system buffers to be exported")
244244
time.sleep(1)
245245

246-
def update_workflow_status(
246+
def insert_workflow_status(
247247
self,
248248
status: WorkflowStatusInternal,
249-
replace: bool = True,
250249
*,
251-
conn: Optional[sa.Connection] = None,
252250
max_recovery_attempts: int = DEFAULT_MAX_RECOVERY_ATTEMPTS,
253-
is_status_flush: bool = False,
254251
) -> WorkflowStatuses:
255252
wf_status: WorkflowStatuses = status["status"]
256253

257-
cmd = pg.insert(SystemSchema.workflow_status).values(
258-
workflow_uuid=status["workflow_uuid"],
259-
status=status["status"],
260-
name=status["name"],
261-
class_name=status["class_name"],
262-
config_name=status["config_name"],
263-
output=status["output"],
264-
error=status["error"],
265-
executor_id=status["executor_id"],
266-
application_version=status["app_version"],
267-
application_id=status["app_id"],
268-
request=status["request"],
269-
authenticated_user=status["authenticated_user"],
270-
authenticated_roles=status["authenticated_roles"],
271-
assumed_role=status["assumed_role"],
272-
queue_name=status["queue_name"],
273-
recovery_attempts=(
274-
1 if wf_status != WorkflowStatusString.ENQUEUED.value else 0
275-
),
276-
)
277-
if replace:
278-
cmd = cmd.on_conflict_do_update(
279-
index_elements=["workflow_uuid"],
280-
set_=dict(
281-
status=status["status"],
282-
output=status["output"],
283-
error=status["error"],
284-
recovery_attempts=(
285-
SystemSchema.workflow_status.c.recovery_attempts + 1
286-
if not is_status_flush
287-
else SystemSchema.workflow_status.c.recovery_attempts
288-
),
254+
cmd = (
255+
pg.insert(SystemSchema.workflow_status)
256+
.values(
257+
workflow_uuid=status["workflow_uuid"],
258+
status=status["status"],
259+
name=status["name"],
260+
class_name=status["class_name"],
261+
config_name=status["config_name"],
262+
output=status["output"],
263+
error=status["error"],
264+
executor_id=status["executor_id"],
265+
application_version=status["app_version"],
266+
application_id=status["app_id"],
267+
request=status["request"],
268+
authenticated_user=status["authenticated_user"],
269+
authenticated_roles=status["authenticated_roles"],
270+
assumed_role=status["assumed_role"],
271+
queue_name=status["queue_name"],
272+
recovery_attempts=(
273+
1 if wf_status != WorkflowStatusString.ENQUEUED.value else 0
289274
),
290275
)
291-
else:
292-
cmd = cmd.on_conflict_do_update(
276+
.on_conflict_do_update(
293277
index_elements=["workflow_uuid"],
294278
set_=dict(
295279
recovery_attempts=(
296280
SystemSchema.workflow_status.c.recovery_attempts + 1
297-
if not is_status_flush
298-
else SystemSchema.workflow_status.c.recovery_attempts
299281
),
300282
),
301283
)
284+
)
302285

303286
cmd = cmd.returning(SystemSchema.workflow_status.c.recovery_attempts, SystemSchema.workflow_status.c.status, SystemSchema.workflow_status.c.name, SystemSchema.workflow_status.c.class_name, SystemSchema.workflow_status.c.config_name, SystemSchema.workflow_status.c.queue_name) # type: ignore
304287

305-
if conn is not None:
306-
results = conn.execute(cmd)
307-
else:
308-
with self.engine.begin() as c:
309-
results = c.execute(cmd)
288+
with self.engine.begin() as c:
289+
results = c.execute(cmd)
310290

311291
row = results.fetchone()
312292
if row is not None:
@@ -329,9 +309,8 @@ def update_workflow_status(
329309
if err_msg is not None:
330310
raise DBOSConflictingWorkflowError(status["workflow_uuid"], err_msg)
331311

332-
# recovery_attempt means "attempts" (we kept the name for backward compatibility). Its default value is 1.
333-
# Every time we init the status, we increment `recovery_attempts` by 1.
334-
# Thus, when this number becomes equal to `maxRetries + 1`, we should mark the workflow as `RETRIES_EXCEEDED`.
312+
# Every time we start executing a workflow (and thus attempt to insert its status), we increment `recovery_attempts` by 1.
313+
# When this number becomes equal to `maxRetries + 1`, we mark the workflow as `RETRIES_EXCEEDED`.
335314
if recovery_attempts > max_recovery_attempts + 1:
336315
with self.engine.begin() as c:
337316
c.execute(
@@ -359,12 +338,58 @@ def update_workflow_status(
359338
status["workflow_uuid"], max_recovery_attempts
360339
)
361340

362-
# Record we have exported status for this single-transaction workflow
341+
return wf_status
342+
343+
def update_workflow_status(
344+
self,
345+
status: WorkflowStatusInternal,
346+
*,
347+
conn: Optional[sa.Connection] = None,
348+
) -> None:
349+
wf_status: WorkflowStatuses = status["status"]
350+
351+
cmd = (
352+
pg.insert(SystemSchema.workflow_status)
353+
.values(
354+
workflow_uuid=status["workflow_uuid"],
355+
status=status["status"],
356+
name=status["name"],
357+
class_name=status["class_name"],
358+
config_name=status["config_name"],
359+
output=status["output"],
360+
error=status["error"],
361+
executor_id=status["executor_id"],
362+
application_version=status["app_version"],
363+
application_id=status["app_id"],
364+
request=status["request"],
365+
authenticated_user=status["authenticated_user"],
366+
authenticated_roles=status["authenticated_roles"],
367+
assumed_role=status["assumed_role"],
368+
queue_name=status["queue_name"],
369+
recovery_attempts=(
370+
1 if wf_status != WorkflowStatusString.ENQUEUED.value else 0
371+
),
372+
)
373+
.on_conflict_do_update(
374+
index_elements=["workflow_uuid"],
375+
set_=dict(
376+
status=status["status"],
377+
output=status["output"],
378+
error=status["error"],
379+
),
380+
)
381+
)
382+
383+
if conn is not None:
384+
conn.execute(cmd)
385+
else:
386+
with self.engine.begin() as c:
387+
c.execute(cmd)
388+
389+
# If this is a single-transaction workflow, record that its status has been exported
363390
if status["workflow_uuid"] in self._temp_txn_wf_ids:
364391
self._exported_temp_txn_wf_status.add(status["workflow_uuid"])
365392

366-
return wf_status
367-
368393
def set_workflow_status(
369394
self,
370395
workflow_uuid: str,
@@ -1057,7 +1082,7 @@ def _flush_workflow_status_buffer(self) -> None:
10571082
continue
10581083
exported_status[wf_id] = status
10591084
try:
1060-
self.update_workflow_status(status, conn=c, is_status_flush=True)
1085+
self.update_workflow_status(status, conn=c)
10611086
exported += 1
10621087
except Exception as e:
10631088
dbos_logger.error(f"Error while flushing status buffer: {e}")

0 commit comments

Comments
 (0)