From f9dfcad5b267df046ad41ed4516dedb76553466f Mon Sep 17 00:00:00 2001 From: Afzal Muhammad Date: Sat, 19 Jul 2025 14:05:13 +0100 Subject: [PATCH 1/4] Fix workflow ID reference in Send function to use current workflow state --- dbos/system_database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index a4d68b2c..8a77e94e 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1030,7 +1030,7 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro // Check if operation was already executed and do nothing if so checkInput := CheckOperationExecutionDBInput{ - workflowID: input.DestinationID, + workflowID: workflowState.WorkflowID, operationID: stepID, functionName: functionName, tx: tx, From bc29c11cd844f56f8918195b2f1737c1f07d3b6d Mon Sep 17 00:00:00 2001 From: Afzal Muhammad Date: Sun, 20 Jul 2025 14:22:56 +0100 Subject: [PATCH 2/4] Test if Send function uses correct WorkflowID when checking for Idempotency --- dbos/workflows_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index a3ffb8f1..c0ae2cf2 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -981,7 +981,9 @@ func TestScheduledWorkflows(t *testing.T) { var ( sendWf = WithWorkflow(sendWorkflow) + singleSendWf = WithWorkflow(singleSendWorkflow) receiveWf = WithWorkflow(receiveWorkflow) + receiveWfSteps = WithWorkflow(receiveWorkflowWithSteps) sendStructWf = WithWorkflow(sendStructWorkflow) receiveStructWf = WithWorkflow(receiveStructWorkflow) ) @@ -991,6 +993,11 @@ type sendWorkflowInput struct { Topic string } +var ( + testRecvStartEvent = NewEvent() + testSendCompleteEvent = NewEvent() +) + func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { fmt.Println("Starting send workflow with input:", input) err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1"}) @@ -1009,6 +1016,20 @@ func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) return "", nil } +func singleSendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { + fmt.Println("Starting send workflow with input:", input) + + testSendCompleteEvent.Set() + err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1"}) + if err != nil { + fmt.Printf("Send failed with error: %v\n", err) + return "", err + } + + fmt.Println("Sending message on topic:", input.Topic, "to destination:", input.DestinationID) + return "", nil +} + func receiveWorkflow(ctx context.Context, topic string) (string, error) { msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second}) if err != nil { @@ -1025,6 +1046,25 @@ func receiveWorkflow(ctx context.Context, topic string) (string, error) { return msg1 + "-" + msg2 + "-" + msg3, nil } +func receiveWorkflowWithSteps(ctx context.Context, input string) (string, error) { + _, err := RunAsStep(ctx, simpleStep, input) + if err != nil { + return "", err + } + + _, err = RunAsStep(ctx, simpleStep, input) + if err != nil { + return "", err + } + + testRecvStartEvent.Set() + msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: input, Timeout: 1 * time.Second}) + if err != nil { + return "", err + } + return msg1, nil +} + func sendStructWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { testStruct := sendRecvType{Value: "test-struct-value"} err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: testStruct}) @@ -1118,6 +1158,32 @@ func TestSendRecv(t *testing.T) { } }) + t.Run("SendUsesCorrectWorkflowIDForIdempotency", func(t *testing.T) { + receiveHandle, err := receiveWfSteps(context.Background(), "test-topic") + if err != nil { + t.Fatalf("failed to start receive workflow: %v", err) + } + + testRecvStartEvent.Wait() + + sendHandle, err := singleSendWf(context.Background(), sendWorkflowInput{ + DestinationID: receiveHandle.GetWorkflowID(), + Topic: "test-topic", + }) + + if err != nil { + t.Fatalf("failed to send message: %v", err) + } + + testSendCompleteEvent.Wait() + + // Verify send completed without any error + _, err = sendHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("send workflow failed: %v", err) + } + }) + t.Run("SendRecvCustomStruct", func(t *testing.T) { // Start the receive workflow receiveHandle, err := receiveStructWf(context.Background(), "struct-topic") From cfe0aed12ca6bb6a56543df6943d36bf8e6b0174 Mon Sep 17 00:00:00 2001 From: Afzal Muhammad Date: Sun, 20 Jul 2025 14:23:54 +0100 Subject: [PATCH 3/4] remove comment --- dbos/workflows_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index c0ae2cf2..8aa72e89 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1177,7 +1177,6 @@ func TestSendRecv(t *testing.T) { testSendCompleteEvent.Wait() - // Verify send completed without any error _, err = sendHandle.GetResult(context.Background()) if err != nil { t.Fatalf("send workflow failed: %v", err) From 6c890b3df2a47f356f1590e1f5b3114a6ed09124 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 22:06:46 -0700 Subject: [PATCH 4/4] add step idempotency tests --- dbos/queues_test.go | 2 + dbos/workflows_test.go | 181 ++++++++++++++++++++++++----------------- 2 files changed, 109 insertions(+), 74 deletions(-) diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 310b9828..c78fe656 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -634,6 +634,8 @@ func TestWorkerConcurrencyXRecovery(t *testing.T) { // Start the first workflow and wait for it to start workerConcurrencyRecoveryStartEvent1.Wait() workerConcurrencyRecoveryStartEvent1.Clear() + // Wait for a few seconds to let the queue runner loop + time.Sleep(2 * time.Second) // Ensure the 2nd workflow is still ENQUEUED status2, err := handle2.GetStatus() diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 8aa72e89..11e8fe24 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -335,7 +335,16 @@ func stepRetryAlwaysFailsStep(ctx context.Context, input string) (string, error) return "", fmt.Errorf("always fails - attempt %d", stepRetryAttemptCount) } +var stepIdempotencyCounter int + +func stepIdempotencyTest(ctx context.Context, input string) (string, error) { + stepIdempotencyCounter++ + fmt.Println("Executing idempotency step:", stepIdempotencyCounter) + return input, nil +} + func stepRetryWorkflow(ctx context.Context, input string) (string, error) { + RunAsStep(ctx, stepIdempotencyTest, input) return RunAsStep(ctx, stepRetryAlwaysFailsStep, input, WithStepMaxRetries(5), WithBackoffFactor(2.0), @@ -400,8 +409,9 @@ func TestSteps(t *testing.T) { }) t.Run("StepRetryWithExponentialBackoff", func(t *testing.T) { - // Reset the global counter before test + // Reset the global counters before test stepRetryAttemptCount = 0 + stepIdempotencyCounter = 0 // Execute the workflow handle, err := stepRetryWf(context.Background(), "test") @@ -449,12 +459,12 @@ func TestSteps(t *testing.T) { t.Fatal("failed to get workflow steps:", err) } - if len(steps) != 1 { - t.Fatalf("expected 1 recorded step, got %d", len(steps)) + if len(steps) != 2 { + t.Fatalf("expected 2 recorded step, got %d", len(steps)) } - // Verify the recorded step has the error - step := steps[0] + // Verify the second step has the error + step := steps[1] if step.Error == nil { t.Fatal("expected error in recorded step, got none") } @@ -462,6 +472,11 @@ func TestSteps(t *testing.T) { if step.Error.Error() != dbosErr.Error() { t.Fatalf("expected recorded step error to match joined error, got '%s', expected '%s'", step.Error.Error(), dbosErr.Error()) } + + // Verify the idempotency step was executed only once + if stepIdempotencyCounter != 1 { + t.Fatalf("expected idempotency step to be executed only once, got %d", stepIdempotencyCounter) + } }) } @@ -980,12 +995,15 @@ func TestScheduledWorkflows(t *testing.T) { } var ( - sendWf = WithWorkflow(sendWorkflow) - singleSendWf = WithWorkflow(singleSendWorkflow) - receiveWf = WithWorkflow(receiveWorkflow) - receiveWfSteps = WithWorkflow(receiveWorkflowWithSteps) - sendStructWf = WithWorkflow(sendStructWorkflow) - receiveStructWf = WithWorkflow(receiveStructWorkflow) + sendWf = WithWorkflow(sendWorkflow) + receiveWf = WithWorkflow(receiveWorkflow) + sendStructWf = WithWorkflow(sendStructWorkflow) + receiveStructWf = WithWorkflow(receiveStructWorkflow) + sendIdempotencyWf = WithWorkflow(sendIdempotencyWorkflow) + sendIdempotencyEvent = NewEvent() + recvIdempotencyWf = WithWorkflow(receiveIdempotencyWorkflow) + receiveIdempotencyStartEvent = NewEvent() + receiveIdempotencyStopEvent = NewEvent() ) type sendWorkflowInput struct { @@ -993,11 +1011,6 @@ type sendWorkflowInput struct { Topic string } -var ( - testRecvStartEvent = NewEvent() - testSendCompleteEvent = NewEvent() -) - func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { fmt.Println("Starting send workflow with input:", input) err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1"}) @@ -1016,20 +1029,6 @@ func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) return "", nil } -func singleSendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { - fmt.Println("Starting send workflow with input:", input) - - testSendCompleteEvent.Set() - err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1"}) - if err != nil { - fmt.Printf("Send failed with error: %v\n", err) - return "", err - } - - fmt.Println("Sending message on topic:", input.Topic, "to destination:", input.DestinationID) - return "", nil -} - func receiveWorkflow(ctx context.Context, topic string) (string, error) { msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second}) if err != nil { @@ -1046,25 +1045,6 @@ func receiveWorkflow(ctx context.Context, topic string) (string, error) { return msg1 + "-" + msg2 + "-" + msg3, nil } -func receiveWorkflowWithSteps(ctx context.Context, input string) (string, error) { - _, err := RunAsStep(ctx, simpleStep, input) - if err != nil { - return "", err - } - - _, err = RunAsStep(ctx, simpleStep, input) - if err != nil { - return "", err - } - - testRecvStartEvent.Set() - msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: input, Timeout: 1 * time.Second}) - if err != nil { - return "", err - } - return msg1, nil -} - func sendStructWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { testStruct := sendRecvType{Value: "test-struct-value"} err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: testStruct}) @@ -1075,6 +1055,25 @@ func receiveStructWorkflow(ctx context.Context, topic string) (sendRecvType, err return Recv[sendRecvType](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second}) } +func sendIdempotencyWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { + err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "m1"}) + if err != nil { + return "", err + } + sendIdempotencyEvent.Wait() + return "idempotent-send-completed", nil +} + +func receiveIdempotencyWorkflow(ctx context.Context, topic string) (string, error) { + msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second}) + if err != nil { + return "", err + } + receiveIdempotencyStartEvent.Set() + receiveIdempotencyStopEvent.Wait() + return msg, nil +} + type sendRecvType struct { Value string } @@ -1158,31 +1157,6 @@ func TestSendRecv(t *testing.T) { } }) - t.Run("SendUsesCorrectWorkflowIDForIdempotency", func(t *testing.T) { - receiveHandle, err := receiveWfSteps(context.Background(), "test-topic") - if err != nil { - t.Fatalf("failed to start receive workflow: %v", err) - } - - testRecvStartEvent.Wait() - - sendHandle, err := singleSendWf(context.Background(), sendWorkflowInput{ - DestinationID: receiveHandle.GetWorkflowID(), - Topic: "test-topic", - }) - - if err != nil { - t.Fatalf("failed to send message: %v", err) - } - - testSendCompleteEvent.Wait() - - _, err = sendHandle.GetResult(context.Background()) - if err != nil { - t.Fatalf("send workflow failed: %v", err) - } - }) - t.Run("SendRecvCustomStruct", func(t *testing.T) { // Start the receive workflow receiveHandle, err := receiveStructWf(context.Background(), "struct-topic") @@ -1310,5 +1284,64 @@ func TestSendRecv(t *testing.T) { t.Fatalf("expected error message to contain %q, but got %q", expectedMessagePart, err.Error()) } }) + t.Run("SendRecvIdempotency", func(t *testing.T) { + // Start the receive workflow and wait for it to be ready + receiveHandle, err := recvIdempotencyWf(context.Background(), "idempotency-topic") + if err != nil { + t.Fatalf("failed to start receive idempotency workflow: %v", err) + } + + // Send the message to the receive workflow + sendHandle, err := sendIdempotencyWf(context.Background(), sendWorkflowInput{ + DestinationID: receiveHandle.GetWorkflowID(), + Topic: "idempotency-topic", + }) + if err != nil { + t.Fatalf("failed to send idempotency message: %v", err) + } + // Wait for the receive workflow to have received the message + receiveIdempotencyStartEvent.Wait() + + // Attempt recovering both workflows. There should be only 2 steps recorded after recovery. + recoveredHandles, err := recoverPendingWorkflows(context.Background(), []string{"local"}) + if err != nil { + t.Fatalf("failed to recover pending workflows: %v", err) + } + if len(recoveredHandles) != 2 { + t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles)) + } + steps, err := getExecutor().systemDB.GetWorkflowSteps(context.Background(), sendHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get steps for send idempotency workflow: %v", err) + } + if len(steps) != 1 { + t.Fatalf("expected 1 step in send idempotency workflow, got %d", len(steps)) + } + steps, err = getExecutor().systemDB.GetWorkflowSteps(context.Background(), receiveHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get steps for receive idempotency workflow: %v", err) + } + if len(steps) != 1 { + t.Fatalf("expected 1 step in receive idempotency workflow, got %d", len(steps)) + } + + // Unblock the workflows to complete + receiveIdempotencyStopEvent.Set() + result, err := receiveHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from receive idempotency workflow: %v", err) + } + if result != "m1" { + t.Fatalf("expected result to be 'm1', got '%s'", result) + } + sendIdempotencyEvent.Set() + result, err = sendHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from send idempotency workflow: %v", err) + } + if result != "idempotent-send-completed" { + t.Fatalf("expected result to be 'idempotent-send-completed', got '%s'", result) + } + }) }