Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions dbos/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ func (e *DBOSError) Unwrap() error {
return e.wrappedErr
}

// Implements https://pkg.go.dev/errors#Is
func (e *DBOSError) Is(target error) bool {
t, ok := target.(*DBOSError)
if !ok {
return false
}
// Match if codes are equal (and target code is set)
return t.Code != 0 && e.Code == t.Code
}

func newConflictingWorkflowError(workflowID, message string) *DBOSError {
msg := fmt.Sprintf("Conflicting workflow invocation with the same ID (%s)", workflowID)
if message != "" {
Expand Down Expand Up @@ -147,12 +157,13 @@ func newWorkflowExecutionError(workflowID string, err error) *DBOSError {
}
}

func newStepExecutionError(workflowID, stepName, message string) *DBOSError {
func newStepExecutionError(workflowID, stepName string, err error) *DBOSError {
return &DBOSError{
Message: fmt.Sprintf("Step %s in workflow %s execution error: %s", stepName, workflowID, message),
Message: fmt.Sprintf("Step %s in workflow %s execution error: %v", stepName, workflowID, err),
Code: StepExecutionError,
WorkflowID: workflowID,
StepName: stepName,
wrappedErr: err,
}
}

Expand Down
19 changes: 9 additions & 10 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,8 +1211,7 @@ type recordOperationResultDBInput struct {
func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error {
query := fmt.Sprintf(`INSERT INTO %s.operation_outputs
(workflow_uuid, function_id, output, error, function_name)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING`, pgx.Identifier{s.schema}.Sanitize())
VALUES ($1, $2, $3, $4, $5)`, pgx.Identifier{s.schema}.Sanitize())

var errorString *string
if input.err != nil {
Expand Down Expand Up @@ -1532,11 +1531,11 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err
// Get workflow state from context
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
if !ok || wfState == nil {
return 0, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
return 0, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
}

if wfState.isWithinStep {
return 0, newStepExecutionError(wfState.workflowID, functionName, "cannot call Sleep within a step")
return 0, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Sleep within a step"))
}

// Determine step ID
Expand Down Expand Up @@ -1749,7 +1748,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
if ok && wfState != nil {
isInWorkflow = true
if wfState.isWithinStep {
return newStepExecutionError(wfState.workflowID, functionName, "cannot call Send within a step")
return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Send within a step"))
}
stepID = wfState.nextStepID()
}
Expand Down Expand Up @@ -1832,11 +1831,11 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
// Get workflow state from context
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
if !ok || wfState == nil {
return nil, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
return nil, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
}

if wfState.isWithinStep {
return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call Recv within a step")
return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Recv within a step"))
}

stepID := wfState.nextStepID()
Expand Down Expand Up @@ -1988,11 +1987,11 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
// Get workflow state from context
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
if !ok || wfState == nil {
return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
return newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
}

if wfState.isWithinStep {
return newStepExecutionError(wfState.workflowID, functionName, "cannot call SetEvent within a step")
return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call SetEvent within a step"))
}

stepID := wfState.nextStepID()
Expand Down Expand Up @@ -2071,7 +2070,7 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
if ok && wfState != nil {
isInWorkflow = true
if wfState.isWithinStep {
return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call GetEvent within a step")
return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call GetEvent within a step"))
}
stepID = wfState.nextStepID()
sleepStepID = wfState.nextStepID() // We will use a sleep step to implement the timeout
Expand Down
19 changes: 9 additions & 10 deletions dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt

// Prevent spawning child workflows from within a step
if isChildWorkflow && parentWorkflowState.isWithinStep {
return nil, newStepExecutionError(parentWorkflowState.workflowID, params.workflowName, "cannot spawn child workflow from within a step")
return nil, newStepExecutionError(parentWorkflowState.workflowID, params.workflowName, fmt.Errorf("cannot spawn child workflow from within a step"))
}

if isChildWorkflow {
Expand Down Expand Up @@ -935,8 +935,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
result, err = fn(workflowCtx, input)

// Handle DBOS ID conflict errors by waiting workflow result
var dbosErr *DBOSError
if errors.As(err, &dbosErr) && dbosErr.Code == ConflictingIDError {
if errors.Is(err, &DBOSError{Code: ConflictingIDError}) {
c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID)
result, err = retryWithResult(c, func() (any, error) {
return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID)
Expand Down Expand Up @@ -1098,11 +1097,11 @@ func WithMaxInterval(interval time.Duration) StepOption {
// Under the hood, DBOS uses the provided context to manage durable execution.
func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) {
if ctx == nil {
return *new(R), newStepExecutionError("", "", "ctx cannot be nil")
return *new(R), newStepExecutionError("", "", fmt.Errorf("ctx cannot be nil"))
}

if fn == nil {
return *new(R), newStepExecutionError("", "", "step function cannot be nil")
return *new(R), newStepExecutionError("", "", fmt.Errorf("step function cannot be nil"))
}

// Register the output type for gob encoding
Expand Down Expand Up @@ -1144,12 +1143,12 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
// Get workflow state from context
wfState, ok := c.Value(workflowStateKey).(*workflowState)
if !ok || wfState == nil {
return nil, newStepExecutionError("", stepOpts.stepName, "workflow state not found in context: are you running this step within a workflow?")
return nil, newStepExecutionError("", stepOpts.stepName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
}

// This should not happen when called from the package-level RunAsStep
if fn == nil {
return nil, newStepExecutionError(wfState.workflowID, stepOpts.stepName, "step function cannot be nil")
return nil, newStepExecutionError(wfState.workflowID, stepOpts.stepName, fmt.Errorf("step function cannot be nil"))
}

// If within a step, just run the function directly
Expand All @@ -1176,7 +1175,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
})
}, withRetrierLogger(c.logger))
if err != nil {
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("checking operation execution: %v", err))
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("checking operation execution: %w", err))
}
if recordedOutput != nil {
return recordedOutput.output, recordedOutput.err
Expand Down Expand Up @@ -1205,7 +1204,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
// Wait before retry
select {
case <-c.Done():
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("context cancelled during retry: %v", c.Err()))
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("context cancelled during retry: %w", c.Err()))
case <-time.After(delay):
// Continue to retry
}
Expand Down Expand Up @@ -1241,7 +1240,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
return c.systemDB.recordOperationResult(uncancellableCtx, dbInput)
}, withRetrierLogger(c.logger))
if recErr != nil {
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("recording step outcome: %v", recErr))
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, recErr)
}

return stepOutput, stepError
Expand Down
5 changes: 3 additions & 2 deletions dbos/workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4149,9 +4149,10 @@ func TestGarbageCollect(t *testing.T) {
found = true
require.Equal(t, WorkflowStatusPending, wf.Status, "blocked workflow should still be pending")
}
if wf.Status == WorkflowStatusPending {
switch wf.Status {
case WorkflowStatusPending:
pendingCount++
} else if wf.Status == WorkflowStatusSuccess {
case WorkflowStatusSuccess:
completedCount++
}
}
Expand Down
Loading