Skip to content

Commit 274525f

Browse files
af-mdmaxdml
andauthored
Fix workflow ID reference in Send function to use current workflow state (#32)
Co-authored-by: maxdml <[email protected]>
1 parent d519b2f commit 274525f

File tree

3 files changed

+110
-10
lines changed

3 files changed

+110
-10
lines changed

dbos/queues_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,8 @@ func TestWorkerConcurrencyXRecovery(t *testing.T) {
634634
// Start the first workflow and wait for it to start
635635
workerConcurrencyRecoveryStartEvent1.Wait()
636636
workerConcurrencyRecoveryStartEvent1.Clear()
637+
// Wait for a few seconds to let the queue runner loop
638+
time.Sleep(2 * time.Second)
637639

638640
// Ensure the 2nd workflow is still ENQUEUED
639641
status2, err := handle2.GetStatus()

dbos/system_database.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1030,7 +1030,7 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro
10301030

10311031
// Check if operation was already executed and do nothing if so
10321032
checkInput := CheckOperationExecutionDBInput{
1033-
workflowID: input.DestinationID,
1033+
workflowID: workflowState.WorkflowID,
10341034
operationID: stepID,
10351035
functionName: functionName,
10361036
tx: tx,

dbos/workflows_test.go

Lines changed: 107 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,16 @@ func stepRetryAlwaysFailsStep(ctx context.Context, input string) (string, error)
335335
return "", fmt.Errorf("always fails - attempt %d", stepRetryAttemptCount)
336336
}
337337

338+
var stepIdempotencyCounter int
339+
340+
func stepIdempotencyTest(ctx context.Context, input string) (string, error) {
341+
stepIdempotencyCounter++
342+
fmt.Println("Executing idempotency step:", stepIdempotencyCounter)
343+
return input, nil
344+
}
345+
338346
func stepRetryWorkflow(ctx context.Context, input string) (string, error) {
347+
RunAsStep(ctx, stepIdempotencyTest, input)
339348
return RunAsStep(ctx, stepRetryAlwaysFailsStep, input,
340349
WithStepMaxRetries(5),
341350
WithBackoffFactor(2.0),
@@ -400,8 +409,9 @@ func TestSteps(t *testing.T) {
400409
})
401410

402411
t.Run("StepRetryWithExponentialBackoff", func(t *testing.T) {
403-
// Reset the global counter before test
412+
// Reset the global counters before test
404413
stepRetryAttemptCount = 0
414+
stepIdempotencyCounter = 0
405415

406416
// Execute the workflow
407417
handle, err := stepRetryWf(context.Background(), "test")
@@ -449,19 +459,24 @@ func TestSteps(t *testing.T) {
449459
t.Fatal("failed to get workflow steps:", err)
450460
}
451461

452-
if len(steps) != 1 {
453-
t.Fatalf("expected 1 recorded step, got %d", len(steps))
462+
if len(steps) != 2 {
463+
t.Fatalf("expected 2 recorded step, got %d", len(steps))
454464
}
455465

456-
// Verify the recorded step has the error
457-
step := steps[0]
466+
// Verify the second step has the error
467+
step := steps[1]
458468
if step.Error == nil {
459469
t.Fatal("expected error in recorded step, got none")
460470
}
461471

462472
if step.Error.Error() != dbosErr.Error() {
463473
t.Fatalf("expected recorded step error to match joined error, got '%s', expected '%s'", step.Error.Error(), dbosErr.Error())
464474
}
475+
476+
// Verify the idempotency step was executed only once
477+
if stepIdempotencyCounter != 1 {
478+
t.Fatalf("expected idempotency step to be executed only once, got %d", stepIdempotencyCounter)
479+
}
465480
})
466481
}
467482

@@ -980,10 +995,15 @@ func TestScheduledWorkflows(t *testing.T) {
980995
}
981996

982997
var (
983-
sendWf = WithWorkflow(sendWorkflow)
984-
receiveWf = WithWorkflow(receiveWorkflow)
985-
sendStructWf = WithWorkflow(sendStructWorkflow)
986-
receiveStructWf = WithWorkflow(receiveStructWorkflow)
998+
sendWf = WithWorkflow(sendWorkflow)
999+
receiveWf = WithWorkflow(receiveWorkflow)
1000+
sendStructWf = WithWorkflow(sendStructWorkflow)
1001+
receiveStructWf = WithWorkflow(receiveStructWorkflow)
1002+
sendIdempotencyWf = WithWorkflow(sendIdempotencyWorkflow)
1003+
sendIdempotencyEvent = NewEvent()
1004+
recvIdempotencyWf = WithWorkflow(receiveIdempotencyWorkflow)
1005+
receiveIdempotencyStartEvent = NewEvent()
1006+
receiveIdempotencyStopEvent = NewEvent()
9871007
)
9881008

9891009
type sendWorkflowInput struct {
@@ -1035,6 +1055,25 @@ func receiveStructWorkflow(ctx context.Context, topic string) (sendRecvType, err
10351055
return Recv[sendRecvType](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second})
10361056
}
10371057

1058+
func sendIdempotencyWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) {
1059+
err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "m1"})
1060+
if err != nil {
1061+
return "", err
1062+
}
1063+
sendIdempotencyEvent.Wait()
1064+
return "idempotent-send-completed", nil
1065+
}
1066+
1067+
func receiveIdempotencyWorkflow(ctx context.Context, topic string) (string, error) {
1068+
msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second})
1069+
if err != nil {
1070+
return "", err
1071+
}
1072+
receiveIdempotencyStartEvent.Set()
1073+
receiveIdempotencyStopEvent.Wait()
1074+
return msg, nil
1075+
}
1076+
10381077
type sendRecvType struct {
10391078
Value string
10401079
}
@@ -1245,5 +1284,64 @@ func TestSendRecv(t *testing.T) {
12451284
t.Fatalf("expected error message to contain %q, but got %q", expectedMessagePart, err.Error())
12461285
}
12471286
})
1287+
t.Run("SendRecvIdempotency", func(t *testing.T) {
1288+
// Start the receive workflow and wait for it to be ready
1289+
receiveHandle, err := recvIdempotencyWf(context.Background(), "idempotency-topic")
1290+
if err != nil {
1291+
t.Fatalf("failed to start receive idempotency workflow: %v", err)
1292+
}
1293+
1294+
// Send the message to the receive workflow
1295+
sendHandle, err := sendIdempotencyWf(context.Background(), sendWorkflowInput{
1296+
DestinationID: receiveHandle.GetWorkflowID(),
1297+
Topic: "idempotency-topic",
1298+
})
1299+
if err != nil {
1300+
t.Fatalf("failed to send idempotency message: %v", err)
1301+
}
12481302

1303+
// Wait for the receive workflow to have received the message
1304+
receiveIdempotencyStartEvent.Wait()
1305+
1306+
// Attempt recovering both workflows. There should be only 2 steps recorded after recovery.
1307+
recoveredHandles, err := recoverPendingWorkflows(context.Background(), []string{"local"})
1308+
if err != nil {
1309+
t.Fatalf("failed to recover pending workflows: %v", err)
1310+
}
1311+
if len(recoveredHandles) != 2 {
1312+
t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles))
1313+
}
1314+
steps, err := getExecutor().systemDB.GetWorkflowSteps(context.Background(), sendHandle.GetWorkflowID())
1315+
if err != nil {
1316+
t.Fatalf("failed to get steps for send idempotency workflow: %v", err)
1317+
}
1318+
if len(steps) != 1 {
1319+
t.Fatalf("expected 1 step in send idempotency workflow, got %d", len(steps))
1320+
}
1321+
steps, err = getExecutor().systemDB.GetWorkflowSteps(context.Background(), receiveHandle.GetWorkflowID())
1322+
if err != nil {
1323+
t.Fatalf("failed to get steps for receive idempotency workflow: %v", err)
1324+
}
1325+
if len(steps) != 1 {
1326+
t.Fatalf("expected 1 step in receive idempotency workflow, got %d", len(steps))
1327+
}
1328+
1329+
// Unblock the workflows to complete
1330+
receiveIdempotencyStopEvent.Set()
1331+
result, err := receiveHandle.GetResult(context.Background())
1332+
if err != nil {
1333+
t.Fatalf("failed to get result from receive idempotency workflow: %v", err)
1334+
}
1335+
if result != "m1" {
1336+
t.Fatalf("expected result to be 'm1', got '%s'", result)
1337+
}
1338+
sendIdempotencyEvent.Set()
1339+
result, err = sendHandle.GetResult(context.Background())
1340+
if err != nil {
1341+
t.Fatalf("failed to get result from send idempotency workflow: %v", err)
1342+
}
1343+
if result != "idempotent-send-completed" {
1344+
t.Fatalf("expected result to be 'idempotent-send-completed', got '%s'", result)
1345+
}
1346+
})
12491347
}

0 commit comments

Comments
 (0)