Skip to content

Commit 66f3f1e

Browse files
committed
handle ConflictingIDError during workflow execution
1 parent 7f38dcf commit 66f3f1e

File tree

1 file changed

+35
-26
lines changed

1 file changed

+35
-26
lines changed

dbos/workflow.go

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -826,35 +826,44 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
826826
c.workflowsWg.Add(1)
827827
go func() {
828828
defer c.workflowsWg.Done()
829-
result, err := fn(workflowCtx, input)
830-
status := WorkflowStatusSuccess
831829

832-
// If an error occurred, set the status to error
833-
if err != nil {
834-
status = WorkflowStatusError
835-
}
830+
var result any
831+
var err error
836832

837-
// If the afterFunc has started, the workflow was cancelled and the status should be set to cancelled
838-
if stopFunc != nil && !stopFunc() {
839-
c.logger.Info("Workflow was cancelled. Waiting for cancel function to complete", "workflow_id", workflowID)
840-
// Wait for the cancel function to complete
841-
// Note this must happen before we write on the outcome channel (and signal the handler's GetResult)
842-
<-cancelFuncCompleted
843-
// Set the status to cancelled and move on so we still record the outcome in the DB
844-
status = WorkflowStatusCancelled
845-
}
833+
result, err = fn(workflowCtx, input)
846834

847-
recordErr := c.systemDB.updateWorkflowOutcome(uncancellableCtx, updateWorkflowOutcomeDBInput{
848-
workflowID: workflowID,
849-
status: status,
850-
err: err,
851-
output: result,
852-
})
853-
if recordErr != nil {
854-
c.logger.Error("Error recording workflow outcome", "workflow_id", workflowID, "error", recordErr)
855-
outcomeChan <- workflowOutcome[any]{result: nil, err: recordErr}
856-
close(outcomeChan)
857-
return
835+
// Handle DBOS ID conflict errors by waiting workflow result
836+
var dbosErr *DBOSError
837+
if errors.As(err, &dbosErr) && dbosErr.Code == ConflictingIDError {
838+
c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID)
839+
result, err = c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID)
840+
} else {
841+
status := WorkflowStatusSuccess
842+
843+
// If an error occurred, set the status to error
844+
if err != nil {
845+
status = WorkflowStatusError
846+
}
847+
848+
// If the afterFunc has started, the workflow was cancelled and the status should be set to cancelled
849+
if stopFunc != nil && !stopFunc() {
850+
c.logger.Info("Workflow was cancelled. Waiting for cancel function to complete", "workflow_id", workflowID)
851+
<-cancelFuncCompleted // Wait for the cancel function to complete
852+
status = WorkflowStatusCancelled
853+
}
854+
855+
recordErr := c.systemDB.updateWorkflowOutcome(uncancellableCtx, updateWorkflowOutcomeDBInput{
856+
workflowID: workflowID,
857+
status: status,
858+
err: err,
859+
output: result,
860+
})
861+
if recordErr != nil {
862+
c.logger.Error("Error recording workflow outcome", "workflow_id", workflowID, "error", recordErr)
863+
outcomeChan <- workflowOutcome[any]{result: nil, err: recordErr}
864+
close(outcomeChan)
865+
return
866+
}
858867
}
859868
outcomeChan <- workflowOutcome[any]{result: result, err: err}
860869
close(outcomeChan)

0 commit comments

Comments
 (0)