Skip to content

Commit ecad764

Browse files
committed
decode on the concurrent execution fallback path
1 parent f945e3b commit ecad764

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

dbos/workflow.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -959,9 +959,25 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
959959
// Handle DBOS ID conflict errors by waiting workflow result
960960
if errors.Is(err, &DBOSError{Code: ConflictingIDError}) {
961961
c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID)
962-
result, err = retryWithResult(c, func() (any, error) {
962+
var encodedResult any
963+
encodedResult, err = retryWithResult(c, func() (any, error) {
963964
return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID)
964965
}, withRetrierLogger(c.logger))
966+
var deserErr error
967+
encodedResultString, ok := encodedResult.(string)
968+
if !ok {
969+
c.logger.Error("Unexpected result type when awaiting workflow result after ID conflict", "workflow_id", workflowID, "type", fmt.Sprintf("%T", encodedResult))
970+
outcomeChan <- workflowOutcome[any]{result: nil, err: fmt.Errorf("unexpected result type when awaiting workflow result after ID conflict: expected string, got %T", encodedResult)}
971+
close(outcomeChan)
972+
return
973+
}
974+
result, deserErr = deserialize(&encodedResultString)
975+
if deserErr != nil {
976+
c.logger.Error("Failed to deserialize workflow result after ID conflict", "workflow_id", workflowID, "error", deserErr)
977+
outcomeChan <- workflowOutcome[any]{result: nil, err: fmt.Errorf("failed to deserialize workflow result after ID conflict: %w", deserErr)}
978+
close(outcomeChan)
979+
return
980+
}
965981
} else {
966982
status := WorkflowStatusSuccess
967983

0 commit comments

Comments
 (0)