Skip to content

Commit 676e320

Browse files
committed
remove on do nothing clause, fix step execution error to wrap error, implement errors.Is
1 parent c1f41a6 commit 676e320

File tree

4 files changed

+34
-24
lines changed

4 files changed

+34
-24
lines changed

dbos/errors.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ func (e *DBOSError) Unwrap() error {
5555
return e.wrappedErr
5656
}
5757

58+
// Implements https://pkg.go.dev/errors#Is
59+
func (e *DBOSError) Is(target error) bool {
60+
t, ok := target.(*DBOSError)
61+
if !ok {
62+
return false
63+
}
64+
// Match if codes are equal (and target code is set)
65+
return t.Code != 0 && e.Code == t.Code
66+
}
67+
5868
func newConflictingWorkflowError(workflowID, message string) *DBOSError {
5969
msg := fmt.Sprintf("Conflicting workflow invocation with the same ID (%s)", workflowID)
6070
if message != "" {
@@ -147,12 +157,13 @@ func newWorkflowExecutionError(workflowID string, err error) *DBOSError {
147157
}
148158
}
149159

150-
func newStepExecutionError(workflowID, stepName, message string) *DBOSError {
160+
func newStepExecutionError(workflowID, stepName string, err error) *DBOSError {
151161
return &DBOSError{
152-
Message: fmt.Sprintf("Step %s in workflow %s execution error: %s", stepName, workflowID, message),
162+
Message: fmt.Sprintf("Step %s in workflow %s execution error: %v", stepName, workflowID, err),
153163
Code: StepExecutionError,
154164
WorkflowID: workflowID,
155165
StepName: stepName,
166+
wrappedErr: err,
156167
}
157168
}
158169

dbos/system_database.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,8 +1211,7 @@ type recordOperationResultDBInput struct {
12111211
func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error {
12121212
query := fmt.Sprintf(`INSERT INTO %s.operation_outputs
12131213
(workflow_uuid, function_id, output, error, function_name)
1214-
VALUES ($1, $2, $3, $4, $5)
1215-
ON CONFLICT DO NOTHING`, pgx.Identifier{s.schema}.Sanitize())
1214+
VALUES ($1, $2, $3, $4, $5)`, pgx.Identifier{s.schema}.Sanitize())
12161215

12171216
var errorString *string
12181217
if input.err != nil {
@@ -1532,11 +1531,11 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err
15321531
// Get workflow state from context
15331532
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
15341533
if !ok || wfState == nil {
1535-
return 0, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
1534+
return 0, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
15361535
}
15371536

15381537
if wfState.isWithinStep {
1539-
return 0, newStepExecutionError(wfState.workflowID, functionName, "cannot call Sleep within a step")
1538+
return 0, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Sleep within a step"))
15401539
}
15411540

15421541
// Determine step ID
@@ -1749,7 +1748,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
17491748
if ok && wfState != nil {
17501749
isInWorkflow = true
17511750
if wfState.isWithinStep {
1752-
return newStepExecutionError(wfState.workflowID, functionName, "cannot call Send within a step")
1751+
return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Send within a step"))
17531752
}
17541753
stepID = wfState.nextStepID()
17551754
}
@@ -1832,11 +1831,11 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
18321831
// Get workflow state from context
18331832
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
18341833
if !ok || wfState == nil {
1835-
return nil, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
1834+
return nil, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
18361835
}
18371836

18381837
if wfState.isWithinStep {
1839-
return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call Recv within a step")
1838+
return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Recv within a step"))
18401839
}
18411840

18421841
stepID := wfState.nextStepID()
@@ -1988,11 +1987,11 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
19881987
// Get workflow state from context
19891988
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
19901989
if !ok || wfState == nil {
1991-
return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
1990+
return newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
19921991
}
19931992

19941993
if wfState.isWithinStep {
1995-
return newStepExecutionError(wfState.workflowID, functionName, "cannot call SetEvent within a step")
1994+
return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call SetEvent within a step"))
19961995
}
19971996

19981997
stepID := wfState.nextStepID()
@@ -2071,7 +2070,7 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
20712070
if ok && wfState != nil {
20722071
isInWorkflow = true
20732072
if wfState.isWithinStep {
2074-
return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call GetEvent within a step")
2073+
return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call GetEvent within a step"))
20752074
}
20762075
stepID = wfState.nextStepID()
20772076
sleepStepID = wfState.nextStepID() // We will use a sleep step to implement the timeout

dbos/workflow.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
742742

743743
// Prevent spawning child workflows from within a step
744744
if isChildWorkflow && parentWorkflowState.isWithinStep {
745-
return nil, newStepExecutionError(parentWorkflowState.workflowID, params.workflowName, "cannot spawn child workflow from within a step")
745+
return nil, newStepExecutionError(parentWorkflowState.workflowID, params.workflowName, fmt.Errorf("cannot spawn child workflow from within a step"))
746746
}
747747

748748
if isChildWorkflow {
@@ -935,8 +935,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
935935
result, err = fn(workflowCtx, input)
936936

937937
// Handle DBOS ID conflict errors by waiting workflow result
938-
var dbosErr *DBOSError
939-
if errors.As(err, &dbosErr) && dbosErr.Code == ConflictingIDError {
938+
if errors.Is(err, &DBOSError{Code: ConflictingIDError}) {
940939
c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID)
941940
result, err = retryWithResult(c, func() (any, error) {
942941
return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID)
@@ -1098,11 +1097,11 @@ func WithMaxInterval(interval time.Duration) StepOption {
10981097
// Under the hood, DBOS uses the provided context to manage durable execution.
10991098
func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) {
11001099
if ctx == nil {
1101-
return *new(R), newStepExecutionError("", "", "ctx cannot be nil")
1100+
return *new(R), newStepExecutionError("", "", fmt.Errorf("ctx cannot be nil"))
11021101
}
11031102

11041103
if fn == nil {
1105-
return *new(R), newStepExecutionError("", "", "step function cannot be nil")
1104+
return *new(R), newStepExecutionError("", "", fmt.Errorf("step function cannot be nil"))
11061105
}
11071106

11081107
// Register the output type for gob encoding
@@ -1144,12 +1143,12 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
11441143
// Get workflow state from context
11451144
wfState, ok := c.Value(workflowStateKey).(*workflowState)
11461145
if !ok || wfState == nil {
1147-
return nil, newStepExecutionError("", stepOpts.stepName, "workflow state not found in context: are you running this step within a workflow?")
1146+
return nil, newStepExecutionError("", stepOpts.stepName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
11481147
}
11491148

11501149
// This should not happen when called from the package-level RunAsStep
11511150
if fn == nil {
1152-
return nil, newStepExecutionError(wfState.workflowID, stepOpts.stepName, "step function cannot be nil")
1151+
return nil, newStepExecutionError(wfState.workflowID, stepOpts.stepName, fmt.Errorf("step function cannot be nil"))
11531152
}
11541153

11551154
// If within a step, just run the function directly
@@ -1176,7 +1175,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
11761175
})
11771176
}, withRetrierLogger(c.logger))
11781177
if err != nil {
1179-
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("checking operation execution: %v", err))
1178+
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("checking operation execution: %w", err))
11801179
}
11811180
if recordedOutput != nil {
11821181
return recordedOutput.output, recordedOutput.err
@@ -1205,7 +1204,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
12051204
// Wait before retry
12061205
select {
12071206
case <-c.Done():
1208-
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("context cancelled during retry: %v", c.Err()))
1207+
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("context cancelled during retry: %w", c.Err()))
12091208
case <-time.After(delay):
12101209
// Continue to retry
12111210
}
@@ -1241,7 +1240,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
12411240
return c.systemDB.recordOperationResult(uncancellableCtx, dbInput)
12421241
}, withRetrierLogger(c.logger))
12431242
if recErr != nil {
1244-
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("recording step outcome: %v", recErr))
1243+
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, recErr)
12451244
}
12461245

12471246
return stepOutput, stepError

dbos/workflows_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4149,9 +4149,10 @@ func TestGarbageCollect(t *testing.T) {
41494149
found = true
41504150
require.Equal(t, WorkflowStatusPending, wf.Status, "blocked workflow should still be pending")
41514151
}
4152-
if wf.Status == WorkflowStatusPending {
4152+
switch wf.Status {
4153+
case WorkflowStatusPending:
41534154
pendingCount++
4154-
} else if wf.Status == WorkflowStatusSuccess {
4155+
case WorkflowStatusSuccess:
41554156
completedCount++
41564157
}
41574158
}

0 commit comments

Comments
 (0)