Skip to content

Commit 6c890b3

Browse files
committed
add step idempotency tests
1 parent cfe0aed commit 6c890b3

File tree

2 files changed

+109
-74
lines changed

2 files changed

+109
-74
lines changed

dbos/queues_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,8 @@ func TestWorkerConcurrencyXRecovery(t *testing.T) {
634634
// Start the first workflow and wait for it to start
635635
workerConcurrencyRecoveryStartEvent1.Wait()
636636
workerConcurrencyRecoveryStartEvent1.Clear()
637+
// Wait for a few seconds to let the queue runner loop
638+
time.Sleep(2 * time.Second)
637639

638640
// Ensure the 2nd workflow is still ENQUEUED
639641
status2, err := handle2.GetStatus()

dbos/workflows_test.go

Lines changed: 107 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,16 @@ func stepRetryAlwaysFailsStep(ctx context.Context, input string) (string, error)
335335
return "", fmt.Errorf("always fails - attempt %d", stepRetryAttemptCount)
336336
}
337337

338+
var stepIdempotencyCounter int
339+
340+
func stepIdempotencyTest(ctx context.Context, input string) (string, error) {
341+
stepIdempotencyCounter++
342+
fmt.Println("Executing idempotency step:", stepIdempotencyCounter)
343+
return input, nil
344+
}
345+
338346
func stepRetryWorkflow(ctx context.Context, input string) (string, error) {
347+
RunAsStep(ctx, stepIdempotencyTest, input)
339348
return RunAsStep(ctx, stepRetryAlwaysFailsStep, input,
340349
WithStepMaxRetries(5),
341350
WithBackoffFactor(2.0),
@@ -400,8 +409,9 @@ func TestSteps(t *testing.T) {
400409
})
401410

402411
t.Run("StepRetryWithExponentialBackoff", func(t *testing.T) {
403-
// Reset the global counter before test
412+
// Reset the global counters before test
404413
stepRetryAttemptCount = 0
414+
stepIdempotencyCounter = 0
405415

406416
// Execute the workflow
407417
handle, err := stepRetryWf(context.Background(), "test")
@@ -449,19 +459,24 @@ func TestSteps(t *testing.T) {
449459
t.Fatal("failed to get workflow steps:", err)
450460
}
451461

452-
if len(steps) != 1 {
453-
t.Fatalf("expected 1 recorded step, got %d", len(steps))
462+
if len(steps) != 2 {
463+
t.Fatalf("expected 2 recorded step, got %d", len(steps))
454464
}
455465

456-
// Verify the recorded step has the error
457-
step := steps[0]
466+
// Verify the second step has the error
467+
step := steps[1]
458468
if step.Error == nil {
459469
t.Fatal("expected error in recorded step, got none")
460470
}
461471

462472
if step.Error.Error() != dbosErr.Error() {
463473
t.Fatalf("expected recorded step error to match joined error, got '%s', expected '%s'", step.Error.Error(), dbosErr.Error())
464474
}
475+
476+
// Verify the idempotency step was executed only once
477+
if stepIdempotencyCounter != 1 {
478+
t.Fatalf("expected idempotency step to be executed only once, got %d", stepIdempotencyCounter)
479+
}
465480
})
466481
}
467482

@@ -980,24 +995,22 @@ func TestScheduledWorkflows(t *testing.T) {
980995
}
981996

982997
var (
983-
sendWf = WithWorkflow(sendWorkflow)
984-
singleSendWf = WithWorkflow(singleSendWorkflow)
985-
receiveWf = WithWorkflow(receiveWorkflow)
986-
receiveWfSteps = WithWorkflow(receiveWorkflowWithSteps)
987-
sendStructWf = WithWorkflow(sendStructWorkflow)
988-
receiveStructWf = WithWorkflow(receiveStructWorkflow)
998+
sendWf = WithWorkflow(sendWorkflow)
999+
receiveWf = WithWorkflow(receiveWorkflow)
1000+
sendStructWf = WithWorkflow(sendStructWorkflow)
1001+
receiveStructWf = WithWorkflow(receiveStructWorkflow)
1002+
sendIdempotencyWf = WithWorkflow(sendIdempotencyWorkflow)
1003+
sendIdempotencyEvent = NewEvent()
1004+
recvIdempotencyWf = WithWorkflow(receiveIdempotencyWorkflow)
1005+
receiveIdempotencyStartEvent = NewEvent()
1006+
receiveIdempotencyStopEvent = NewEvent()
9891007
)
9901008

9911009
type sendWorkflowInput struct {
9921010
DestinationID string
9931011
Topic string
9941012
}
9951013

996-
var (
997-
testRecvStartEvent = NewEvent()
998-
testSendCompleteEvent = NewEvent()
999-
)
1000-
10011014
func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) {
10021015
fmt.Println("Starting send workflow with input:", input)
10031016
err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1"})
@@ -1016,20 +1029,6 @@ func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error)
10161029
return "", nil
10171030
}
10181031

1019-
func singleSendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) {
1020-
fmt.Println("Starting send workflow with input:", input)
1021-
1022-
testSendCompleteEvent.Set()
1023-
err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1"})
1024-
if err != nil {
1025-
fmt.Printf("Send failed with error: %v\n", err)
1026-
return "", err
1027-
}
1028-
1029-
fmt.Println("Sending message on topic:", input.Topic, "to destination:", input.DestinationID)
1030-
return "", nil
1031-
}
1032-
10331032
func receiveWorkflow(ctx context.Context, topic string) (string, error) {
10341033
msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second})
10351034
if err != nil {
@@ -1046,25 +1045,6 @@ func receiveWorkflow(ctx context.Context, topic string) (string, error) {
10461045
return msg1 + "-" + msg2 + "-" + msg3, nil
10471046
}
10481047

1049-
func receiveWorkflowWithSteps(ctx context.Context, input string) (string, error) {
1050-
_, err := RunAsStep(ctx, simpleStep, input)
1051-
if err != nil {
1052-
return "", err
1053-
}
1054-
1055-
_, err = RunAsStep(ctx, simpleStep, input)
1056-
if err != nil {
1057-
return "", err
1058-
}
1059-
1060-
testRecvStartEvent.Set()
1061-
msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: input, Timeout: 1 * time.Second})
1062-
if err != nil {
1063-
return "", err
1064-
}
1065-
return msg1, nil
1066-
}
1067-
10681048
func sendStructWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) {
10691049
testStruct := sendRecvType{Value: "test-struct-value"}
10701050
err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: testStruct})
@@ -1075,6 +1055,25 @@ func receiveStructWorkflow(ctx context.Context, topic string) (sendRecvType, err
10751055
return Recv[sendRecvType](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second})
10761056
}
10771057

1058+
func sendIdempotencyWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) {
1059+
err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "m1"})
1060+
if err != nil {
1061+
return "", err
1062+
}
1063+
sendIdempotencyEvent.Wait()
1064+
return "idempotent-send-completed", nil
1065+
}
1066+
1067+
func receiveIdempotencyWorkflow(ctx context.Context, topic string) (string, error) {
1068+
msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second})
1069+
if err != nil {
1070+
return "", err
1071+
}
1072+
receiveIdempotencyStartEvent.Set()
1073+
receiveIdempotencyStopEvent.Wait()
1074+
return msg, nil
1075+
}
1076+
10781077
type sendRecvType struct {
10791078
Value string
10801079
}
@@ -1158,31 +1157,6 @@ func TestSendRecv(t *testing.T) {
11581157
}
11591158
})
11601159

1161-
t.Run("SendUsesCorrectWorkflowIDForIdempotency", func(t *testing.T) {
1162-
receiveHandle, err := receiveWfSteps(context.Background(), "test-topic")
1163-
if err != nil {
1164-
t.Fatalf("failed to start receive workflow: %v", err)
1165-
}
1166-
1167-
testRecvStartEvent.Wait()
1168-
1169-
sendHandle, err := singleSendWf(context.Background(), sendWorkflowInput{
1170-
DestinationID: receiveHandle.GetWorkflowID(),
1171-
Topic: "test-topic",
1172-
})
1173-
1174-
if err != nil {
1175-
t.Fatalf("failed to send message: %v", err)
1176-
}
1177-
1178-
testSendCompleteEvent.Wait()
1179-
1180-
_, err = sendHandle.GetResult(context.Background())
1181-
if err != nil {
1182-
t.Fatalf("send workflow failed: %v", err)
1183-
}
1184-
})
1185-
11861160
t.Run("SendRecvCustomStruct", func(t *testing.T) {
11871161
// Start the receive workflow
11881162
receiveHandle, err := receiveStructWf(context.Background(), "struct-topic")
@@ -1310,5 +1284,64 @@ func TestSendRecv(t *testing.T) {
13101284
t.Fatalf("expected error message to contain %q, but got %q", expectedMessagePart, err.Error())
13111285
}
13121286
})
1287+
t.Run("SendRecvIdempotency", func(t *testing.T) {
1288+
// Start the receive workflow and wait for it to be ready
1289+
receiveHandle, err := recvIdempotencyWf(context.Background(), "idempotency-topic")
1290+
if err != nil {
1291+
t.Fatalf("failed to start receive idempotency workflow: %v", err)
1292+
}
1293+
1294+
// Send the message to the receive workflow
1295+
sendHandle, err := sendIdempotencyWf(context.Background(), sendWorkflowInput{
1296+
DestinationID: receiveHandle.GetWorkflowID(),
1297+
Topic: "idempotency-topic",
1298+
})
1299+
if err != nil {
1300+
t.Fatalf("failed to send idempotency message: %v", err)
1301+
}
13131302

1303+
// Wait for the receive workflow to have received the message
1304+
receiveIdempotencyStartEvent.Wait()
1305+
1306+
// Attempt recovering both workflows. There should be only 2 steps recorded after recovery.
1307+
recoveredHandles, err := recoverPendingWorkflows(context.Background(), []string{"local"})
1308+
if err != nil {
1309+
t.Fatalf("failed to recover pending workflows: %v", err)
1310+
}
1311+
if len(recoveredHandles) != 2 {
1312+
t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles))
1313+
}
1314+
steps, err := getExecutor().systemDB.GetWorkflowSteps(context.Background(), sendHandle.GetWorkflowID())
1315+
if err != nil {
1316+
t.Fatalf("failed to get steps for send idempotency workflow: %v", err)
1317+
}
1318+
if len(steps) != 1 {
1319+
t.Fatalf("expected 1 step in send idempotency workflow, got %d", len(steps))
1320+
}
1321+
steps, err = getExecutor().systemDB.GetWorkflowSteps(context.Background(), receiveHandle.GetWorkflowID())
1322+
if err != nil {
1323+
t.Fatalf("failed to get steps for receive idempotency workflow: %v", err)
1324+
}
1325+
if len(steps) != 1 {
1326+
t.Fatalf("expected 1 step in receive idempotency workflow, got %d", len(steps))
1327+
}
1328+
1329+
// Unblock the workflows to complete
1330+
receiveIdempotencyStopEvent.Set()
1331+
result, err := receiveHandle.GetResult(context.Background())
1332+
if err != nil {
1333+
t.Fatalf("failed to get result from receive idempotency workflow: %v", err)
1334+
}
1335+
if result != "m1" {
1336+
t.Fatalf("expected result to be 'm1', got '%s'", result)
1337+
}
1338+
sendIdempotencyEvent.Set()
1339+
result, err = sendHandle.GetResult(context.Background())
1340+
if err != nil {
1341+
t.Fatalf("failed to get result from send idempotency workflow: %v", err)
1342+
}
1343+
if result != "idempotent-send-completed" {
1344+
t.Fatalf("expected result to be 'idempotent-send-completed', got '%s'", result)
1345+
}
1346+
})
13141347
}

0 commit comments

Comments
 (0)