diff --git a/dbos/errors.go b/dbos/errors.go index 982cf3b..88be37b 100644 --- a/dbos/errors.go +++ b/dbos/errors.go @@ -118,6 +118,14 @@ func newAwaitedWorkflowCancelledError(workflowID string) *DBOSError { } } +func newAwaitedWorkflowMaxStepRetriesExceeded(workflowID string) *DBOSError { + return &DBOSError{ + Message: fmt.Sprintf("Awaited workflow %s has exceeded the maximum number of step retries", workflowID), + Code: MaxStepRetriesExceeded, + WorkflowID: workflowID, + } +} + func newWorkflowCancelledError(workflowID string) *DBOSError { return &DBOSError{ Message: fmt.Sprintf("Workflow %s was cancelled", workflowID), diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 53148f6..bab4b01 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -366,9 +366,17 @@ func TestWorkflowQueues(t *testing.T) { // Check the workflow completes dlqCompleteEvent.Set() for _, handle := range handles { - result, err := handle.GetResult() - require.NoError(t, err, "failed to get result from recovered workflow handle") - assert.Equal(t, "test-input", result, "expected result to be 'test-input'") + result, _ := handle.GetResult() + handleStatus, _ := handle.GetStatus() + + if result == nil { + assert.Equal(t, WorkflowStatusMaxRecoveryAttemptsExceeded, handleStatus.Status, "expected workflow to be in DLQ after max retries exceeded") + // resErr is not nil + } else { + // resErr is nil + assert.Equal(t, WorkflowStatusSuccess, handleStatus.Status, "expected workflow status to be SUCCESS when result is not nil") + } + } require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after successive enqueues test") diff --git a/dbos/system_database.go b/dbos/system_database.go index e3365bc..249c674 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1218,6 +1218,8 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (*st return outputString, errors.New(*errorStr) case WorkflowStatusCancelled: return outputString, newAwaitedWorkflowCancelledError(workflowID) + case WorkflowStatusMaxRecoveryAttemptsExceeded: + return outputString, newAwaitedWorkflowMaxStepRetriesExceeded(workflowID) default: time.Sleep(_DB_RETRY_INTERVAL) }