Skip to content

Commit 56c3764

Browse files
committed
Test if Send function uses correct WorkflowID when checking for Idempotency
1 parent c1f3663 commit 56c3764

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

dbos/workflows_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,9 @@ func TestScheduledWorkflows(t *testing.T) {
981981

982982
var (
983983
sendWf = WithWorkflow(sendWorkflow)
984+
singleSendWf = WithWorkflow(singleSendWorkflow)
984985
receiveWf = WithWorkflow(receiveWorkflow)
986+
receiveWfSteps = WithWorkflow(receiveWorkflowWithSteps)
985987
sendStructWf = WithWorkflow(sendStructWorkflow)
986988
receiveStructWf = WithWorkflow(receiveStructWorkflow)
987989
)
@@ -991,6 +993,11 @@ type sendWorkflowInput struct {
991993
Topic string
992994
}
993995

996+
var (
997+
testRecvStartEvent = NewEvent()
998+
testSendCompleteEvent = NewEvent()
999+
)
1000+
9941001
func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) {
9951002
fmt.Println("Starting send workflow with input:", input)
9961003
err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1"})
@@ -1009,6 +1016,20 @@ func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error)
10091016
return "", nil
10101017
}
10111018

1019+
func singleSendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) {
1020+
fmt.Println("Starting send workflow with input:", input)
1021+
1022+
testSendCompleteEvent.Set()
1023+
err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1"})
1024+
if err != nil {
1025+
fmt.Printf("Send failed with error: %v\n", err)
1026+
return "", err
1027+
}
1028+
1029+
fmt.Println("Sending message on topic:", input.Topic, "to destination:", input.DestinationID)
1030+
return "", nil
1031+
}
1032+
10121033
func receiveWorkflow(ctx context.Context, topic string) (string, error) {
10131034
msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second})
10141035
if err != nil {
@@ -1025,6 +1046,25 @@ func receiveWorkflow(ctx context.Context, topic string) (string, error) {
10251046
return msg1 + "-" + msg2 + "-" + msg3, nil
10261047
}
10271048

1049+
func receiveWorkflowWithSteps(ctx context.Context, input string) (string, error) {
1050+
_, err := RunAsStep(ctx, simpleStep, input)
1051+
if err != nil {
1052+
return "", err
1053+
}
1054+
1055+
_, err = RunAsStep(ctx, simpleStep, input)
1056+
if err != nil {
1057+
return "", err
1058+
}
1059+
1060+
testRecvStartEvent.Set()
1061+
msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: input, Timeout: 1 * time.Second})
1062+
if err != nil {
1063+
return "", err
1064+
}
1065+
return msg1, nil
1066+
}
1067+
10281068
func sendStructWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) {
10291069
testStruct := sendRecvType{Value: "test-struct-value"}
10301070
err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: testStruct})
@@ -1118,6 +1158,32 @@ func TestSendRecv(t *testing.T) {
11181158
}
11191159
})
11201160

1161+
t.Run("SendUsesCorrectWorkflowIDForIdempotency", func(t *testing.T) {
1162+
receiveHandle, err := receiveWfSteps(context.Background(), "test-topic")
1163+
if err != nil {
1164+
t.Fatalf("failed to start receive workflow: %v", err)
1165+
}
1166+
1167+
testRecvStartEvent.Wait()
1168+
1169+
sendHandle, err := singleSendWf(context.Background(), sendWorkflowInput{
1170+
DestinationID: receiveHandle.GetWorkflowID(),
1171+
Topic: "test-topic",
1172+
})
1173+
1174+
if err != nil {
1175+
t.Fatalf("failed to send message: %v", err)
1176+
}
1177+
1178+
testSendCompleteEvent.Wait()
1179+
1180+
// Verify send completed without any error
1181+
_, err = sendHandle.GetResult(context.Background())
1182+
if err != nil {
1183+
t.Fatalf("send workflow failed: %v", err)
1184+
}
1185+
})
1186+
11211187
t.Run("SendRecvCustomStruct", func(t *testing.T) {
11221188
// Start the receive workflow
11231189
receiveHandle, err := receiveStructWf(context.Background(), "struct-topic")

0 commit comments

Comments
 (0)