From 7935dfbef3e91bf85ad7a7464209ddb75a9e0024 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 15:47:31 -0700 Subject: [PATCH 01/19] accept a closure in RunAsStep --- dbos/dbos.go | 2 +- dbos/queues_test.go | 6 ++- dbos/serialization_test.go | 14 ++++--- dbos/workflow.go | 44 +++++++++------------- dbos/workflows_test.go | 76 ++++++++++++++++++++++++++------------ 5 files changed, 84 insertions(+), 58 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 18cccf99..1d3a70db 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -61,7 +61,7 @@ type DBOSContext interface { Cancel() // Workflow operations - RunAsStep(_ DBOSContext, fn StepFunc, input any) (any, error) + RunAsStep(_ DBOSContext, fn StepFunc) (any, error) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) Send(_ DBOSContext, input WorkflowSendInputInternal) error Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 0b8733b9..581c5d74 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -30,14 +30,16 @@ This suite tests */ func queueWorkflow(ctx DBOSContext, input string) (string, error) { - step1, err := RunAsStep(ctx, queueStep, input) + step1, err := RunAsStep[string](ctx, func(context context.Context) (string, error) { + return queueStep(context, input) + }) if err != nil { return "", fmt.Errorf("failed to run step: %v", err) } return step1, nil } -func queueStep(ctx context.Context, input string) (string, error) { +func queueStep(_ context.Context, input string) (string, error) { return input, nil } diff --git a/dbos/serialization_test.go b/dbos/serialization_test.go index bc8ebd4a..4858cf15 100644 --- a/dbos/serialization_test.go +++ b/dbos/serialization_test.go @@ -23,7 +23,9 @@ func encodingStepBuiltinTypes(_ context.Context, input int) (int, error) { } func encodingWorkflowBuiltinTypes(ctx DBOSContext, input string) (string, error) { - stepResult, err := RunAsStep(ctx, encodingStepBuiltinTypes, 123) + stepResult, err := RunAsStep[int](ctx, func(context context.Context) (int, error) { + return encodingStepBuiltinTypes(context, 123) + }) return fmt.Sprintf("%d", stepResult), fmt.Errorf("workflow error: %v", err) } @@ -49,13 +51,15 @@ type SimpleStruct struct { } func encodingWorkflowStruct(ctx DBOSContext, input WorkflowInputStruct) (StepOutputStruct, error) { - return RunAsStep(ctx, encodingStepStruct, StepInputStruct{ - A: input.A, - B: fmt.Sprintf("%d", input.B), + return RunAsStep[StepOutputStruct](ctx, func(context context.Context) (StepOutputStruct, error) { + return encodingStepStruct(context, StepInputStruct{ + A: input.A, + B: fmt.Sprintf("%d", input.B), + }) }) } -func encodingStepStruct(ctx context.Context, input StepInputStruct) (StepOutputStruct, error) { +func encodingStepStruct(_ context.Context, input StepInputStruct) (StepOutputStruct, error) { return StepOutputStruct{ A: input, B: "processed by encodingStepStruct", diff --git a/dbos/workflow.go b/dbos/workflow.go index e6802307..c0579b5b 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -684,8 +684,8 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o /******* STEP FUNCTIONS *******/ /******************************/ -type StepFunc func(ctx context.Context, input any) (any, error) -type GenericStepFunc[P any, R any] func(ctx context.Context, input P) (R, error) +type StepFunc func(ctx context.Context) (any, error) +type GenericStepFunc[R any] func(ctx context.Context) (R, error) const StepParamsKey DBOSContextKey = "stepParams" @@ -729,7 +729,7 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams { var typeErasedStepNameToStepName = make(map[string]string) -func RunAsStep[P any, R any](ctx DBOSContext, fn GenericStepFunc[P, R], input P) (R, error) { +func RunAsStep[P any, R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) { if ctx == nil { return *new(R), newStepExecutionError("", "", "ctx cannot be nil") } @@ -738,37 +738,27 @@ func RunAsStep[P any, R any](ctx DBOSContext, fn GenericStepFunc[P, R], input P) return *new(R), newStepExecutionError("", "", "step function cannot be nil") } - // Type-erase the function based on its actual type - typeErasedFn := StepFunc(func(ctx context.Context, input any) (any, error) { - typedInput, ok := input.(P) - if !ok { - return nil, newStepExecutionError("", "", fmt.Sprintf("unexpected input type: expected %T, got %T", *new(P), input)) - } - return fn(ctx, typedInput) - }) + stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() - typeErasedStepNameToStepName[runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name()] = runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() + // Type-erase the function + typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) + typeErasedStepNameToStepName[runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name()] = stepName // Call the executor method - result, err := ctx.RunAsStep(ctx, typeErasedFn, input) - if err != nil { - // In case the errors comes from the DBOS step logic, the result will be nil and we must handle it - if result == nil { - return *new(R), err - } - return result.(R), err + result, err := ctx.RunAsStep(ctx, typeErasedFn) + // Step function could return a nil result + if result == nil { + return *new(R), err } - - // Type-check and cast the result + // Otherwise type-check and cast the result typedResult, ok := result.(R) if !ok { return *new(R), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result) } - return typedResult, nil } -func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, input any) (any, error) { +func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc) (any, error) { // Get workflow state from context wfState, ok := c.Value(workflowStateKey).(*workflowState) if !ok || wfState == nil { @@ -776,8 +766,8 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, input any) (any, err return nil, newStepExecutionError("", "", "workflow state not found in context: are you running this step within a workflow?") } + // This should not happen when called from the package-level RunAsStep if fn == nil { - // TODO: try to print step name return nil, newStepExecutionError(wfState.workflowID, "", "step function cannot be nil") } @@ -790,7 +780,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, input any) (any, err // If within a step, just run the function directly if wfState.isWithinStep { - return fn(c, input) + return fn(c) } // Setup step state @@ -819,7 +809,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, input any) (any, err // Spawn a child DBOSContext with the step state stepCtx := WithValue(c, workflowStateKey, &stepState) - stepOutput, stepError := fn(stepCtx, input) + stepOutput, stepError := fn(stepCtx) // Retry if MaxRetries > 0 and the first execution failed var joinedErrors error @@ -845,7 +835,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, input any) (any, err } // Execute the retry - stepOutput, stepError = fn(stepCtx, input) + stepOutput, stepError = fn(stepCtx) // If successful, break if stepError == nil { diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 3c5913cd..5f96f334 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -35,19 +35,23 @@ func simpleWorkflowError(dbosCtx DBOSContext, input string) (int, error) { } func simpleWorkflowWithStep(dbosCtx DBOSContext, input string) (string, error) { - return RunAsStep(dbosCtx, simpleStep, input) + return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return simpleStep(ctx) + }) } -func simpleStep(ctx context.Context, input string) (string, error) { +func simpleStep(_ context.Context) (string, error) { return "from step", nil } -func simpleStepError(ctx context.Context, input string) (string, error) { +func simpleStepError(_ context.Context) (string, error) { return "", fmt.Errorf("step failure") } func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) { - return RunAsStep(dbosCtx, simpleStepError, input) + return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return simpleStepError(ctx) + }) } // idempotencyWorkflow increments a global counter and returns the input @@ -292,38 +296,44 @@ func TestWorkflowsRegistration(t *testing.T) { } } -func stepWithinAStep(ctx context.Context, input string) (string, error) { - return simpleStep(ctx, input) +func stepWithinAStep(ctx context.Context) (string, error) { + return simpleStep(ctx) } func stepWithinAStepWorkflow(dbosCtx DBOSContext, input string) (string, error) { - return RunAsStep(dbosCtx, stepWithinAStep, input) + return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return stepWithinAStep(ctx) + }) } // Global counter for retry testing var stepRetryAttemptCount int -func stepRetryAlwaysFailsStep(ctx context.Context, input string) (string, error) { +func stepRetryAlwaysFailsStep(ctx context.Context) (string, error) { stepRetryAttemptCount++ return "", fmt.Errorf("always fails - attempt %d", stepRetryAttemptCount) } var stepIdempotencyCounter int -func stepIdempotencyTest(ctx context.Context, input int) (string, error) { +func stepIdempotencyTest(ctx context.Context) (string, error) { stepIdempotencyCounter++ return "", nil } func stepRetryWorkflow(dbosCtx DBOSContext, input string) (string, error) { - RunAsStep(dbosCtx, stepIdempotencyTest, 1) + RunAsStep[int](dbosCtx, func(ctx context.Context) (string, error) { + return stepIdempotencyTest(ctx) + }) stepCtx := WithValue(dbosCtx, StepParamsKey, &StepParams{ MaxRetries: 5, BaseInterval: 1 * time.Millisecond, MaxInterval: 10 * time.Millisecond, }) - return RunAsStep(stepCtx, stepRetryAlwaysFailsStep, input) + return RunAsStep[string](stepCtx, func(ctx context.Context) (string, error) { + return stepRetryAlwaysFailsStep(ctx) + }) } func TestSteps(t *testing.T) { @@ -335,7 +345,9 @@ func TestSteps(t *testing.T) { t.Run("StepsMustRunInsideWorkflows", func(t *testing.T) { // Attempt to run a step outside of a workflow context - _, err := RunAsStep(dbosCtx, simpleStep, "test") + _, err := RunAsStep[int](dbosCtx, func(ctx context.Context) (string, error) { + return simpleStep(ctx) + }) if err == nil { t.Fatal("expected error when running step outside of workflow context, but got none") } @@ -470,7 +482,9 @@ func TestChildWorkflow(t *testing.T) { return "", fmt.Errorf("expected childWf workflow ID to be %s, got %s", expectedCurrentID, workflowID) } // Steps of a child workflow start with an incremented step ID, because the first step ID is allocated to the child workflow - return RunAsStep(dbosCtx, simpleStep, "") + return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return simpleStep(ctx) + }) } RegisterWorkflow(dbosCtx, childWf) @@ -644,7 +658,9 @@ func TestChildWorkflow(t *testing.T) { customChildID := uuid.NewString() simpleChildWf := func(dbosCtx DBOSContext, input string) (string, error) { - return RunAsStep(dbosCtx, simpleStep, input) + return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return simpleStep(ctx) + }) } RegisterWorkflow(dbosCtx, simpleChildWf) @@ -713,13 +729,15 @@ func TestChildWorkflow(t *testing.T) { // Idempotency workflows moved to test functions func idempotencyWorkflow(dbosCtx DBOSContext, input string) (string, error) { - RunAsStep(dbosCtx, incrementCounter, int64(1)) + RunAsStep[int64](dbosCtx, func(ctx context.Context) (int64, error) { + return incrementCounter(ctx, int64(1)) + }) return input, nil } var blockingStepStopEvent *Event -func blockingStep(ctx context.Context, input string) (string, error) { +func blockingStep(_ context.Context) (string, error) { blockingStepStopEvent.Wait() return "", nil } @@ -727,9 +745,13 @@ func blockingStep(ctx context.Context, input string) (string, error) { var idempotencyWorkflowWithStepEvent *Event func idempotencyWorkflowWithStep(dbosCtx DBOSContext, input string) (int64, error) { - RunAsStep(dbosCtx, incrementCounter, int64(1)) + RunAsStep[int64](dbosCtx, func(ctx context.Context) (int64, error) { + return incrementCounter(ctx, int64(1)) + }) idempotencyWorkflowWithStepEvent.Set() - RunAsStep(dbosCtx, blockingStep, input) + RunAsStep[int](dbosCtx, func(ctx context.Context) (string, error) { + return blockingStep(ctx) + }) return idempotencyCounter, nil } @@ -1253,7 +1275,9 @@ func stepThatCallsSend(ctx context.Context, input sendWorkflowInput) (string, er } func workflowThatCallsSendInStep(ctx DBOSContext, input sendWorkflowInput) (string, error) { - return RunAsStep(ctx, stepThatCallsSend, input) + return RunAsStep[sendWorkflowInput](ctx, func(context context.Context) (string, error) { + return stepThatCallsSend(context, input) + }) } type sendRecvType struct { @@ -2193,7 +2217,7 @@ func TestWorkflowTimeout(t *testing.T) { } }) - waitForCancelStep := func(ctx context.Context, _ string) (string, error) { + waitForCancelStep := func(ctx context.Context) (string, error) { // This step will trigger cancellation of the entire workflow context <-ctx.Done() if !errors.Is(ctx.Err(), context.Canceled) && !errors.Is(ctx.Err(), context.DeadlineExceeded) { @@ -2203,7 +2227,9 @@ func TestWorkflowTimeout(t *testing.T) { } waitForCancelWorkflowWithStep := func(ctx DBOSContext, _ string) (string, error) { - return RunAsStep(ctx, waitForCancelStep, "trigger-cancellation") + return RunAsStep[sendWorkflowInput](ctx, func(context context.Context) (string, error) { + return waitForCancelStep(context) + }) } RegisterWorkflow(dbosCtx, waitForCancelWorkflowWithStep) @@ -2240,7 +2266,9 @@ func TestWorkflowTimeout(t *testing.T) { // The timeout will trigger a step error, the workflow can do whatever it wants with that error stepCtx, stepCancelFunc := WithTimeout(ctx, 1*time.Millisecond) defer stepCancelFunc() // Ensure we clean up the context - _, err := RunAsStep(stepCtx, waitForCancelStep, "short-step-timeout") + _, err := RunAsStep[string](stepCtx, func(context context.Context) (string, error) { + return waitForCancelStep(context) + }) if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("expected step to timeout, got: %v", err) } @@ -2287,7 +2315,9 @@ func TestWorkflowTimeout(t *testing.T) { // This workflow will run a step that is not cancelable. // What this means is the workflow *will* be cancelled, but the step will run normally stepCtx := WithoutCancel(ctx) - res, err := RunAsStep(stepCtx, detachedStep, timeout*2) + res, err := RunAsStep[time.Duration](stepCtx, func(context context.Context) (string, error) { + return detachedStep(context, timeout*2) + }) if err != nil { t.Fatalf("failed to run detached step: %v", err) } From e156816deb287d4e76362bf81027ae18b43f8fa5 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 15:58:50 -0700 Subject: [PATCH 02/19] fix --- dbos/workflow.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index c0579b5b..2f084a2a 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -744,7 +744,7 @@ func RunAsStep[P any, R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) typeErasedStepNameToStepName[runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name()] = stepName - // Call the executor method + // Call the executor method and pass through the result/error result, err := ctx.RunAsStep(ctx, typeErasedFn) // Step function could return a nil result if result == nil { @@ -755,7 +755,7 @@ func RunAsStep[P any, R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) if !ok { return *new(R), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result) } - return typedResult, nil + return typedResult, err } func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc) (any, error) { From 88cf424be91327b85a152c92532a46e51fac3693 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 16:10:45 -0700 Subject: [PATCH 03/19] fix --- dbos/workflows_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 5f96f334..453c48ab 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1153,9 +1153,9 @@ func TestScheduledWorkflows(t *testing.T) { } // Stop the workflowScheduler and check if it stops executing - currentCounter := counter dbosCtx.(*dbosContext).getWorkflowScheduler().Stop() time.Sleep(3 * time.Second) // Wait a bit to ensure no more executions + currentCounter := counter // If more scheduled executions happen, this can also trigger a data race. If the scheduler is correct, there should be no race. if counter >= currentCounter+2 { t.Fatalf("Scheduled workflow continued executing after stopping scheduler: %d (expected < %d)", counter, currentCounter+2) } From fb5b0b6b07b5df5ce3ac0e047c0e4a4a259a3683 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 16:18:37 -0700 Subject: [PATCH 04/19] fix --- dbos/workflows_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 453c48ab..0ad62362 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -423,7 +423,7 @@ func TestSteps(t *testing.T) { } // Verify the error contains the step name and max retries - expectedErrorMessage := "dbos.stepRetryAlwaysFailsStep has exceeded its maximum of 5 retries" + expectedErrorMessage := "has exceeded its maximum of 5 retries" if !strings.Contains(dbosErr.Message, expectedErrorMessage) { t.Fatalf("expected error message to contain '%s', got '%s'", expectedErrorMessage, dbosErr.Message) } From dfcff91e995bb635cb3b886ad962227a1c387bc2 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 17:03:42 -0700 Subject: [PATCH 05/19] fix --- dbos/queues_test.go | 2 +- dbos/serialization_test.go | 4 ++-- dbos/workflow.go | 2 +- dbos/workflows_test.go | 30 +++++++++++++++--------------- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 581c5d74..eda6dd57 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -30,7 +30,7 @@ This suite tests */ func queueWorkflow(ctx DBOSContext, input string) (string, error) { - step1, err := RunAsStep[string](ctx, func(context context.Context) (string, error) { + step1, err := RunAsStep(ctx, func(context context.Context) (string, error) { return queueStep(context, input) }) if err != nil { diff --git a/dbos/serialization_test.go b/dbos/serialization_test.go index 4858cf15..c367c89d 100644 --- a/dbos/serialization_test.go +++ b/dbos/serialization_test.go @@ -23,7 +23,7 @@ func encodingStepBuiltinTypes(_ context.Context, input int) (int, error) { } func encodingWorkflowBuiltinTypes(ctx DBOSContext, input string) (string, error) { - stepResult, err := RunAsStep[int](ctx, func(context context.Context) (int, error) { + stepResult, err := RunAsStep(ctx, func(context context.Context) (int, error) { return encodingStepBuiltinTypes(context, 123) }) return fmt.Sprintf("%d", stepResult), fmt.Errorf("workflow error: %v", err) @@ -51,7 +51,7 @@ type SimpleStruct struct { } func encodingWorkflowStruct(ctx DBOSContext, input WorkflowInputStruct) (StepOutputStruct, error) { - return RunAsStep[StepOutputStruct](ctx, func(context context.Context) (StepOutputStruct, error) { + return RunAsStep(ctx, func(context context.Context) (StepOutputStruct, error) { return encodingStepStruct(context, StepInputStruct{ A: input.A, B: fmt.Sprintf("%d", input.B), diff --git a/dbos/workflow.go b/dbos/workflow.go index 2f084a2a..a3eac4a2 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -729,7 +729,7 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams { var typeErasedStepNameToStepName = make(map[string]string) -func RunAsStep[P any, R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) { +func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) { if ctx == nil { return *new(R), newStepExecutionError("", "", "ctx cannot be nil") } diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 0ad62362..45112b5f 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -35,7 +35,7 @@ func simpleWorkflowError(dbosCtx DBOSContext, input string) (int, error) { } func simpleWorkflowWithStep(dbosCtx DBOSContext, input string) (string, error) { - return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return simpleStep(ctx) }) } @@ -49,7 +49,7 @@ func simpleStepError(_ context.Context) (string, error) { } func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) { - return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return simpleStepError(ctx) }) } @@ -301,7 +301,7 @@ func stepWithinAStep(ctx context.Context) (string, error) { } func stepWithinAStepWorkflow(dbosCtx DBOSContext, input string) (string, error) { - return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return stepWithinAStep(ctx) }) } @@ -322,7 +322,7 @@ func stepIdempotencyTest(ctx context.Context) (string, error) { } func stepRetryWorkflow(dbosCtx DBOSContext, input string) (string, error) { - RunAsStep[int](dbosCtx, func(ctx context.Context) (string, error) { + RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return stepIdempotencyTest(ctx) }) stepCtx := WithValue(dbosCtx, StepParamsKey, &StepParams{ @@ -331,7 +331,7 @@ func stepRetryWorkflow(dbosCtx DBOSContext, input string) (string, error) { MaxInterval: 10 * time.Millisecond, }) - return RunAsStep[string](stepCtx, func(ctx context.Context) (string, error) { + return RunAsStep(stepCtx, func(ctx context.Context) (string, error) { return stepRetryAlwaysFailsStep(ctx) }) } @@ -345,7 +345,7 @@ func TestSteps(t *testing.T) { t.Run("StepsMustRunInsideWorkflows", func(t *testing.T) { // Attempt to run a step outside of a workflow context - _, err := RunAsStep[int](dbosCtx, func(ctx context.Context) (string, error) { + _, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return simpleStep(ctx) }) if err == nil { @@ -482,7 +482,7 @@ func TestChildWorkflow(t *testing.T) { return "", fmt.Errorf("expected childWf workflow ID to be %s, got %s", expectedCurrentID, workflowID) } // Steps of a child workflow start with an incremented step ID, because the first step ID is allocated to the child workflow - return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return simpleStep(ctx) }) } @@ -658,7 +658,7 @@ func TestChildWorkflow(t *testing.T) { customChildID := uuid.NewString() simpleChildWf := func(dbosCtx DBOSContext, input string) (string, error) { - return RunAsStep[string](dbosCtx, func(ctx context.Context) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return simpleStep(ctx) }) } @@ -729,7 +729,7 @@ func TestChildWorkflow(t *testing.T) { // Idempotency workflows moved to test functions func idempotencyWorkflow(dbosCtx DBOSContext, input string) (string, error) { - RunAsStep[int64](dbosCtx, func(ctx context.Context) (int64, error) { + RunAsStep(dbosCtx, func(ctx context.Context) (int64, error) { return incrementCounter(ctx, int64(1)) }) return input, nil @@ -745,11 +745,11 @@ func blockingStep(_ context.Context) (string, error) { var idempotencyWorkflowWithStepEvent *Event func idempotencyWorkflowWithStep(dbosCtx DBOSContext, input string) (int64, error) { - RunAsStep[int64](dbosCtx, func(ctx context.Context) (int64, error) { + RunAsStep(dbosCtx, func(ctx context.Context) (int64, error) { return incrementCounter(ctx, int64(1)) }) idempotencyWorkflowWithStepEvent.Set() - RunAsStep[int](dbosCtx, func(ctx context.Context) (string, error) { + RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return blockingStep(ctx) }) return idempotencyCounter, nil @@ -1275,7 +1275,7 @@ func stepThatCallsSend(ctx context.Context, input sendWorkflowInput) (string, er } func workflowThatCallsSendInStep(ctx DBOSContext, input sendWorkflowInput) (string, error) { - return RunAsStep[sendWorkflowInput](ctx, func(context context.Context) (string, error) { + return RunAsStep(ctx, func(context context.Context) (string, error) { return stepThatCallsSend(context, input) }) } @@ -2227,7 +2227,7 @@ func TestWorkflowTimeout(t *testing.T) { } waitForCancelWorkflowWithStep := func(ctx DBOSContext, _ string) (string, error) { - return RunAsStep[sendWorkflowInput](ctx, func(context context.Context) (string, error) { + return RunAsStep(ctx, func(context context.Context) (string, error) { return waitForCancelStep(context) }) } @@ -2266,7 +2266,7 @@ func TestWorkflowTimeout(t *testing.T) { // The timeout will trigger a step error, the workflow can do whatever it wants with that error stepCtx, stepCancelFunc := WithTimeout(ctx, 1*time.Millisecond) defer stepCancelFunc() // Ensure we clean up the context - _, err := RunAsStep[string](stepCtx, func(context context.Context) (string, error) { + _, err := RunAsStep(stepCtx, func(context context.Context) (string, error) { return waitForCancelStep(context) }) if !errors.Is(err, context.DeadlineExceeded) { @@ -2315,7 +2315,7 @@ func TestWorkflowTimeout(t *testing.T) { // This workflow will run a step that is not cancelable. // What this means is the workflow *will* be cancelled, but the step will run normally stepCtx := WithoutCancel(ctx) - res, err := RunAsStep[time.Duration](stepCtx, func(context context.Context) (string, error) { + res, err := RunAsStep(stepCtx, func(context context.Context) (string, error) { return detachedStep(context, timeout*2) }) if err != nil { From a62ee45ead7c1d51987e7e6ee27addb4e9dcfe4b Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 17:47:34 -0700 Subject: [PATCH 06/19] check special steps --- dbos/workflows_test.go | 186 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 178 insertions(+), 8 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 45112b5f..3d49925b 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -382,7 +382,7 @@ func TestSteps(t *testing.T) { t.Fatalf("expected result 'from step', got '%s'", result) } - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) if err != nil { t.Fatal("failed to list steps:", err) } @@ -437,7 +437,7 @@ func TestSteps(t *testing.T) { } // Verify that the failed step was still recorded in the database - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) if err != nil { t.Fatal("failed to get workflow steps:", err) } @@ -694,7 +694,7 @@ func TestChildWorkflow(t *testing.T) { } // Verify the child workflow was recorded as step 0 - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), parentHandle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, parentHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get workflow steps: %v", err) } @@ -1329,6 +1329,40 @@ func TestSendRecv(t *testing.T) { if time.Since(start) > 10*time.Second { t.Fatalf("receive workflow took too long to complete, expected < 5s, got %v", time.Since(start)) } + + // Verify step counting for send workflow (sendWorkflow calls Send 3 times) + sendSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for send workflow: %v", err) + } + if len(sendSteps) != 3 { + t.Fatalf("expected 3 steps in send workflow (3 Send calls), got %d", len(sendSteps)) + } + for i, step := range sendSteps { + if step.StepID != i { + t.Fatalf("expected step %d to have StepID %d, got %d", i, i, step.StepID) + } + if step.StepName != "DBOS.send" { + t.Fatalf("expected step %d to have StepName 'DBOS.send', got '%s'", i, step.StepName) + } + } + + // Verify step counting for receive workflow (receiveWorkflow calls Recv 3 times) + receiveSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for receive workflow: %v", err) + } + if len(receiveSteps) != 3 { + t.Fatalf("expected 3 steps in receive workflow (3 Recv calls), got %d", len(receiveSteps)) + } + for i, step := range receiveSteps { + if step.StepID != i { + t.Fatalf("expected step %d to have StepID %d, got %d", i, i, step.StepID) + } + if step.StepName != "DBOS.recv" { + t.Fatalf("expected step %d to have StepName 'DBOS.recv', got '%s'", i, step.StepName) + } + } }) t.Run("SendRecvCustomStruct", func(t *testing.T) { @@ -1362,6 +1396,36 @@ func TestSendRecv(t *testing.T) { if result.Value != "test-struct-value" { t.Fatalf("expected received struct value to be 'test-struct-value', got '%s'", result.Value) } + + // Verify step counting for sendStructWorkflow (calls Send 1 time) + sendSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, sendHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for send struct workflow: %v", err) + } + if len(sendSteps) != 1 { + t.Fatalf("expected 1 step in send struct workflow (1 Send call), got %d", len(sendSteps)) + } + if sendSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", sendSteps[0].StepID) + } + if sendSteps[0].StepName != "DBOS.send" { + t.Fatalf("expected step to have StepName 'DBOS.send', got '%s'", sendSteps[0].StepName) + } + + // Verify step counting for receiveStructWorkflow (calls Recv 1 time) + receiveSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for receive struct workflow: %v", err) + } + if len(receiveSteps) != 1 { + t.Fatalf("expected 1 step in receive struct workflow (1 Recv call), got %d", len(receiveSteps)) + } + if receiveSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", receiveSteps[0].StepID) + } + if receiveSteps[0].StepName != "DBOS.recv" { + t.Fatalf("expected step to have StepName 'DBOS.recv', got '%s'", receiveSteps[0].StepName) + } }) t.Run("SendToNonExistentUUID", func(t *testing.T) { @@ -1463,6 +1527,23 @@ func TestSendRecv(t *testing.T) { if result != "message1-message2-message3" { t.Fatalf("expected result to be 'message1-message2-message3', got '%s'", result) } + + // Verify step counting for receive workflow (calls Recv 3 times) + receiveSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for receive workflow: %v", err) + } + if len(receiveSteps) != 3 { + t.Fatalf("expected 3 steps in receive workflow (3 Recv calls), got %d", len(receiveSteps)) + } + for i, step := range receiveSteps { + if step.StepID != i { + t.Fatalf("expected step %d to have StepID %d, got %d", i, i, step.StepID) + } + if step.StepName != "DBOS.recv" { + t.Fatalf("expected step %d to have StepName 'DBOS.recv', got '%s'", i, step.StepName) + } + } }) t.Run("SendRecvIdempotency", func(t *testing.T) { // Start the receive workflow and wait for it to be ready @@ -1491,20 +1572,32 @@ func TestSendRecv(t *testing.T) { if len(recoveredHandles) != 2 { t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles)) } - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), sendHandle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, sendHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get workflow steps: %v", err) } if len(steps) != 1 { t.Fatalf("expected 1 step in send idempotency workflow, got %d", len(steps)) } - steps, err = dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), receiveHandle.GetWorkflowID()) + if steps[0].StepID != 0 { + t.Fatalf("expected send idempotency step to have StepID 0, got %d", steps[0].StepID) + } + if steps[0].StepName != "DBOS.send" { + t.Fatalf("expected send idempotency step to have StepName 'DBOS.send', got '%s'", steps[0].StepName) + } + steps, err = dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get steps for receive idempotency workflow: %v", err) } if len(steps) != 1 { t.Fatalf("expected 1 step in receive idempotency workflow, got %d", len(steps)) } + if steps[0].StepID != 0 { + t.Fatalf("expected receive idempotency step to have StepID 0, got %d", steps[0].StepID) + } + if steps[0].StepName != "DBOS.recv" { + t.Fatalf("expected receive idempotency step to have StepName 'DBOS.recv', got '%s'", steps[0].StepName) + } // Unblock the workflows to complete receiveIdempotencyStopEvent.Set() @@ -1805,6 +1898,53 @@ func TestSetGetEvent(t *testing.T) { if result != "two-events-set" { t.Fatalf("expected result to be 'two-events-set', got '%s'", result) } + + // Verify step counting for setTwoEventsWorkflow (calls SetEvent 2 times) + setSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for set two events workflow: %v", err) + } + if len(setSteps) != 2 { + t.Fatalf("expected 2 steps in set two events workflow (2 SetEvent calls), got %d", len(setSteps)) + } + for i, step := range setSteps { + if step.StepID != i { + t.Fatalf("expected step %d to have StepID %d, got %d", i, i, step.StepID) + } + if step.StepName != "DBOS.setEvent" { + t.Fatalf("expected step %d to have StepName 'DBOS.setEvent', got '%s'", i, step.StepName) + } + } + + // Verify step counting for getFirstEventHandle (calls GetEvent 1 time) + getFirstSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, getFirstEventHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for get first event workflow: %v", err) + } + if len(getFirstSteps) != 1 { + t.Fatalf("expected 1 step in get first event workflow (1 GetEvent call), got %d", len(getFirstSteps)) + } + if getFirstSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", getFirstSteps[0].StepID) + } + if getFirstSteps[0].StepName != "DBOS.getEvent" { + t.Fatalf("expected step to have StepName 'DBOS.getEvent', got '%s'", getFirstSteps[0].StepName) + } + + // Verify step counting for getSecondEventHandle (calls GetEvent 1 time) + getSecondSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, getSecondEventHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for get second event workflow: %v", err) + } + if len(getSecondSteps) != 1 { + t.Fatalf("expected 1 step in get second event workflow (1 GetEvent call), got %d", len(getSecondSteps)) + } + if getSecondSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", getSecondSteps[0].StepID) + } + if getSecondSteps[0].StepName != "DBOS.getEvent" { + t.Fatalf("expected step to have StepName 'DBOS.getEvent', got '%s'", getSecondSteps[0].StepName) + } }) t.Run("GetEventFromOutsideWorkflow", func(t *testing.T) { @@ -1835,6 +1975,21 @@ func TestSetGetEvent(t *testing.T) { if message != "test-message" { t.Fatalf("expected received message to be 'test-message', got '%s'", message) } + + // Verify step counting for setEventWorkflow (calls SetEvent 1 time) + setSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for set event workflow: %v", err) + } + if len(setSteps) != 1 { + t.Fatalf("expected 1 step in set event workflow (1 SetEvent call), got %d", len(setSteps)) + } + if setSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", setSteps[0].StepID) + } + if setSteps[0].StepName != "DBOS.setEvent" { + t.Fatalf("expected step to have StepName 'DBOS.setEvent', got '%s'", setSteps[0].StepName) + } }) t.Run("GetEventTimeout", func(t *testing.T) { @@ -1939,21 +2094,33 @@ func TestSetGetEvent(t *testing.T) { setEventStartIdempotencyEvent.Wait() // Verify step counts - setSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), setHandle.GetWorkflowID()) + setSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get steps for set event idempotency workflow: %v", err) } if len(setSteps) != 1 { t.Fatalf("expected 1 step in set event idempotency workflow, got %d", len(setSteps)) } + if setSteps[0].StepID != 0 { + t.Fatalf("expected set event idempotency step to have StepID 0, got %d", setSteps[0].StepID) + } + if setSteps[0].StepName != "DBOS.setEvent" { + t.Fatalf("expected set event idempotency step to have StepName 'DBOS.setEvent', got '%s'", setSteps[0].StepName) + } - getSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), getHandle.GetWorkflowID()) + getSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, getHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get steps for get event idempotency workflow: %v", err) } if len(getSteps) != 1 { t.Fatalf("expected 1 step in get event idempotency workflow, got %d", len(getSteps)) } + if getSteps[0].StepID != 0 { + t.Fatalf("expected get event idempotency step to have StepID 0, got %d", getSteps[0].StepID) + } + if getSteps[0].StepName != "DBOS.getEvent" { + t.Fatalf("expected get event idempotency step to have StepName 'DBOS.getEvent', got '%s'", getSteps[0].StepName) + } // Complete the workflows setEvenStopIdempotencyEvent.Set() @@ -2097,7 +2264,7 @@ func TestSleep(t *testing.T) { } // Verify the sleep step was recorded correctly - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get workflow steps: %v", err) } @@ -2107,6 +2274,9 @@ func TestSleep(t *testing.T) { } step := steps[0] + if step.StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", step.StepID) + } if step.StepName != "DBOS.sleep" { t.Fatalf("expected step name to be 'DBOS.sleep', got '%s'", step.StepName) } From 555c367b8190a123c36735358e8ba996348c3a21 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 17:49:54 -0700 Subject: [PATCH 07/19] remove t.Fatal calls in wf/steps --- dbos/workflows_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 3d49925b..8bf94969 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -142,11 +142,11 @@ func TestWorkflowsRegistration(t *testing.T) { result, err := handle.GetResult() _, err2 := handle.GetResult() if err2 == nil { - t.Fatal("Second call to GetResult should return an error") + return nil, fmt.Errorf("Second call to GetResult should return an error") } expectedErrorMsg := "workflow result channel is already closed. Did you call GetResult() twice on the same workflow handle?" if err2.Error() != expectedErrorMsg { - t.Fatal("Unexpected error message:", err2, "expected:", expectedErrorMsg) + return nil, fmt.Errorf("Unexpected error message: %v, expected: %s", err2, expectedErrorMsg) } return result, err }, @@ -2391,7 +2391,7 @@ func TestWorkflowTimeout(t *testing.T) { // This step will trigger cancellation of the entire workflow context <-ctx.Done() if !errors.Is(ctx.Err(), context.Canceled) && !errors.Is(ctx.Err(), context.DeadlineExceeded) { - t.Fatalf("step was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err()) + return "", fmt.Errorf("step was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err()) } return "", ctx.Err() } From 6d38d3f1e7e17fa5626ed8c8125c66db03838bf7 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 18:11:33 -0700 Subject: [PATCH 08/19] fix race in step name map --- dbos/workflow.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index a3eac4a2..8b5a1065 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -8,6 +8,7 @@ import ( "math" "reflect" "runtime" + "sync" "time" "github.com/google/uuid" @@ -705,7 +706,7 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams { BackoffFactor: 2.0, BaseInterval: 100 * time.Millisecond, // Default base interval MaxInterval: 5 * time.Second, // Default max interval - StepName: typeErasedStepNameToStepName[stepName], + StepName: getTypeErasedStepName(stepName), } } @@ -721,13 +722,22 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams { } if params.StepName == "" { // If the step name is not provided, use the function name - params.StepName = typeErasedStepNameToStepName[stepName] + params.StepName = getTypeErasedStepName(stepName) } return params } -var typeErasedStepNameToStepName = make(map[string]string) +var ( + typeErasedStepNameToStepName = make(map[string]string) + typeErasedStepNameMutex sync.RWMutex +) + +func getTypeErasedStepName(stepName string) string { + typeErasedStepNameMutex.RLock() + defer typeErasedStepNameMutex.RUnlock() + return typeErasedStepNameToStepName[stepName] +} func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) { if ctx == nil { @@ -742,7 +752,10 @@ func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) { // Type-erase the function typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) - typeErasedStepNameToStepName[runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name()] = stepName + typeErasedFnName := runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name() + typeErasedStepNameMutex.Lock() + typeErasedStepNameToStepName[typeErasedFnName] = stepName + typeErasedStepNameMutex.Unlock() // Call the executor method and pass through the result/error result, err := ctx.RunAsStep(ctx, typeErasedFn) From 3ebbf9fb56fdaaa01f8405b616ab5b6441fcd5a5 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 18:11:46 -0700 Subject: [PATCH 09/19] add test for running workflows in goroutines --- dbos/workflows_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 8bf94969..dbe96971 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -44,6 +44,12 @@ func simpleStep(_ context.Context) (string, error) { return "from step", nil } +func concurrentSimpleWorkflow(dbosCtx DBOSContext, input int) (int, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (int, error) { + return input * 2, nil + }) +} + func simpleStepError(_ context.Context) (string, error) { return "", fmt.Errorf("step failure") } @@ -2696,3 +2702,66 @@ func TestWorkflowTimeout(t *testing.T) { } }) } + +func TestConcurrentWorkflows(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + RegisterWorkflow(dbosCtx, concurrentSimpleWorkflow) + + t.Run("SimpleWorkflow", func(t *testing.T) { + const numGoroutines = 100 + var wg sync.WaitGroup + results := make(chan int, numGoroutines) + errors := make(chan error, numGoroutines) + + wg.Add(numGoroutines) + for i := range numGoroutines { + go func(input int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, concurrentSimpleWorkflow, input) + if err != nil { + errors <- fmt.Errorf("failed to start workflow %d: %w", input, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for workflow %d: %w", input, err) + return + } + results <- result + }(i) + } + + wg.Wait() + close(results) + close(errors) + + if len(errors) > 0 { + for err := range errors { + t.Errorf("Workflow error: %v", err) + } + t.Fatalf("Expected no errors from concurrent workflows, got %d errors", len(errors)) + } + + resultCount := 0 + receivedResults := make(map[int]bool) + for result := range results { + resultCount++ + if result < 0 || result >= numGoroutines*2 || result%2 != 0 { + t.Errorf("Unexpected result %d", result) + } else { + receivedResults[result] = true + } + } + + if resultCount != numGoroutines { + t.Fatalf("Expected %d results, got %d", numGoroutines, resultCount) + } + + for i := range numGoroutines { + expectedResult := i * 2 + if !receivedResults[expectedResult] { + t.Errorf("Expected result %d not found", expectedResult) + } + } + }) +} From 63e2c789f8888d438676be5e69762495304a4b83 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 18:32:26 -0700 Subject: [PATCH 10/19] more concurrent tests --- dbos/workflows_test.go | 230 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 229 insertions(+), 1 deletion(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index dbe96971..9bd28d0a 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -2703,12 +2703,62 @@ func TestWorkflowTimeout(t *testing.T) { }) } +func notificationWaiterWorkflow(ctx DBOSContext, pairID int) (string, error) { + result, err := GetEvent[string](ctx, WorkflowGetEventInput{ + TargetWorkflowID: fmt.Sprintf("notification-setter-%d", pairID), + Key: "event-key", + Timeout: 10 * time.Second, + }) + if err != nil { + return "", err + } + return result, nil +} + +func notificationSetterWorkflow(ctx DBOSContext, pairID int) (string, error) { + err := SetEvent(ctx, WorkflowSetEventInputGeneric[string]{ + Key: "event-key", + Message: fmt.Sprintf("notification-message-%d", pairID), + }) + if err != nil { + return "", err + } + return "event-set", nil +} + +func sendRecvReceiverWorkflow(ctx DBOSContext, pairID int) (string, error) { + result, err := Recv[string](ctx, WorkflowRecvInput{ + Topic: "send-recv-topic", + Timeout: 10 * time.Second, + }) + if err != nil { + return "", err + } + return result, nil +} + +func sendRecvSenderWorkflow(ctx DBOSContext, pairID int) (string, error) { + err := Send(ctx, WorkflowSendInput[string]{ + DestinationID: fmt.Sprintf("send-recv-receiver-%d", pairID), + Topic: "send-recv-topic", + Message: fmt.Sprintf("send-recv-message-%d", pairID), + }) + if err != nil { + return "", err + } + return "message-sent", nil +} + func TestConcurrentWorkflows(t *testing.T) { dbosCtx := setupDBOS(t, true, true) RegisterWorkflow(dbosCtx, concurrentSimpleWorkflow) + RegisterWorkflow(dbosCtx, notificationWaiterWorkflow) + RegisterWorkflow(dbosCtx, notificationSetterWorkflow) + RegisterWorkflow(dbosCtx, sendRecvReceiverWorkflow) + RegisterWorkflow(dbosCtx, sendRecvSenderWorkflow) t.Run("SimpleWorkflow", func(t *testing.T) { - const numGoroutines = 100 + const numGoroutines = 500 var wg sync.WaitGroup results := make(chan int, numGoroutines) errors := make(chan error, numGoroutines) @@ -2764,4 +2814,182 @@ func TestConcurrentWorkflows(t *testing.T) { } } }) + + t.Run("NotificationWorkflows", func(t *testing.T) { + const numPairs = 500 + var wg sync.WaitGroup + waiterResults := make(chan string, numPairs) + setterResults := make(chan string, numPairs) + errors := make(chan error, numPairs*2) + + wg.Add(numPairs * 2) + + for i := range numPairs { + go func(pairID int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, notificationSetterWorkflow, pairID, WithWorkflowID(fmt.Sprintf("notification-setter-%d", pairID))) + if err != nil { + errors <- fmt.Errorf("failed to start setter workflow %d: %w", pairID, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for setter workflow %d: %w", pairID, err) + return + } + setterResults <- result + }(i) + + go func(pairID int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, notificationWaiterWorkflow, pairID) + if err != nil { + errors <- fmt.Errorf("failed to start waiter workflow %d: %w", pairID, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for waiter workflow %d: %w", pairID, err) + return + } + expectedMessage := fmt.Sprintf("notification-message-%d", pairID) + if result != expectedMessage { + errors <- fmt.Errorf("waiter workflow %d: expected message '%s', got '%s'", pairID, expectedMessage, result) + return + } + waiterResults <- result + }(i) + } + + wg.Wait() + close(waiterResults) + close(setterResults) + close(errors) + + if len(errors) > 0 { + for err := range errors { + t.Errorf("Workflow error: %v", err) + } + t.Fatalf("Expected no errors from notification workflows, got %d errors", len(errors)) + } + + waiterCount := 0 + receivedWaiterResults := make(map[string]bool) + for result := range waiterResults { + waiterCount++ + receivedWaiterResults[result] = true + } + + setterCount := 0 + for result := range setterResults { + setterCount++ + if result != "event-set" { + t.Errorf("Expected setter result to be 'event-set', got '%s'", result) + } + } + + if waiterCount != numPairs { + t.Fatalf("Expected %d waiter results, got %d", numPairs, waiterCount) + } + + if setterCount != numPairs { + t.Fatalf("Expected %d setter results, got %d", numPairs, setterCount) + } + + for i := range numPairs { + expectedWaiterResult := fmt.Sprintf("notification-message-%d", i) + if !receivedWaiterResults[expectedWaiterResult] { + t.Errorf("Expected waiter result '%s' not found", expectedWaiterResult) + } + } + }) + + t.Run("SendRecvWorkflows", func(t *testing.T) { + const numPairs = 500 + var wg sync.WaitGroup + receiverResults := make(chan string, numPairs) + senderResults := make(chan string, numPairs) + errors := make(chan error, numPairs*2) + + wg.Add(numPairs * 2) + + for i := range numPairs { + go func(pairID int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, sendRecvReceiverWorkflow, pairID, WithWorkflowID(fmt.Sprintf("send-recv-receiver-%d", pairID))) + if err != nil { + errors <- fmt.Errorf("failed to start receiver workflow %d: %w", pairID, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for receiver workflow %d: %w", pairID, err) + return + } + expectedMessage := fmt.Sprintf("send-recv-message-%d", pairID) + if result != expectedMessage { + errors <- fmt.Errorf("receiver workflow %d: expected message '%s', got '%s'", pairID, expectedMessage, result) + return + } + receiverResults <- result + }(i) + + go func(pairID int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, sendRecvSenderWorkflow, pairID) + if err != nil { + errors <- fmt.Errorf("failed to start sender workflow %d: %w", pairID, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for sender workflow %d: %w", pairID, err) + return + } + senderResults <- result + }(i) + } + + wg.Wait() + close(receiverResults) + close(senderResults) + close(errors) + + if len(errors) > 0 { + for err := range errors { + t.Errorf("Workflow error: %v", err) + } + t.Fatalf("Expected no errors from send/recv workflows, got %d errors", len(errors)) + } + + receiverCount := 0 + receivedReceiverResults := make(map[string]bool) + for result := range receiverResults { + receiverCount++ + receivedReceiverResults[result] = true + } + + senderCount := 0 + for result := range senderResults { + senderCount++ + if result != "message-sent" { + t.Errorf("Expected sender result to be 'message-sent', got '%s'", result) + } + } + + if receiverCount != numPairs { + t.Fatalf("Expected %d receiver results, got %d", numPairs, receiverCount) + } + + if senderCount != numPairs { + t.Fatalf("Expected %d sender results, got %d", numPairs, senderCount) + } + + for i := range numPairs { + expectedReceiverResult := fmt.Sprintf("send-recv-message-%d", i) + if !receivedReceiverResults[expectedReceiverResult] { + t.Errorf("Expected receiver result '%s' not found", expectedReceiverResult) + } + } + }) } From f81fff4adb80e21d597a2623822787e5bd9e42e4 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 8 Aug 2025 20:58:01 -0700 Subject: [PATCH 11/19] set version --- dbos/workflows_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 9bd28d0a..e7e05c36 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -2993,3 +2993,33 @@ func TestConcurrentWorkflows(t *testing.T) { } }) } + +func TestWorkflowAtVersion(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + RegisterWorkflow(dbosCtx, simpleWorkflow) + + version := "test-app-version-12345" + handle, err := RunAsWorkflow(dbosCtx, simpleWorkflow, "input", WithApplicationVersion(version)) + if err != nil { + t.Fatalf("failed to start workflow: %v", err) + } + + _, err = handle.GetResult() + if err != nil { + t.Fatalf("failed to get workflow result: %v", err) + } + + retrieved, err := RetrieveWorkflow[string](dbosCtx, handle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to retrieve workflow: %v", err) + } + + status, err := retrieved.GetStatus() + if err != nil { + t.Fatalf("failed to get workflow status: %v", err) + } + if status.ApplicationVersion != version { + t.Fatalf("expected application version %q, got %q", version, status.ApplicationVersion) + } +} From cacb25ca34ef4f49672610d54db9df9238ac964c Mon Sep 17 00:00:00 2001 From: maxdml Date: Sun, 10 Aug 2025 18:13:26 -0700 Subject: [PATCH 12/19] allow custom workflow names --- dbos/workflow.go | 27 +++++++++++++---- dbos/workflows_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 6 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 8b5a1065..a05030b3 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -205,10 +205,11 @@ type WrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption type workflowRegistryEntry struct { wrappedFunction WrappedWorkflowFunc maxRetries int + name string } // Register adds a workflow function to the registry (thread-safe, only once per name) -func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFunc, maxRetries int) { +func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFunc, maxRetries int, customName string) { // Skip if we don't have a concrete dbosContext c, ok := ctx.(*dbosContext) if !ok { @@ -222,14 +223,15 @@ func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFu c.workflowRegMutex.Lock() defer c.workflowRegMutex.Unlock() - if _, exists := c.workflowRegistry[workflowName]; exists { - c.logger.Error("workflow function already registered", "fqn", workflowName) - panic(newConflictingRegistrationError(workflowName)) + if _, exists := c.workflowRegistry[workflowFQN]; exists { + c.logger.Error("workflow function already registered", "fqn", workflowFQN) + panic(newConflictingRegistrationError(workflowFQN)) } - c.workflowRegistry[workflowName] = workflowRegistryEntry{ + c.workflowRegistry[workflowFQN] = workflowRegistryEntry{ wrappedFunction: fn, maxRetries: maxRetries, + name: customName, } } @@ -275,6 +277,7 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn Workflow type workflowRegistrationParams struct { cronSchedule string maxRetries int + name string } type workflowRegistrationOption func(*workflowRegistrationParams) @@ -295,6 +298,12 @@ func WithSchedule(schedule string) workflowRegistrationOption { } } +func WithWorkflowName(name string) workflowRegistrationOption { + return func(p *workflowRegistrationParams) { + p.name = name + } +} + // RegisterWorkflow registers the provided function as a durable workflow with the provided DBOSContext workflow registry // If the workflow is a scheduled workflow (determined by the presence of a cron schedule), it will also register a cron job to execute it // RegisterWorkflow is generically typed, providing compile-time type checking and allowing us to register the workflow input and output types for gob encoding @@ -348,7 +357,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R } return &workflowPollingHandle[any]{workflowID: handle.GetWorkflowID(), dbosContext: ctx}, nil // this is only used by recovery and queue runner so far -- queue runner dismisses it }) - registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries) + registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries, registrationParams.name) // If this is a scheduled workflow, register a cron job if registrationParams.cronSchedule != "" { @@ -398,6 +407,7 @@ func WithApplicationVersion(version string) WorkflowOption { } } +// An internal option we use to map the reflection function name to the registration options. func withWorkflowName(name string) WorkflowOption { return func(p *workflowParams) { p.workflowName = name @@ -486,6 +496,11 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o params.maxRetries = registeredWorkflow.maxRetries } + // Use the custom workflow name if it was provided during registration + if registeredWorkflow.name != "" { + params.workflowName = registeredWorkflow.name + } + // Check if we are within a workflow (and thus a child workflow) parentWorkflowState, ok := c.Value(workflowStateKey).(*workflowState) isChildWorkflow := ok && parentWorkflowState != nil diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index e7e05c36..f7ace674 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -300,6 +300,74 @@ func TestWorkflowsRegistration(t *testing.T) { } }) } + + t.Run("DoubleRegistrationWithoutName", func(t *testing.T) { + // Create a fresh DBOS context for this test + freshCtx := setupDBOS(t, false, false) // Don't check for leaks and don't reset DB + + // First registration should work + RegisterWorkflow(freshCtx, simpleWorkflow) + + // Second registration of the same workflow should panic with ConflictingRegistrationError + defer func() { + r := recover() + if r == nil { + t.Fatal("expected panic from double registration but got none") + } + dbosErr, ok := r.(*DBOSError) + if !ok { + t.Fatalf("expected panic to be *DBOSError, got %T", r) + } + if dbosErr.Code != ConflictingRegistrationError { + t.Fatalf("expected ConflictingRegistrationError, got %v", dbosErr.Code) + } + }() + RegisterWorkflow(freshCtx, simpleWorkflow) + }) + + t.Run("DoubleRegistrationWithCustomName", func(t *testing.T) { + // Create a fresh DBOS context for this test + freshCtx := setupDBOS(t, false, false) // Don't check for leaks and don't reset DB + + // First registration with custom name should work + RegisterWorkflow(freshCtx, simpleWorkflow, WithWorkflowName("custom-workflow")) + + // Second registration with same custom name should panic with ConflictingRegistrationError + defer func() { + r := recover() + if r == nil { + t.Fatal("expected panic from double registration with custom name but got none") + } + dbosErr, ok := r.(*DBOSError) + if !ok { + t.Fatalf("expected panic to be *DBOSError, got %T", r) + } + if dbosErr.Code != ConflictingRegistrationError { + t.Fatalf("expected ConflictingRegistrationError, got %v", dbosErr.Code) + } + }() + RegisterWorkflow(freshCtx, simpleWorkflow, WithWorkflowName("custom-workflow")) + }) + + t.Run("RegisterAfterLaunchPanics", func(t *testing.T) { + // Create a fresh DBOS context for this test + freshCtx := setupDBOS(t, false, false) // Don't check for leaks and don't reset DB + + // Launch DBOS context + err := freshCtx.Launch() + if err != nil { + t.Fatalf("failed to launch DBOS context: %v", err) + } + defer freshCtx.Cancel() + + // Attempting to register after launch should panic + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic from registration after launch but got none") + } + }() + RegisterWorkflow(freshCtx, simpleWorkflow) + }) } func stepWithinAStep(ctx context.Context) (string, error) { From 83b81487f441cdea94966989a5f478717b664780 Mon Sep 17 00:00:00 2001 From: maxdml Date: Sun, 10 Aug 2025 18:26:04 -0700 Subject: [PATCH 13/19] test conflicting execution of workflows / tasks of the same ID --- dbos/queues_test.go | 63 +++++++++++++++-- dbos/workflows_test.go | 149 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 205 insertions(+), 7 deletions(-) diff --git a/dbos/queues_test.go b/dbos/queues_test.go index eda6dd57..eaee7a2c 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "strings" "sync/atomic" "testing" "time" @@ -23,6 +24,7 @@ This suite tests [x] worker concurrency (2 at a time across two "workers") [x] worker concurrency X recovery [x] rate limiter +[x] conflicting workflow on different queues [] queue deduplication [] queue priority [x] queued workflow times out @@ -48,6 +50,8 @@ func TestWorkflowQueues(t *testing.T) { queue := NewWorkflowQueue(dbosCtx, "test-queue") dlqEnqueueQueue := NewWorkflowQueue(dbosCtx, "test-successive-enqueue-queue") + conflictQueue1 := NewWorkflowQueue(dbosCtx, "conflict-queue-1") + conflictQueue2 := NewWorkflowQueue(dbosCtx, "conflict-queue-2") dlqStartEvent := NewEvent() dlqCompleteEvent := NewEvent() @@ -172,14 +176,15 @@ func TestWorkflowQueues(t *testing.T) { } }) - /* TODO: we will move queue registry in the new interface in a subsequent PR t.Run("DynamicRegistration", func(t *testing.T) { - q := NewWorkflowQueue("dynamic-queue") - if len(q.name) > 0 { - t.Fatalf("expected nil queue for dynamic registration after DBOS initialization, got %v", q) - } + // Attempting to register a queue after launch should panic + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic from queue registration after launch but got none") + } + }() + NewWorkflowQueue(dbosCtx, "dynamic-queue") }) - */ t.Run("QueueWorkflowDLQ", func(t *testing.T) { workflowID := "blocking-workflow-test" @@ -255,6 +260,52 @@ func TestWorkflowQueues(t *testing.T) { t.Fatal("expected queue entries to be cleaned up after successive enqueues test") } }) + + t.Run("ConflictingWorkflowOnDifferentQueues", func(t *testing.T) { + workflowID := "conflicting-workflow-id" + + // Enqueue the same workflow ID on the first queue + handle1, err := RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-1", WithQueue(conflictQueue1.Name), WithWorkflowID(workflowID)) + if err != nil { + t.Fatalf("failed to enqueue workflow on first queue: %v", err) + } + + // Get the result from the first workflow to ensure it completes + result1, err := handle1.GetResult() + if err != nil { + t.Fatalf("failed to get result from first workflow: %v", err) + } + if result1 != "test-input-1" { + t.Fatalf("expected 'test-input-1', got %v", result1) + } + + // Now try to enqueue the same workflow ID on a different queue + // This should trigger a ConflictingWorkflowError + _, err = RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-2", WithQueue(conflictQueue2.Name), WithWorkflowID(workflowID)) + if err == nil { + t.Fatal("expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none") + } + + // Check that it's the correct error type + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != ConflictingWorkflowError { + t.Fatalf("expected error code to be ConflictingWorkflowError, got %v", dbosErr.Code) + } + + // Check that the error message contains queue information + expectedMsgPart := "different queue" + if !strings.Contains(err.Error(), expectedMsgPart) { + t.Fatalf("expected error message to contain '%s', got '%s'", expectedMsgPart, err.Error()) + } + + if !queueEntriesAreCleanedUp(dbosCtx) { + t.Fatal("expected queue entries to be cleaned up after conflicting workflow test") + } + }) } func TestQueueRecovery(t *testing.T) { diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index f7ace674..e7245a77 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -5,7 +5,7 @@ Test workflow and steps features [x] Wrapping various golang methods in DBOS workflows [x] workflow idempotency [x] workflow DLQ -[] workflow conflicting name +[x] workflow conflicting name [] workflow timeouts & deadlines (including child workflows) */ @@ -1902,6 +1902,153 @@ func getEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) ( return result, nil } +// Test workflows and steps for parameter mismatch validation +func conflictWorkflowA(dbosCtx DBOSContext, input string) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return conflictStepA(ctx) + }) +} + +func conflictWorkflowB(dbosCtx DBOSContext, input string) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return conflictStepB(ctx) + }) +} + +func conflictStepA(_ context.Context) (string, error) { + return "step-a-result", nil +} + +func conflictStepB(_ context.Context) (string, error) { + return "step-b-result", nil +} + +func workflowWithMultipleSteps(dbosCtx DBOSContext, input string) (string, error) { + // First step + result1, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return conflictStepA(ctx) + }) + if err != nil { + return "", err + } + + // Second step - this is where we'll test step name conflicts + result2, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return conflictStepB(ctx) + }) + if err != nil { + return "", err + } + + return result1 + "-" + result2, nil +} + +func TestWorkflowExecutionMismatch(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + // Register workflows for testing + RegisterWorkflow(dbosCtx, conflictWorkflowA) + RegisterWorkflow(dbosCtx, conflictWorkflowB) + RegisterWorkflow(dbosCtx, workflowWithMultipleSteps) + + t.Run("WorkflowNameConflict", func(t *testing.T) { + workflowID := uuid.NewString() + + // First, run conflictWorkflowA with a specific workflow ID + handle1, err := RunAsWorkflow(dbosCtx, conflictWorkflowA, "test-input", WithWorkflowID(workflowID)) + if err != nil { + t.Fatalf("failed to start first workflow: %v", err) + } + + // Get the result to ensure it completes + result1, err := handle1.GetResult() + if err != nil { + t.Fatalf("failed to get result from first workflow: %v", err) + } + if result1 != "step-a-result" { + t.Fatalf("expected 'step-a-result', got '%s'", result1) + } + + // Now try to run conflictWorkflowB with the same workflow ID + // This should return a ConflictingWorkflowError + _, err = RunAsWorkflow(dbosCtx, conflictWorkflowB, "test-input", WithWorkflowID(workflowID)) + if err == nil { + t.Fatal("expected ConflictingWorkflowError when running different workflow with same ID, but got none") + } + + // Check that it's the correct error type + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != ConflictingWorkflowError { + t.Fatalf("expected error code to be ConflictingWorkflowError, got %v", dbosErr.Code) + } + + // Check that the error message contains the workflow names + expectedMsgPart := "Workflow already exists with a different name" + if !strings.Contains(err.Error(), expectedMsgPart) { + t.Fatalf("expected error message to contain '%s', got '%s'", expectedMsgPart, err.Error()) + } + }) + + t.Run("StepNameConflict", func(t *testing.T) { + // This test simulates a scenario where a workflow is recovered but + // the step implementation has changed, causing a step name mismatch + + // First, start a workflow and let it complete partially + handle1, err := RunAsWorkflow(dbosCtx, workflowWithMultipleSteps, "test-input") + if err != nil { + t.Fatalf("failed to start workflow: %v", err) + } + + // Complete the workflow + result, err := handle1.GetResult() + if err != nil { + t.Fatalf("failed to get result from workflow: %v", err) + } + if result != "step-a-result-step-b-result" { + t.Fatalf("expected 'step-a-result-step-b-result', got '%s'", result) + } + + // Now simulate what happens if we try to check operation execution + // with a different step name for the same step ID + workflowID := handle1.GetWorkflowID() + + // This directly tests the CheckOperationExecution method with mismatched step name + // We'll check step ID 0 (first step) but with wrong step name + wrongStepName := "wrong-step-name" + _, err = dbosCtx.(*dbosContext).systemDB.CheckOperationExecution(dbosCtx, checkOperationExecutionDBInput{ + workflowID: workflowID, + stepID: 0, + stepName: wrongStepName, + }) + + if err == nil { + t.Fatal("expected UnexpectedStep error when checking operation with wrong step name, but got none") + } + + // Check that it's the correct error type + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != UnexpectedStep { + t.Fatalf("expected error code to be UnexpectedStep, got %v", dbosErr.Code) + } + + // Check that the error message contains step information + if !strings.Contains(err.Error(), "Check that your workflow is deterministic") { + t.Fatalf("expected error message to contain 'Check that your workflow is deterministic', got '%s'", err.Error()) + } + if !strings.Contains(err.Error(), wrongStepName) { + t.Fatalf("expected error message to contain wrong step name '%s', got '%s'", wrongStepName, err.Error()) + } + }) +} + func TestSetGetEvent(t *testing.T) { dbosCtx := setupDBOS(t, true, true) From 071eac68c9be7a21137ecfc91ea6f724d026ef6d Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 11 Aug 2025 09:33:12 -0700 Subject: [PATCH 14/19] nit --- dbos/workflow.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index a05030b3..9a94acf4 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -495,9 +495,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o if registeredWorkflow.maxRetries > 0 { params.maxRetries = registeredWorkflow.maxRetries } - - // Use the custom workflow name if it was provided during registration - if registeredWorkflow.name != "" { + if len(registeredWorkflow.name) > 0 { params.workflowName = registeredWorkflow.name } From 1071108089acef16e5e0327536bea85a30d5807f Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 11 Aug 2025 09:33:39 -0700 Subject: [PATCH 15/19] test that running a recorded child workflow, from a parent, returns a polling handle --- dbos/workflows_test.go | 100 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index e7245a77..de69ef9b 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -798,6 +798,106 @@ func TestChildWorkflow(t *testing.T) { t.Fatalf("expected second step child workflow ID to be %s, got %s", customChildID, steps[1].ChildWorkflowID) } }) + + t.Run("RecoveredChildWorkflowPollingHandle", func(t *testing.T) { + pollingHandleStartEvent := NewEvent() + pollingHandleCompleteEvent := NewEvent() + knownChildID := "known-child-workflow-id" + knownParentID := "known-parent-workflow-id" + counter := 0 + + // Simple child workflow that returns a result + pollingHandleChildWf := func(dbosCtx DBOSContext, input string) (string, error) { + // Signal the child workflow is started + pollingHandleStartEvent.Set() + // Wait + pollingHandleCompleteEvent.Wait() + return input + "-result", nil + } + RegisterWorkflow(dbosCtx, pollingHandleChildWf) + + pollingHandleParentWf := func(ctx DBOSContext, input string) (string, error) { + counter++ + + // Run child workflow with a known ID + childHandle, err := RunAsWorkflow(ctx, pollingHandleChildWf, "child-input", WithWorkflowID(knownChildID)) + if err != nil { + return "", fmt.Errorf("failed to run child workflow: %w", err) + } + + switch counter { + case 1: + // First handle will be a direct handle + _, ok := childHandle.(*workflowHandle[string]) + if !ok { + return "", fmt.Errorf("expected child handle to be of type workflowDirectHandle, got %T", childHandle) + } + case 2: + // Second handle will be a polling handle + _, ok := childHandle.(*workflowPollingHandle[string]) + if !ok { + return "", fmt.Errorf("expected recovered child handle to be of type workflowPollingHandle, got %T", childHandle) + } + } + + result, err := childHandle.GetResult() + if err != nil { + return "", fmt.Errorf("failed to get result from child workflow: %w", err) + } + return result, nil + } + RegisterWorkflow(dbosCtx, pollingHandleParentWf) + + // Execute parent workflow - it will block after starting the child + parentHandle, err := RunAsWorkflow(dbosCtx, pollingHandleParentWf, "parent-input", WithWorkflowID(knownParentID)) + if err != nil { + t.Fatalf("failed to start parent workflow: %v", err) + } + + // Wait for the child workflow to start + pollingHandleStartEvent.Wait() + + // Recover pending workflows - this should give us both parent and child handles + recoveredHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"}) + if err != nil { + t.Fatalf("failed to recover pending workflows: %v", err) + } + + // Should have recovered both parent and child workflows + if len(recoveredHandles) != 2 { + t.Fatalf("expected 2 recovered handles (parent and child), got %d", len(recoveredHandles)) + } + + // Find the child handle and verify it's a polling handle with the correct ID + var childRecoveredHandle WorkflowHandle[any] + for _, handle := range recoveredHandles { + if handle.GetWorkflowID() == knownChildID { + childRecoveredHandle = handle + break + } + } + + if childRecoveredHandle == nil { + t.Fatalf("failed to find recovered child workflow handle with ID %s", knownChildID) + } + + // Complete both workflows + pollingHandleCompleteEvent.Set() + result, err := parentHandle.GetResult() + if err != nil { + t.Fatalf("failed to get result from original parent workflow: %v", err) + } + if result != "child-input-result" { + t.Fatalf("expected result 'child-input-result', got '%s'", result) + } + childResult, err := childRecoveredHandle.GetResult() + if err != nil { + t.Fatalf("failed to get result from recovered child handle: %v", err) + } + if childResult != result { + t.Fatalf("expected child result '%s', got '%s'", result, childResult) + } + }) } // Idempotency workflows moved to test functions From a9a83a104b47a55bcd55d6d7a0ad7f88a17e44f0 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 11 Aug 2025 10:14:59 -0700 Subject: [PATCH 16/19] nits --- dbos/workflows_test.go | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index de69ef9b..c412943e 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -2055,18 +2055,18 @@ func TestWorkflowExecutionMismatch(t *testing.T) { workflowID := uuid.NewString() // First, run conflictWorkflowA with a specific workflow ID - handle1, err := RunAsWorkflow(dbosCtx, conflictWorkflowA, "test-input", WithWorkflowID(workflowID)) + handle, err := RunAsWorkflow(dbosCtx, conflictWorkflowA, "test-input", WithWorkflowID(workflowID)) if err != nil { t.Fatalf("failed to start first workflow: %v", err) } // Get the result to ensure it completes - result1, err := handle1.GetResult() + result, err := handle.GetResult() if err != nil { t.Fatalf("failed to get result from first workflow: %v", err) } - if result1 != "step-a-result" { - t.Fatalf("expected 'step-a-result', got '%s'", result1) + if result != "step-a-result" { + t.Fatalf("expected 'step-a-result', got '%s'", result) } // Now try to run conflictWorkflowB with the same workflow ID @@ -2094,17 +2094,11 @@ func TestWorkflowExecutionMismatch(t *testing.T) { }) t.Run("StepNameConflict", func(t *testing.T) { - // This test simulates a scenario where a workflow is recovered but - // the step implementation has changed, causing a step name mismatch - - // First, start a workflow and let it complete partially - handle1, err := RunAsWorkflow(dbosCtx, workflowWithMultipleSteps, "test-input") + handle, err := RunAsWorkflow(dbosCtx, workflowWithMultipleSteps, "test-input") if err != nil { t.Fatalf("failed to start workflow: %v", err) } - - // Complete the workflow - result, err := handle1.GetResult() + result, err := handle.GetResult() if err != nil { t.Fatalf("failed to get result from workflow: %v", err) } @@ -2112,12 +2106,10 @@ func TestWorkflowExecutionMismatch(t *testing.T) { t.Fatalf("expected 'step-a-result-step-b-result', got '%s'", result) } - // Now simulate what happens if we try to check operation execution - // with a different step name for the same step ID - workflowID := handle1.GetWorkflowID() + // Check operation execution with a different step name for the same step ID + workflowID := handle.GetWorkflowID() // This directly tests the CheckOperationExecution method with mismatched step name - // We'll check step ID 0 (first step) but with wrong step name wrongStepName := "wrong-step-name" _, err = dbosCtx.(*dbosContext).systemDB.CheckOperationExecution(dbosCtx, checkOperationExecutionDBInput{ workflowID: workflowID, From e9d5c4f53515c87ae1fc318ca89c38d716d3d1e0 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 11 Aug 2025 10:34:21 -0700 Subject: [PATCH 17/19] fix test + nits --- dbos/queues_test.go | 8 ++++---- dbos/workflows_test.go | 31 +++++++++++++++---------------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/dbos/queues_test.go b/dbos/queues_test.go index eaee7a2c..16f386a3 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -265,18 +265,18 @@ func TestWorkflowQueues(t *testing.T) { workflowID := "conflicting-workflow-id" // Enqueue the same workflow ID on the first queue - handle1, err := RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-1", WithQueue(conflictQueue1.Name), WithWorkflowID(workflowID)) + handle, err := RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-1", WithQueue(conflictQueue1.Name), WithWorkflowID(workflowID)) if err != nil { t.Fatalf("failed to enqueue workflow on first queue: %v", err) } // Get the result from the first workflow to ensure it completes - result1, err := handle1.GetResult() + result, err := handle.GetResult() if err != nil { t.Fatalf("failed to get result from first workflow: %v", err) } - if result1 != "test-input-1" { - t.Fatalf("expected 'test-input-1', got %v", result1) + if result != "test-input-1" { + t.Fatalf("expected 'test-input-1', got %v", result) } // Now try to enqueue the same workflow ID on a different queue diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index c412943e..ccc103ec 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -44,12 +44,6 @@ func simpleStep(_ context.Context) (string, error) { return "from step", nil } -func concurrentSimpleWorkflow(dbosCtx DBOSContext, input int) (int, error) { - return RunAsStep(dbosCtx, func(ctx context.Context) (int, error) { - return input * 2, nil - }) -} - func simpleStepError(_ context.Context) (string, error) { return "", fmt.Errorf("step failure") } @@ -808,8 +802,6 @@ func TestChildWorkflow(t *testing.T) { // Simple child workflow that returns a result pollingHandleChildWf := func(dbosCtx DBOSContext, input string) (string, error) { - // Signal the child workflow is started - pollingHandleStartEvent.Set() // Wait pollingHandleCompleteEvent.Wait() return input + "-result", nil @@ -840,6 +832,9 @@ func TestChildWorkflow(t *testing.T) { } } + // Signal the child workflow is started + pollingHandleStartEvent.Set() + result, err := childHandle.GetResult() if err != nil { return "", fmt.Errorf("failed to get result from child workflow: %w", err) @@ -854,7 +849,7 @@ func TestChildWorkflow(t *testing.T) { t.Fatalf("failed to start parent workflow: %v", err) } - // Wait for the child workflow to start + // Wait for the workflows to start pollingHandleStartEvent.Wait() // Recover pending workflows - this should give us both parent and child handles @@ -3056,6 +3051,12 @@ func sendRecvSenderWorkflow(ctx DBOSContext, pairID int) (string, error) { return "message-sent", nil } +func concurrentSimpleWorkflow(dbosCtx DBOSContext, input int) (int, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (int, error) { + return input * 2, nil + }) +} + func TestConcurrentWorkflows(t *testing.T) { dbosCtx := setupDBOS(t, true, true) RegisterWorkflow(dbosCtx, concurrentSimpleWorkflow) @@ -3084,6 +3085,11 @@ func TestConcurrentWorkflows(t *testing.T) { errors <- fmt.Errorf("failed to get result for workflow %d: %w", input, err) return } + expectedResult := input * 2 + if result != expectedResult { + errors <- fmt.Errorf("workflow %d: expected result %d, got %d", input, expectedResult, result) + return + } results <- result }(i) } @@ -3113,13 +3119,6 @@ func TestConcurrentWorkflows(t *testing.T) { if resultCount != numGoroutines { t.Fatalf("Expected %d results, got %d", numGoroutines, resultCount) } - - for i := range numGoroutines { - expectedResult := i * 2 - if !receivedResults[expectedResult] { - t.Errorf("Expected result %d not found", expectedResult) - } - } }) t.Run("NotificationWorkflows", func(t *testing.T) { From 04390cf32362f01385a29ad6e0e9c5d651149e4a Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 11 Aug 2025 10:34:26 -0700 Subject: [PATCH 18/19] use a sync.Map --- dbos/workflow.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 9a94acf4..5294582b 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -719,7 +719,12 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams { BackoffFactor: 2.0, BaseInterval: 100 * time.Millisecond, // Default base interval MaxInterval: 5 * time.Second, // Default max interval - StepName: getTypeErasedStepName(stepName), + StepName: func() string { + if value, ok := typeErasedStepNameToStepName.Load(stepName); ok { + return value.(string) + } + return "" // This should never happen + }(), } } @@ -733,24 +738,17 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams { if params.MaxInterval == 0 { params.MaxInterval = 5 * time.Second // Default max interval } - if params.StepName == "" { + if len(params.StepName) == 0 { // If the step name is not provided, use the function name - params.StepName = getTypeErasedStepName(stepName) + if value, ok := typeErasedStepNameToStepName.Load(stepName); ok { + params.StepName = value.(string) + } } return params } -var ( - typeErasedStepNameToStepName = make(map[string]string) - typeErasedStepNameMutex sync.RWMutex -) - -func getTypeErasedStepName(stepName string) string { - typeErasedStepNameMutex.RLock() - defer typeErasedStepNameMutex.RUnlock() - return typeErasedStepNameToStepName[stepName] -} +var typeErasedStepNameToStepName sync.Map func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) { if ctx == nil { @@ -766,9 +764,7 @@ func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) { // Type-erase the function typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) typeErasedFnName := runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name() - typeErasedStepNameMutex.Lock() - typeErasedStepNameToStepName[typeErasedFnName] = stepName - typeErasedStepNameMutex.Unlock() + typeErasedStepNameToStepName.LoadOrStore(typeErasedFnName, stepName) // Call the executor method and pass through the result/error result, err := ctx.RunAsStep(ctx, typeErasedFn) From 2960ccea434387dbfdf27a2fc5548195c82cc8d7 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 11 Aug 2025 10:44:55 -0700 Subject: [PATCH 19/19] for now return a flat error if we try to enqueue a workflow in a different queu --- dbos/queues_test.go | 2 +- dbos/system_database.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 16f386a3..37ad6304 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -297,7 +297,7 @@ func TestWorkflowQueues(t *testing.T) { } // Check that the error message contains queue information - expectedMsgPart := "different queue" + expectedMsgPart := "Workflow already exists in a different queue" if !strings.Contains(err.Error(), expectedMsgPart) { t.Fatalf("expected error message to contain '%s', got '%s'", expectedMsgPart, err.Error()) } diff --git a/dbos/system_database.go b/dbos/system_database.go index e873ef94..a71cd658 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -380,7 +380,7 @@ func (s *systemDatabase) InsertWorkflowStatus(ctx context.Context, input insertW return nil, newConflictingWorkflowError(input.status.ID, fmt.Sprintf("Workflow already exists with a different name: %s, but the provided name is: %s", result.name, input.status.Name)) } if len(input.status.QueueName) > 0 && result.queueName != nil && input.status.QueueName != *result.queueName { - s.logger.Warn("Queue name conflict for workflow", "workflow_id", input.status.ID, "result_queue", *result.queueName, "status_queue", input.status.QueueName) + return nil, newConflictingWorkflowError(input.status.ID, fmt.Sprintf("Workflow already exists in a different queue: %s, but the provided queue is: %s", *result.queueName, input.status.QueueName)) } // Every time we start executing a workflow (and thus attempt to insert its status), we increment `recovery_attempts` by 1.