diff --git a/dbos/queues_test.go b/dbos/queues_test.go index eda6dd57..37ad6304 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "strings" "sync/atomic" "testing" "time" @@ -23,6 +24,7 @@ This suite tests [x] worker concurrency (2 at a time across two "workers") [x] worker concurrency X recovery [x] rate limiter +[x] conflicting workflow on different queues [] queue deduplication [] queue priority [x] queued workflow times out @@ -48,6 +50,8 @@ func TestWorkflowQueues(t *testing.T) { queue := NewWorkflowQueue(dbosCtx, "test-queue") dlqEnqueueQueue := NewWorkflowQueue(dbosCtx, "test-successive-enqueue-queue") + conflictQueue1 := NewWorkflowQueue(dbosCtx, "conflict-queue-1") + conflictQueue2 := NewWorkflowQueue(dbosCtx, "conflict-queue-2") dlqStartEvent := NewEvent() dlqCompleteEvent := NewEvent() @@ -172,14 +176,15 @@ func TestWorkflowQueues(t *testing.T) { } }) - /* TODO: we will move queue registry in the new interface in a subsequent PR t.Run("DynamicRegistration", func(t *testing.T) { - q := NewWorkflowQueue("dynamic-queue") - if len(q.name) > 0 { - t.Fatalf("expected nil queue for dynamic registration after DBOS initialization, got %v", q) - } + // Attempting to register a queue after launch should panic + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic from queue registration after launch but got none") + } + }() + NewWorkflowQueue(dbosCtx, "dynamic-queue") }) - */ t.Run("QueueWorkflowDLQ", func(t *testing.T) { workflowID := "blocking-workflow-test" @@ -255,6 +260,52 @@ func TestWorkflowQueues(t *testing.T) { t.Fatal("expected queue entries to be cleaned up after successive enqueues test") } }) + + t.Run("ConflictingWorkflowOnDifferentQueues", func(t *testing.T) { + workflowID := "conflicting-workflow-id" + + // Enqueue the same workflow ID on the first queue + handle, err := RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-1", WithQueue(conflictQueue1.Name), WithWorkflowID(workflowID)) + if err != nil { + t.Fatalf("failed to enqueue workflow on first queue: %v", err) + } + + // Get the result from the first workflow to ensure it completes + result, err := handle.GetResult() + if err != nil { + t.Fatalf("failed to get result from first workflow: %v", err) + } + if result != "test-input-1" { + t.Fatalf("expected 'test-input-1', got %v", result) + } + + // Now try to enqueue the same workflow ID on a different queue + // This should trigger a ConflictingWorkflowError + _, err = RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-2", WithQueue(conflictQueue2.Name), WithWorkflowID(workflowID)) + if err == nil { + t.Fatal("expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none") + } + + // Check that it's the correct error type + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != ConflictingWorkflowError { + t.Fatalf("expected error code to be ConflictingWorkflowError, got %v", dbosErr.Code) + } + + // Check that the error message contains queue information + expectedMsgPart := "Workflow already exists in a different queue" + if !strings.Contains(err.Error(), expectedMsgPart) { + t.Fatalf("expected error message to contain '%s', got '%s'", expectedMsgPart, err.Error()) + } + + if !queueEntriesAreCleanedUp(dbosCtx) { + t.Fatal("expected queue entries to be cleaned up after conflicting workflow test") + } + }) } func TestQueueRecovery(t *testing.T) { diff --git a/dbos/system_database.go b/dbos/system_database.go index e873ef94..a71cd658 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -380,7 +380,7 @@ func (s *systemDatabase) InsertWorkflowStatus(ctx context.Context, input insertW return nil, newConflictingWorkflowError(input.status.ID, fmt.Sprintf("Workflow already exists with a different name: %s, but the provided name is: %s", result.name, input.status.Name)) } if len(input.status.QueueName) > 0 && result.queueName != nil && input.status.QueueName != *result.queueName { - s.logger.Warn("Queue name conflict for workflow", "workflow_id", input.status.ID, "result_queue", *result.queueName, "status_queue", input.status.QueueName) + return nil, newConflictingWorkflowError(input.status.ID, fmt.Sprintf("Workflow already exists in a different queue: %s, but the provided queue is: %s", *result.queueName, input.status.QueueName)) } // Every time we start executing a workflow (and thus attempt to insert its status), we increment `recovery_attempts` by 1. diff --git a/dbos/workflow.go b/dbos/workflow.go index a3eac4a2..5294582b 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -8,6 +8,7 @@ import ( "math" "reflect" "runtime" + "sync" "time" "github.com/google/uuid" @@ -204,10 +205,11 @@ type WrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption type workflowRegistryEntry struct { wrappedFunction WrappedWorkflowFunc maxRetries int + name string } // Register adds a workflow function to the registry (thread-safe, only once per name) -func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFunc, maxRetries int) { +func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFunc, maxRetries int, customName string) { // Skip if we don't have a concrete dbosContext c, ok := ctx.(*dbosContext) if !ok { @@ -221,14 +223,15 @@ func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFu c.workflowRegMutex.Lock() defer c.workflowRegMutex.Unlock() - if _, exists := c.workflowRegistry[workflowName]; exists { - c.logger.Error("workflow function already registered", "fqn", workflowName) - panic(newConflictingRegistrationError(workflowName)) + if _, exists := c.workflowRegistry[workflowFQN]; exists { + c.logger.Error("workflow function already registered", "fqn", workflowFQN) + panic(newConflictingRegistrationError(workflowFQN)) } - c.workflowRegistry[workflowName] = workflowRegistryEntry{ + c.workflowRegistry[workflowFQN] = workflowRegistryEntry{ wrappedFunction: fn, maxRetries: maxRetries, + name: customName, } } @@ -274,6 +277,7 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn Workflow type workflowRegistrationParams struct { cronSchedule string maxRetries int + name string } type workflowRegistrationOption func(*workflowRegistrationParams) @@ -294,6 +298,12 @@ func WithSchedule(schedule string) workflowRegistrationOption { } } +func WithWorkflowName(name string) workflowRegistrationOption { + return func(p *workflowRegistrationParams) { + p.name = name + } +} + // RegisterWorkflow registers the provided function as a durable workflow with the provided DBOSContext workflow registry // If the workflow is a scheduled workflow (determined by the presence of a cron schedule), it will also register a cron job to execute it // RegisterWorkflow is generically typed, providing compile-time type checking and allowing us to register the workflow input and output types for gob encoding @@ -347,7 +357,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R } return &workflowPollingHandle[any]{workflowID: handle.GetWorkflowID(), dbosContext: ctx}, nil // this is only used by recovery and queue runner so far -- queue runner dismisses it }) - registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries) + registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries, registrationParams.name) // If this is a scheduled workflow, register a cron job if registrationParams.cronSchedule != "" { @@ -397,6 +407,7 @@ func WithApplicationVersion(version string) WorkflowOption { } } +// An internal option we use to map the reflection function name to the registration options. func withWorkflowName(name string) WorkflowOption { return func(p *workflowParams) { p.workflowName = name @@ -484,6 +495,9 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o if registeredWorkflow.maxRetries > 0 { params.maxRetries = registeredWorkflow.maxRetries } + if len(registeredWorkflow.name) > 0 { + params.workflowName = registeredWorkflow.name + } // Check if we are within a workflow (and thus a child workflow) parentWorkflowState, ok := c.Value(workflowStateKey).(*workflowState) @@ -705,7 +719,12 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams { BackoffFactor: 2.0, BaseInterval: 100 * time.Millisecond, // Default base interval MaxInterval: 5 * time.Second, // Default max interval - StepName: typeErasedStepNameToStepName[stepName], + StepName: func() string { + if value, ok := typeErasedStepNameToStepName.Load(stepName); ok { + return value.(string) + } + return "" // This should never happen + }(), } } @@ -719,15 +738,17 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams { if params.MaxInterval == 0 { params.MaxInterval = 5 * time.Second // Default max interval } - if params.StepName == "" { + if len(params.StepName) == 0 { // If the step name is not provided, use the function name - params.StepName = typeErasedStepNameToStepName[stepName] + if value, ok := typeErasedStepNameToStepName.Load(stepName); ok { + params.StepName = value.(string) + } } return params } -var typeErasedStepNameToStepName = make(map[string]string) +var typeErasedStepNameToStepName sync.Map func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) { if ctx == nil { @@ -742,7 +763,8 @@ func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) { // Type-erase the function typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) - typeErasedStepNameToStepName[runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name()] = stepName + typeErasedFnName := runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name() + typeErasedStepNameToStepName.LoadOrStore(typeErasedFnName, stepName) // Call the executor method and pass through the result/error result, err := ctx.RunAsStep(ctx, typeErasedFn) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 45112b5f..ccc103ec 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -5,7 +5,7 @@ Test workflow and steps features [x] Wrapping various golang methods in DBOS workflows [x] workflow idempotency [x] workflow DLQ -[] workflow conflicting name +[x] workflow conflicting name [] workflow timeouts & deadlines (including child workflows) */ @@ -142,11 +142,11 @@ func TestWorkflowsRegistration(t *testing.T) { result, err := handle.GetResult() _, err2 := handle.GetResult() if err2 == nil { - t.Fatal("Second call to GetResult should return an error") + return nil, fmt.Errorf("Second call to GetResult should return an error") } expectedErrorMsg := "workflow result channel is already closed. Did you call GetResult() twice on the same workflow handle?" if err2.Error() != expectedErrorMsg { - t.Fatal("Unexpected error message:", err2, "expected:", expectedErrorMsg) + return nil, fmt.Errorf("Unexpected error message: %v, expected: %s", err2, expectedErrorMsg) } return result, err }, @@ -294,6 +294,74 @@ func TestWorkflowsRegistration(t *testing.T) { } }) } + + t.Run("DoubleRegistrationWithoutName", func(t *testing.T) { + // Create a fresh DBOS context for this test + freshCtx := setupDBOS(t, false, false) // Don't check for leaks and don't reset DB + + // First registration should work + RegisterWorkflow(freshCtx, simpleWorkflow) + + // Second registration of the same workflow should panic with ConflictingRegistrationError + defer func() { + r := recover() + if r == nil { + t.Fatal("expected panic from double registration but got none") + } + dbosErr, ok := r.(*DBOSError) + if !ok { + t.Fatalf("expected panic to be *DBOSError, got %T", r) + } + if dbosErr.Code != ConflictingRegistrationError { + t.Fatalf("expected ConflictingRegistrationError, got %v", dbosErr.Code) + } + }() + RegisterWorkflow(freshCtx, simpleWorkflow) + }) + + t.Run("DoubleRegistrationWithCustomName", func(t *testing.T) { + // Create a fresh DBOS context for this test + freshCtx := setupDBOS(t, false, false) // Don't check for leaks and don't reset DB + + // First registration with custom name should work + RegisterWorkflow(freshCtx, simpleWorkflow, WithWorkflowName("custom-workflow")) + + // Second registration with same custom name should panic with ConflictingRegistrationError + defer func() { + r := recover() + if r == nil { + t.Fatal("expected panic from double registration with custom name but got none") + } + dbosErr, ok := r.(*DBOSError) + if !ok { + t.Fatalf("expected panic to be *DBOSError, got %T", r) + } + if dbosErr.Code != ConflictingRegistrationError { + t.Fatalf("expected ConflictingRegistrationError, got %v", dbosErr.Code) + } + }() + RegisterWorkflow(freshCtx, simpleWorkflow, WithWorkflowName("custom-workflow")) + }) + + t.Run("RegisterAfterLaunchPanics", func(t *testing.T) { + // Create a fresh DBOS context for this test + freshCtx := setupDBOS(t, false, false) // Don't check for leaks and don't reset DB + + // Launch DBOS context + err := freshCtx.Launch() + if err != nil { + t.Fatalf("failed to launch DBOS context: %v", err) + } + defer freshCtx.Cancel() + + // Attempting to register after launch should panic + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic from registration after launch but got none") + } + }() + RegisterWorkflow(freshCtx, simpleWorkflow) + }) } func stepWithinAStep(ctx context.Context) (string, error) { @@ -382,7 +450,7 @@ func TestSteps(t *testing.T) { t.Fatalf("expected result 'from step', got '%s'", result) } - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) if err != nil { t.Fatal("failed to list steps:", err) } @@ -437,7 +505,7 @@ func TestSteps(t *testing.T) { } // Verify that the failed step was still recorded in the database - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) if err != nil { t.Fatal("failed to get workflow steps:", err) } @@ -694,7 +762,7 @@ func TestChildWorkflow(t *testing.T) { } // Verify the child workflow was recorded as step 0 - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), parentHandle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, parentHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get workflow steps: %v", err) } @@ -724,6 +792,107 @@ func TestChildWorkflow(t *testing.T) { t.Fatalf("expected second step child workflow ID to be %s, got %s", customChildID, steps[1].ChildWorkflowID) } }) + + t.Run("RecoveredChildWorkflowPollingHandle", func(t *testing.T) { + pollingHandleStartEvent := NewEvent() + pollingHandleCompleteEvent := NewEvent() + knownChildID := "known-child-workflow-id" + knownParentID := "known-parent-workflow-id" + counter := 0 + + // Simple child workflow that returns a result + pollingHandleChildWf := func(dbosCtx DBOSContext, input string) (string, error) { + // Wait + pollingHandleCompleteEvent.Wait() + return input + "-result", nil + } + RegisterWorkflow(dbosCtx, pollingHandleChildWf) + + pollingHandleParentWf := func(ctx DBOSContext, input string) (string, error) { + counter++ + + // Run child workflow with a known ID + childHandle, err := RunAsWorkflow(ctx, pollingHandleChildWf, "child-input", WithWorkflowID(knownChildID)) + if err != nil { + return "", fmt.Errorf("failed to run child workflow: %w", err) + } + + switch counter { + case 1: + // First handle will be a direct handle + _, ok := childHandle.(*workflowHandle[string]) + if !ok { + return "", fmt.Errorf("expected child handle to be of type workflowDirectHandle, got %T", childHandle) + } + case 2: + // Second handle will be a polling handle + _, ok := childHandle.(*workflowPollingHandle[string]) + if !ok { + return "", fmt.Errorf("expected recovered child handle to be of type workflowPollingHandle, got %T", childHandle) + } + } + + // Signal the child workflow is started + pollingHandleStartEvent.Set() + + result, err := childHandle.GetResult() + if err != nil { + return "", fmt.Errorf("failed to get result from child workflow: %w", err) + } + return result, nil + } + RegisterWorkflow(dbosCtx, pollingHandleParentWf) + + // Execute parent workflow - it will block after starting the child + parentHandle, err := RunAsWorkflow(dbosCtx, pollingHandleParentWf, "parent-input", WithWorkflowID(knownParentID)) + if err != nil { + t.Fatalf("failed to start parent workflow: %v", err) + } + + // Wait for the workflows to start + pollingHandleStartEvent.Wait() + + // Recover pending workflows - this should give us both parent and child handles + recoveredHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"}) + if err != nil { + t.Fatalf("failed to recover pending workflows: %v", err) + } + + // Should have recovered both parent and child workflows + if len(recoveredHandles) != 2 { + t.Fatalf("expected 2 recovered handles (parent and child), got %d", len(recoveredHandles)) + } + + // Find the child handle and verify it's a polling handle with the correct ID + var childRecoveredHandle WorkflowHandle[any] + for _, handle := range recoveredHandles { + if handle.GetWorkflowID() == knownChildID { + childRecoveredHandle = handle + break + } + } + + if childRecoveredHandle == nil { + t.Fatalf("failed to find recovered child workflow handle with ID %s", knownChildID) + } + + // Complete both workflows + pollingHandleCompleteEvent.Set() + result, err := parentHandle.GetResult() + if err != nil { + t.Fatalf("failed to get result from original parent workflow: %v", err) + } + if result != "child-input-result" { + t.Fatalf("expected result 'child-input-result', got '%s'", result) + } + childResult, err := childRecoveredHandle.GetResult() + if err != nil { + t.Fatalf("failed to get result from recovered child handle: %v", err) + } + if childResult != result { + t.Fatalf("expected child result '%s', got '%s'", result, childResult) + } + }) } // Idempotency workflows moved to test functions @@ -1329,6 +1498,40 @@ func TestSendRecv(t *testing.T) { if time.Since(start) > 10*time.Second { t.Fatalf("receive workflow took too long to complete, expected < 5s, got %v", time.Since(start)) } + + // Verify step counting for send workflow (sendWorkflow calls Send 3 times) + sendSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for send workflow: %v", err) + } + if len(sendSteps) != 3 { + t.Fatalf("expected 3 steps in send workflow (3 Send calls), got %d", len(sendSteps)) + } + for i, step := range sendSteps { + if step.StepID != i { + t.Fatalf("expected step %d to have StepID %d, got %d", i, i, step.StepID) + } + if step.StepName != "DBOS.send" { + t.Fatalf("expected step %d to have StepName 'DBOS.send', got '%s'", i, step.StepName) + } + } + + // Verify step counting for receive workflow (receiveWorkflow calls Recv 3 times) + receiveSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for receive workflow: %v", err) + } + if len(receiveSteps) != 3 { + t.Fatalf("expected 3 steps in receive workflow (3 Recv calls), got %d", len(receiveSteps)) + } + for i, step := range receiveSteps { + if step.StepID != i { + t.Fatalf("expected step %d to have StepID %d, got %d", i, i, step.StepID) + } + if step.StepName != "DBOS.recv" { + t.Fatalf("expected step %d to have StepName 'DBOS.recv', got '%s'", i, step.StepName) + } + } }) t.Run("SendRecvCustomStruct", func(t *testing.T) { @@ -1362,6 +1565,36 @@ func TestSendRecv(t *testing.T) { if result.Value != "test-struct-value" { t.Fatalf("expected received struct value to be 'test-struct-value', got '%s'", result.Value) } + + // Verify step counting for sendStructWorkflow (calls Send 1 time) + sendSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, sendHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for send struct workflow: %v", err) + } + if len(sendSteps) != 1 { + t.Fatalf("expected 1 step in send struct workflow (1 Send call), got %d", len(sendSteps)) + } + if sendSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", sendSteps[0].StepID) + } + if sendSteps[0].StepName != "DBOS.send" { + t.Fatalf("expected step to have StepName 'DBOS.send', got '%s'", sendSteps[0].StepName) + } + + // Verify step counting for receiveStructWorkflow (calls Recv 1 time) + receiveSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for receive struct workflow: %v", err) + } + if len(receiveSteps) != 1 { + t.Fatalf("expected 1 step in receive struct workflow (1 Recv call), got %d", len(receiveSteps)) + } + if receiveSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", receiveSteps[0].StepID) + } + if receiveSteps[0].StepName != "DBOS.recv" { + t.Fatalf("expected step to have StepName 'DBOS.recv', got '%s'", receiveSteps[0].StepName) + } }) t.Run("SendToNonExistentUUID", func(t *testing.T) { @@ -1463,6 +1696,23 @@ func TestSendRecv(t *testing.T) { if result != "message1-message2-message3" { t.Fatalf("expected result to be 'message1-message2-message3', got '%s'", result) } + + // Verify step counting for receive workflow (calls Recv 3 times) + receiveSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for receive workflow: %v", err) + } + if len(receiveSteps) != 3 { + t.Fatalf("expected 3 steps in receive workflow (3 Recv calls), got %d", len(receiveSteps)) + } + for i, step := range receiveSteps { + if step.StepID != i { + t.Fatalf("expected step %d to have StepID %d, got %d", i, i, step.StepID) + } + if step.StepName != "DBOS.recv" { + t.Fatalf("expected step %d to have StepName 'DBOS.recv', got '%s'", i, step.StepName) + } + } }) t.Run("SendRecvIdempotency", func(t *testing.T) { // Start the receive workflow and wait for it to be ready @@ -1491,20 +1741,32 @@ func TestSendRecv(t *testing.T) { if len(recoveredHandles) != 2 { t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles)) } - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), sendHandle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, sendHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get workflow steps: %v", err) } if len(steps) != 1 { t.Fatalf("expected 1 step in send idempotency workflow, got %d", len(steps)) } - steps, err = dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), receiveHandle.GetWorkflowID()) + if steps[0].StepID != 0 { + t.Fatalf("expected send idempotency step to have StepID 0, got %d", steps[0].StepID) + } + if steps[0].StepName != "DBOS.send" { + t.Fatalf("expected send idempotency step to have StepName 'DBOS.send', got '%s'", steps[0].StepName) + } + steps, err = dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get steps for receive idempotency workflow: %v", err) } if len(steps) != 1 { t.Fatalf("expected 1 step in receive idempotency workflow, got %d", len(steps)) } + if steps[0].StepID != 0 { + t.Fatalf("expected receive idempotency step to have StepID 0, got %d", steps[0].StepID) + } + if steps[0].StepName != "DBOS.recv" { + t.Fatalf("expected receive idempotency step to have StepName 'DBOS.recv', got '%s'", steps[0].StepName) + } // Unblock the workflows to complete receiveIdempotencyStopEvent.Set() @@ -1735,6 +1997,145 @@ func getEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) ( return result, nil } +// Test workflows and steps for parameter mismatch validation +func conflictWorkflowA(dbosCtx DBOSContext, input string) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return conflictStepA(ctx) + }) +} + +func conflictWorkflowB(dbosCtx DBOSContext, input string) (string, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return conflictStepB(ctx) + }) +} + +func conflictStepA(_ context.Context) (string, error) { + return "step-a-result", nil +} + +func conflictStepB(_ context.Context) (string, error) { + return "step-b-result", nil +} + +func workflowWithMultipleSteps(dbosCtx DBOSContext, input string) (string, error) { + // First step + result1, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return conflictStepA(ctx) + }) + if err != nil { + return "", err + } + + // Second step - this is where we'll test step name conflicts + result2, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return conflictStepB(ctx) + }) + if err != nil { + return "", err + } + + return result1 + "-" + result2, nil +} + +func TestWorkflowExecutionMismatch(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + // Register workflows for testing + RegisterWorkflow(dbosCtx, conflictWorkflowA) + RegisterWorkflow(dbosCtx, conflictWorkflowB) + RegisterWorkflow(dbosCtx, workflowWithMultipleSteps) + + t.Run("WorkflowNameConflict", func(t *testing.T) { + workflowID := uuid.NewString() + + // First, run conflictWorkflowA with a specific workflow ID + handle, err := RunAsWorkflow(dbosCtx, conflictWorkflowA, "test-input", WithWorkflowID(workflowID)) + if err != nil { + t.Fatalf("failed to start first workflow: %v", err) + } + + // Get the result to ensure it completes + result, err := handle.GetResult() + if err != nil { + t.Fatalf("failed to get result from first workflow: %v", err) + } + if result != "step-a-result" { + t.Fatalf("expected 'step-a-result', got '%s'", result) + } + + // Now try to run conflictWorkflowB with the same workflow ID + // This should return a ConflictingWorkflowError + _, err = RunAsWorkflow(dbosCtx, conflictWorkflowB, "test-input", WithWorkflowID(workflowID)) + if err == nil { + t.Fatal("expected ConflictingWorkflowError when running different workflow with same ID, but got none") + } + + // Check that it's the correct error type + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != ConflictingWorkflowError { + t.Fatalf("expected error code to be ConflictingWorkflowError, got %v", dbosErr.Code) + } + + // Check that the error message contains the workflow names + expectedMsgPart := "Workflow already exists with a different name" + if !strings.Contains(err.Error(), expectedMsgPart) { + t.Fatalf("expected error message to contain '%s', got '%s'", expectedMsgPart, err.Error()) + } + }) + + t.Run("StepNameConflict", func(t *testing.T) { + handle, err := RunAsWorkflow(dbosCtx, workflowWithMultipleSteps, "test-input") + if err != nil { + t.Fatalf("failed to start workflow: %v", err) + } + result, err := handle.GetResult() + if err != nil { + t.Fatalf("failed to get result from workflow: %v", err) + } + if result != "step-a-result-step-b-result" { + t.Fatalf("expected 'step-a-result-step-b-result', got '%s'", result) + } + + // Check operation execution with a different step name for the same step ID + workflowID := handle.GetWorkflowID() + + // This directly tests the CheckOperationExecution method with mismatched step name + wrongStepName := "wrong-step-name" + _, err = dbosCtx.(*dbosContext).systemDB.CheckOperationExecution(dbosCtx, checkOperationExecutionDBInput{ + workflowID: workflowID, + stepID: 0, + stepName: wrongStepName, + }) + + if err == nil { + t.Fatal("expected UnexpectedStep error when checking operation with wrong step name, but got none") + } + + // Check that it's the correct error type + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != UnexpectedStep { + t.Fatalf("expected error code to be UnexpectedStep, got %v", dbosErr.Code) + } + + // Check that the error message contains step information + if !strings.Contains(err.Error(), "Check that your workflow is deterministic") { + t.Fatalf("expected error message to contain 'Check that your workflow is deterministic', got '%s'", err.Error()) + } + if !strings.Contains(err.Error(), wrongStepName) { + t.Fatalf("expected error message to contain wrong step name '%s', got '%s'", wrongStepName, err.Error()) + } + }) +} + func TestSetGetEvent(t *testing.T) { dbosCtx := setupDBOS(t, true, true) @@ -1805,6 +2206,53 @@ func TestSetGetEvent(t *testing.T) { if result != "two-events-set" { t.Fatalf("expected result to be 'two-events-set', got '%s'", result) } + + // Verify step counting for setTwoEventsWorkflow (calls SetEvent 2 times) + setSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for set two events workflow: %v", err) + } + if len(setSteps) != 2 { + t.Fatalf("expected 2 steps in set two events workflow (2 SetEvent calls), got %d", len(setSteps)) + } + for i, step := range setSteps { + if step.StepID != i { + t.Fatalf("expected step %d to have StepID %d, got %d", i, i, step.StepID) + } + if step.StepName != "DBOS.setEvent" { + t.Fatalf("expected step %d to have StepName 'DBOS.setEvent', got '%s'", i, step.StepName) + } + } + + // Verify step counting for getFirstEventHandle (calls GetEvent 1 time) + getFirstSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, getFirstEventHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for get first event workflow: %v", err) + } + if len(getFirstSteps) != 1 { + t.Fatalf("expected 1 step in get first event workflow (1 GetEvent call), got %d", len(getFirstSteps)) + } + if getFirstSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", getFirstSteps[0].StepID) + } + if getFirstSteps[0].StepName != "DBOS.getEvent" { + t.Fatalf("expected step to have StepName 'DBOS.getEvent', got '%s'", getFirstSteps[0].StepName) + } + + // Verify step counting for getSecondEventHandle (calls GetEvent 1 time) + getSecondSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, getSecondEventHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for get second event workflow: %v", err) + } + if len(getSecondSteps) != 1 { + t.Fatalf("expected 1 step in get second event workflow (1 GetEvent call), got %d", len(getSecondSteps)) + } + if getSecondSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", getSecondSteps[0].StepID) + } + if getSecondSteps[0].StepName != "DBOS.getEvent" { + t.Fatalf("expected step to have StepName 'DBOS.getEvent', got '%s'", getSecondSteps[0].StepName) + } }) t.Run("GetEventFromOutsideWorkflow", func(t *testing.T) { @@ -1835,6 +2283,21 @@ func TestSetGetEvent(t *testing.T) { if message != "test-message" { t.Fatalf("expected received message to be 'test-message', got '%s'", message) } + + // Verify step counting for setEventWorkflow (calls SetEvent 1 time) + setSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get workflow steps for set event workflow: %v", err) + } + if len(setSteps) != 1 { + t.Fatalf("expected 1 step in set event workflow (1 SetEvent call), got %d", len(setSteps)) + } + if setSteps[0].StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", setSteps[0].StepID) + } + if setSteps[0].StepName != "DBOS.setEvent" { + t.Fatalf("expected step to have StepName 'DBOS.setEvent', got '%s'", setSteps[0].StepName) + } }) t.Run("GetEventTimeout", func(t *testing.T) { @@ -1939,21 +2402,33 @@ func TestSetGetEvent(t *testing.T) { setEventStartIdempotencyEvent.Wait() // Verify step counts - setSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), setHandle.GetWorkflowID()) + setSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get steps for set event idempotency workflow: %v", err) } if len(setSteps) != 1 { t.Fatalf("expected 1 step in set event idempotency workflow, got %d", len(setSteps)) } + if setSteps[0].StepID != 0 { + t.Fatalf("expected set event idempotency step to have StepID 0, got %d", setSteps[0].StepID) + } + if setSteps[0].StepName != "DBOS.setEvent" { + t.Fatalf("expected set event idempotency step to have StepName 'DBOS.setEvent', got '%s'", setSteps[0].StepName) + } - getSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), getHandle.GetWorkflowID()) + getSteps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, getHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get steps for get event idempotency workflow: %v", err) } if len(getSteps) != 1 { t.Fatalf("expected 1 step in get event idempotency workflow, got %d", len(getSteps)) } + if getSteps[0].StepID != 0 { + t.Fatalf("expected get event idempotency step to have StepID 0, got %d", getSteps[0].StepID) + } + if getSteps[0].StepName != "DBOS.getEvent" { + t.Fatalf("expected get event idempotency step to have StepName 'DBOS.getEvent', got '%s'", getSteps[0].StepName) + } // Complete the workflows setEvenStopIdempotencyEvent.Set() @@ -2097,7 +2572,7 @@ func TestSleep(t *testing.T) { } // Verify the sleep step was recorded correctly - steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID()) + steps, err := dbosCtx.(*dbosContext).systemDB.GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get workflow steps: %v", err) } @@ -2107,6 +2582,9 @@ func TestSleep(t *testing.T) { } step := steps[0] + if step.StepID != 0 { + t.Fatalf("expected step to have StepID 0, got %d", step.StepID) + } if step.StepName != "DBOS.sleep" { t.Fatalf("expected step name to be 'DBOS.sleep', got '%s'", step.StepName) } @@ -2221,7 +2699,7 @@ func TestWorkflowTimeout(t *testing.T) { // This step will trigger cancellation of the entire workflow context <-ctx.Done() if !errors.Is(ctx.Err(), context.Canceled) && !errors.Is(ctx.Err(), context.DeadlineExceeded) { - t.Fatalf("step was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err()) + return "", fmt.Errorf("step was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err()) } return "", ctx.Err() } @@ -2526,3 +3004,328 @@ func TestWorkflowTimeout(t *testing.T) { } }) } + +func notificationWaiterWorkflow(ctx DBOSContext, pairID int) (string, error) { + result, err := GetEvent[string](ctx, WorkflowGetEventInput{ + TargetWorkflowID: fmt.Sprintf("notification-setter-%d", pairID), + Key: "event-key", + Timeout: 10 * time.Second, + }) + if err != nil { + return "", err + } + return result, nil +} + +func notificationSetterWorkflow(ctx DBOSContext, pairID int) (string, error) { + err := SetEvent(ctx, WorkflowSetEventInputGeneric[string]{ + Key: "event-key", + Message: fmt.Sprintf("notification-message-%d", pairID), + }) + if err != nil { + return "", err + } + return "event-set", nil +} + +func sendRecvReceiverWorkflow(ctx DBOSContext, pairID int) (string, error) { + result, err := Recv[string](ctx, WorkflowRecvInput{ + Topic: "send-recv-topic", + Timeout: 10 * time.Second, + }) + if err != nil { + return "", err + } + return result, nil +} + +func sendRecvSenderWorkflow(ctx DBOSContext, pairID int) (string, error) { + err := Send(ctx, WorkflowSendInput[string]{ + DestinationID: fmt.Sprintf("send-recv-receiver-%d", pairID), + Topic: "send-recv-topic", + Message: fmt.Sprintf("send-recv-message-%d", pairID), + }) + if err != nil { + return "", err + } + return "message-sent", nil +} + +func concurrentSimpleWorkflow(dbosCtx DBOSContext, input int) (int, error) { + return RunAsStep(dbosCtx, func(ctx context.Context) (int, error) { + return input * 2, nil + }) +} + +func TestConcurrentWorkflows(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + RegisterWorkflow(dbosCtx, concurrentSimpleWorkflow) + RegisterWorkflow(dbosCtx, notificationWaiterWorkflow) + RegisterWorkflow(dbosCtx, notificationSetterWorkflow) + RegisterWorkflow(dbosCtx, sendRecvReceiverWorkflow) + RegisterWorkflow(dbosCtx, sendRecvSenderWorkflow) + + t.Run("SimpleWorkflow", func(t *testing.T) { + const numGoroutines = 500 + var wg sync.WaitGroup + results := make(chan int, numGoroutines) + errors := make(chan error, numGoroutines) + + wg.Add(numGoroutines) + for i := range numGoroutines { + go func(input int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, concurrentSimpleWorkflow, input) + if err != nil { + errors <- fmt.Errorf("failed to start workflow %d: %w", input, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for workflow %d: %w", input, err) + return + } + expectedResult := input * 2 + if result != expectedResult { + errors <- fmt.Errorf("workflow %d: expected result %d, got %d", input, expectedResult, result) + return + } + results <- result + }(i) + } + + wg.Wait() + close(results) + close(errors) + + if len(errors) > 0 { + for err := range errors { + t.Errorf("Workflow error: %v", err) + } + t.Fatalf("Expected no errors from concurrent workflows, got %d errors", len(errors)) + } + + resultCount := 0 + receivedResults := make(map[int]bool) + for result := range results { + resultCount++ + if result < 0 || result >= numGoroutines*2 || result%2 != 0 { + t.Errorf("Unexpected result %d", result) + } else { + receivedResults[result] = true + } + } + + if resultCount != numGoroutines { + t.Fatalf("Expected %d results, got %d", numGoroutines, resultCount) + } + }) + + t.Run("NotificationWorkflows", func(t *testing.T) { + const numPairs = 500 + var wg sync.WaitGroup + waiterResults := make(chan string, numPairs) + setterResults := make(chan string, numPairs) + errors := make(chan error, numPairs*2) + + wg.Add(numPairs * 2) + + for i := range numPairs { + go func(pairID int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, notificationSetterWorkflow, pairID, WithWorkflowID(fmt.Sprintf("notification-setter-%d", pairID))) + if err != nil { + errors <- fmt.Errorf("failed to start setter workflow %d: %w", pairID, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for setter workflow %d: %w", pairID, err) + return + } + setterResults <- result + }(i) + + go func(pairID int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, notificationWaiterWorkflow, pairID) + if err != nil { + errors <- fmt.Errorf("failed to start waiter workflow %d: %w", pairID, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for waiter workflow %d: %w", pairID, err) + return + } + expectedMessage := fmt.Sprintf("notification-message-%d", pairID) + if result != expectedMessage { + errors <- fmt.Errorf("waiter workflow %d: expected message '%s', got '%s'", pairID, expectedMessage, result) + return + } + waiterResults <- result + }(i) + } + + wg.Wait() + close(waiterResults) + close(setterResults) + close(errors) + + if len(errors) > 0 { + for err := range errors { + t.Errorf("Workflow error: %v", err) + } + t.Fatalf("Expected no errors from notification workflows, got %d errors", len(errors)) + } + + waiterCount := 0 + receivedWaiterResults := make(map[string]bool) + for result := range waiterResults { + waiterCount++ + receivedWaiterResults[result] = true + } + + setterCount := 0 + for result := range setterResults { + setterCount++ + if result != "event-set" { + t.Errorf("Expected setter result to be 'event-set', got '%s'", result) + } + } + + if waiterCount != numPairs { + t.Fatalf("Expected %d waiter results, got %d", numPairs, waiterCount) + } + + if setterCount != numPairs { + t.Fatalf("Expected %d setter results, got %d", numPairs, setterCount) + } + + for i := range numPairs { + expectedWaiterResult := fmt.Sprintf("notification-message-%d", i) + if !receivedWaiterResults[expectedWaiterResult] { + t.Errorf("Expected waiter result '%s' not found", expectedWaiterResult) + } + } + }) + + t.Run("SendRecvWorkflows", func(t *testing.T) { + const numPairs = 500 + var wg sync.WaitGroup + receiverResults := make(chan string, numPairs) + senderResults := make(chan string, numPairs) + errors := make(chan error, numPairs*2) + + wg.Add(numPairs * 2) + + for i := range numPairs { + go func(pairID int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, sendRecvReceiverWorkflow, pairID, WithWorkflowID(fmt.Sprintf("send-recv-receiver-%d", pairID))) + if err != nil { + errors <- fmt.Errorf("failed to start receiver workflow %d: %w", pairID, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for receiver workflow %d: %w", pairID, err) + return + } + expectedMessage := fmt.Sprintf("send-recv-message-%d", pairID) + if result != expectedMessage { + errors <- fmt.Errorf("receiver workflow %d: expected message '%s', got '%s'", pairID, expectedMessage, result) + return + } + receiverResults <- result + }(i) + + go func(pairID int) { + defer wg.Done() + handle, err := RunAsWorkflow(dbosCtx, sendRecvSenderWorkflow, pairID) + if err != nil { + errors <- fmt.Errorf("failed to start sender workflow %d: %w", pairID, err) + return + } + result, err := handle.GetResult() + if err != nil { + errors <- fmt.Errorf("failed to get result for sender workflow %d: %w", pairID, err) + return + } + senderResults <- result + }(i) + } + + wg.Wait() + close(receiverResults) + close(senderResults) + close(errors) + + if len(errors) > 0 { + for err := range errors { + t.Errorf("Workflow error: %v", err) + } + t.Fatalf("Expected no errors from send/recv workflows, got %d errors", len(errors)) + } + + receiverCount := 0 + receivedReceiverResults := make(map[string]bool) + for result := range receiverResults { + receiverCount++ + receivedReceiverResults[result] = true + } + + senderCount := 0 + for result := range senderResults { + senderCount++ + if result != "message-sent" { + t.Errorf("Expected sender result to be 'message-sent', got '%s'", result) + } + } + + if receiverCount != numPairs { + t.Fatalf("Expected %d receiver results, got %d", numPairs, receiverCount) + } + + if senderCount != numPairs { + t.Fatalf("Expected %d sender results, got %d", numPairs, senderCount) + } + + for i := range numPairs { + expectedReceiverResult := fmt.Sprintf("send-recv-message-%d", i) + if !receivedReceiverResults[expectedReceiverResult] { + t.Errorf("Expected receiver result '%s' not found", expectedReceiverResult) + } + } + }) +} + +func TestWorkflowAtVersion(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + RegisterWorkflow(dbosCtx, simpleWorkflow) + + version := "test-app-version-12345" + handle, err := RunAsWorkflow(dbosCtx, simpleWorkflow, "input", WithApplicationVersion(version)) + if err != nil { + t.Fatalf("failed to start workflow: %v", err) + } + + _, err = handle.GetResult() + if err != nil { + t.Fatalf("failed to get workflow result: %v", err) + } + + retrieved, err := RetrieveWorkflow[string](dbosCtx, handle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to retrieve workflow: %v", err) + } + + status, err := retrieved.GetStatus() + if err != nil { + t.Fatalf("failed to get workflow status: %v", err) + } + if status.ApplicationVersion != version { + t.Fatalf("expected application version %q, got %q", version, status.ApplicationVersion) + } +}