diff --git a/dbos/errors.go b/dbos/errors.go index 57679af..982cf3b 100644 --- a/dbos/errors.go +++ b/dbos/errors.go @@ -55,6 +55,16 @@ func (e *DBOSError) Unwrap() error { return e.wrappedErr } +// Implements https://pkg.go.dev/errors#Is +func (e *DBOSError) Is(target error) bool { + t, ok := target.(*DBOSError) + if !ok { + return false + } + // Match if codes are equal (and target code is set) + return t.Code != 0 && e.Code == t.Code +} + func newConflictingWorkflowError(workflowID, message string) *DBOSError { msg := fmt.Sprintf("Conflicting workflow invocation with the same ID (%s)", workflowID) if message != "" { @@ -147,12 +157,13 @@ func newWorkflowExecutionError(workflowID string, err error) *DBOSError { } } -func newStepExecutionError(workflowID, stepName, message string) *DBOSError { +func newStepExecutionError(workflowID, stepName string, err error) *DBOSError { return &DBOSError{ - Message: fmt.Sprintf("Step %s in workflow %s execution error: %s", stepName, workflowID, message), + Message: fmt.Sprintf("Step %s in workflow %s execution error: %v", stepName, workflowID, err), Code: StepExecutionError, WorkflowID: workflowID, StepName: stepName, + wrappedErr: err, } } diff --git a/dbos/system_database.go b/dbos/system_database.go index 01a4ef1..ac1abc4 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1211,8 +1211,7 @@ type recordOperationResultDBInput struct { func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error { query := fmt.Sprintf(`INSERT INTO %s.operation_outputs (workflow_uuid, function_id, output, error, function_name) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT DO NOTHING`, pgx.Identifier{s.schema}.Sanitize()) + VALUES ($1, $2, $3, $4, $5)`, pgx.Identifier{s.schema}.Sanitize()) var errorString *string if input.err != nil { @@ -1532,11 +1531,11 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err // Get workflow state from context wfState, ok := ctx.Value(workflowStateKey).(*workflowState) if !ok || wfState == nil { - return 0, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") + return 0, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?")) } if wfState.isWithinStep { - return 0, newStepExecutionError(wfState.workflowID, functionName, "cannot call Sleep within a step") + return 0, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Sleep within a step")) } // Determine step ID @@ -1749,7 +1748,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error { if ok && wfState != nil { isInWorkflow = true if wfState.isWithinStep { - return newStepExecutionError(wfState.workflowID, functionName, "cannot call Send within a step") + return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Send within a step")) } stepID = wfState.nextStepID() } @@ -1832,11 +1831,11 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) { // Get workflow state from context wfState, ok := ctx.Value(workflowStateKey).(*workflowState) if !ok || wfState == nil { - return nil, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") + return nil, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?")) } if wfState.isWithinStep { - return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call Recv within a step") + return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Recv within a step")) } stepID := wfState.nextStepID() @@ -1988,11 +1987,11 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error // Get workflow state from context wfState, ok := ctx.Value(workflowStateKey).(*workflowState) if !ok || wfState == nil { - return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") + return newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?")) } if wfState.isWithinStep { - return newStepExecutionError(wfState.workflowID, functionName, "cannot call SetEvent within a step") + return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call SetEvent within a step")) } stepID := wfState.nextStepID() @@ -2071,7 +2070,7 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error) if ok && wfState != nil { isInWorkflow = true if wfState.isWithinStep { - return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call GetEvent within a step") + return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call GetEvent within a step")) } stepID = wfState.nextStepID() sleepStepID = wfState.nextStepID() // We will use a sleep step to implement the timeout diff --git a/dbos/workflow.go b/dbos/workflow.go index ea5a359..5117933 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -742,7 +742,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt // Prevent spawning child workflows from within a step if isChildWorkflow && parentWorkflowState.isWithinStep { - return nil, newStepExecutionError(parentWorkflowState.workflowID, params.workflowName, "cannot spawn child workflow from within a step") + return nil, newStepExecutionError(parentWorkflowState.workflowID, params.workflowName, fmt.Errorf("cannot spawn child workflow from within a step")) } if isChildWorkflow { @@ -935,8 +935,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt result, err = fn(workflowCtx, input) // Handle DBOS ID conflict errors by waiting workflow result - var dbosErr *DBOSError - if errors.As(err, &dbosErr) && dbosErr.Code == ConflictingIDError { + if errors.Is(err, &DBOSError{Code: ConflictingIDError}) { c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID) result, err = retryWithResult(c, func() (any, error) { return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID) @@ -1098,11 +1097,11 @@ func WithMaxInterval(interval time.Duration) StepOption { // Under the hood, DBOS uses the provided context to manage durable execution. func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { if ctx == nil { - return *new(R), newStepExecutionError("", "", "ctx cannot be nil") + return *new(R), newStepExecutionError("", "", fmt.Errorf("ctx cannot be nil")) } if fn == nil { - return *new(R), newStepExecutionError("", "", "step function cannot be nil") + return *new(R), newStepExecutionError("", "", fmt.Errorf("step function cannot be nil")) } // Register the output type for gob encoding @@ -1144,12 +1143,12 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) // Get workflow state from context wfState, ok := c.Value(workflowStateKey).(*workflowState) if !ok || wfState == nil { - return nil, newStepExecutionError("", stepOpts.stepName, "workflow state not found in context: are you running this step within a workflow?") + return nil, newStepExecutionError("", stepOpts.stepName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?")) } // This should not happen when called from the package-level RunAsStep if fn == nil { - return nil, newStepExecutionError(wfState.workflowID, stepOpts.stepName, "step function cannot be nil") + return nil, newStepExecutionError(wfState.workflowID, stepOpts.stepName, fmt.Errorf("step function cannot be nil")) } // If within a step, just run the function directly @@ -1176,7 +1175,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) }) }, withRetrierLogger(c.logger)) if err != nil { - return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("checking operation execution: %v", err)) + return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("checking operation execution: %w", err)) } if recordedOutput != nil { return recordedOutput.output, recordedOutput.err @@ -1205,7 +1204,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) // Wait before retry select { case <-c.Done(): - return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("context cancelled during retry: %v", c.Err())) + return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("context cancelled during retry: %w", c.Err())) case <-time.After(delay): // Continue to retry } @@ -1241,7 +1240,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) return c.systemDB.recordOperationResult(uncancellableCtx, dbInput) }, withRetrierLogger(c.logger)) if recErr != nil { - return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("recording step outcome: %v", recErr)) + return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, recErr) } return stepOutput, stepError diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 8756efd..f5672ec 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1285,6 +1285,7 @@ func TestWorkflowRecovery(t *testing.T) { recoveryCounters []int64 recoveryEvents []*Event blockingEvents []*Event + secondStepErrors []error ) recoveryWorkflow := func(dbosCtx DBOSContext, index int) (int64, error) { @@ -1306,6 +1307,7 @@ func TestWorkflowRecovery(t *testing.T) { return fmt.Sprintf("completed-%d", index), nil }, WithStepName(fmt.Sprintf("BlockingStep-%d", index))) if err != nil { + secondStepErrors = append(secondStepErrors, err) return 0, err } @@ -1324,6 +1326,7 @@ func TestWorkflowRecovery(t *testing.T) { recoveryCounters = make([]int64, numWorkflows) recoveryEvents = make([]*Event, numWorkflows) blockingEvents = make([]*Event, numWorkflows) + secondStepErrors = make([]error, 0) // Create events for each workflow for i := range numWorkflows { @@ -1425,6 +1428,16 @@ func TestWorkflowRecovery(t *testing.T) { assert.Nil(t, steps[0].Error, "workflow %d first step should not have error", i) assert.Nil(t, steps[1].Error, "workflow %d second step should not have error", i) } + + // At least 5 of the 2nd steps should have errored due to execution race + // Check they are DBOSErrors with StepExecutionError wrapping a ConflictingIDError + require.GreaterOrEqual(t, len(secondStepErrors), 5, "expected at least 5 errors from second steps due to recovery race, got %d", len(secondStepErrors)) + for _, err := range secondStepErrors { + dbosErr, ok := err.(*DBOSError) + require.True(t, ok, "expected error to be of type *DBOSError, got %T", err) + require.Equal(t, StepExecutionError, dbosErr.Code, "expected error code to be StepExecutionError, got %v", dbosErr.Code) + require.True(t, errors.Is(dbosErr.Unwrap(), &DBOSError{Code: ConflictingIDError}), "expected underlying error to be ConflictingIDError, got %T", dbosErr.Unwrap()) + } }) } @@ -4149,9 +4162,10 @@ func TestGarbageCollect(t *testing.T) { found = true require.Equal(t, WorkflowStatusPending, wf.Status, "blocked workflow should still be pending") } - if wf.Status == WorkflowStatusPending { + switch wf.Status { + case WorkflowStatusPending: pendingCount++ - } else if wf.Status == WorkflowStatusSuccess { + case WorkflowStatusSuccess: completedCount++ } }