@@ -981,7 +981,9 @@ func TestScheduledWorkflows(t *testing.T) {
981981
982982var (
983983 sendWf = WithWorkflow (sendWorkflow )
984+ singleSendWf = WithWorkflow (singleSendWorkflow )
984985 receiveWf = WithWorkflow (receiveWorkflow )
986+ receiveWfSteps = WithWorkflow (receiveWorkflowWithSteps )
985987 sendStructWf = WithWorkflow (sendStructWorkflow )
986988 receiveStructWf = WithWorkflow (receiveStructWorkflow )
987989)
@@ -991,6 +993,11 @@ type sendWorkflowInput struct {
991993 Topic string
992994}
993995
996+ var (
997+ testRecvStartEvent = NewEvent ()
998+ testSendCompleteEvent = NewEvent ()
999+ )
1000+
9941001func sendWorkflow (ctx context.Context , input sendWorkflowInput ) (string , error ) {
9951002 fmt .Println ("Starting send workflow with input:" , input )
9961003 err := Send (ctx , WorkflowSendInput {DestinationID : input .DestinationID , Topic : input .Topic , Message : "message1" })
@@ -1009,6 +1016,20 @@ func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error)
10091016 return "" , nil
10101017}
10111018
1019+ func singleSendWorkflow (ctx context.Context , input sendWorkflowInput ) (string , error ) {
1020+ fmt .Println ("Starting send workflow with input:" , input )
1021+
1022+ testSendCompleteEvent .Set ()
1023+ err := Send (ctx , WorkflowSendInput {DestinationID : input .DestinationID , Topic : input .Topic , Message : "message1" })
1024+ if err != nil {
1025+ fmt .Printf ("Send failed with error: %v\n " , err )
1026+ return "" , err
1027+ }
1028+
1029+ fmt .Println ("Sending message on topic:" , input .Topic , "to destination:" , input .DestinationID )
1030+ return "" , nil
1031+ }
1032+
10121033func receiveWorkflow (ctx context.Context , topic string ) (string , error ) {
10131034 msg1 , err := Recv [string ](ctx , WorkflowRecvInput {Topic : topic , Timeout : 3 * time .Second })
10141035 if err != nil {
@@ -1025,6 +1046,25 @@ func receiveWorkflow(ctx context.Context, topic string) (string, error) {
10251046 return msg1 + "-" + msg2 + "-" + msg3 , nil
10261047}
10271048
1049+ func receiveWorkflowWithSteps (ctx context.Context , input string ) (string , error ) {
1050+ _ , err := RunAsStep (ctx , simpleStep , input )
1051+ if err != nil {
1052+ return "" , err
1053+ }
1054+
1055+ _ , err = RunAsStep (ctx , simpleStep , input )
1056+ if err != nil {
1057+ return "" , err
1058+ }
1059+
1060+ testRecvStartEvent .Set ()
1061+ msg1 , err := Recv [string ](ctx , WorkflowRecvInput {Topic : input , Timeout : 1 * time .Second })
1062+ if err != nil {
1063+ return "" , err
1064+ }
1065+ return msg1 , nil
1066+ }
1067+
10281068func sendStructWorkflow (ctx context.Context , input sendWorkflowInput ) (string , error ) {
10291069 testStruct := sendRecvType {Value : "test-struct-value" }
10301070 err := Send (ctx , WorkflowSendInput {DestinationID : input .DestinationID , Topic : input .Topic , Message : testStruct })
@@ -1118,6 +1158,32 @@ func TestSendRecv(t *testing.T) {
11181158 }
11191159 })
11201160
1161+ t .Run ("SendUsesCorrectWorkflowIDForIdempotency" , func (t * testing.T ) {
1162+ receiveHandle , err := receiveWfSteps (context .Background (), "test-topic" )
1163+ if err != nil {
1164+ t .Fatalf ("failed to start receive workflow: %v" , err )
1165+ }
1166+
1167+ testRecvStartEvent .Wait ()
1168+
1169+ sendHandle , err := singleSendWf (context .Background (), sendWorkflowInput {
1170+ DestinationID : receiveHandle .GetWorkflowID (),
1171+ Topic : "test-topic" ,
1172+ })
1173+
1174+ if err != nil {
1175+ t .Fatalf ("failed to send message: %v" , err )
1176+ }
1177+
1178+ testSendCompleteEvent .Wait ()
1179+
1180+ // Verify send completed without any error
1181+ _ , err = sendHandle .GetResult (context .Background ())
1182+ if err != nil {
1183+ t .Fatalf ("send workflow failed: %v" , err )
1184+ }
1185+ })
1186+
11211187 t .Run ("SendRecvCustomStruct" , func (t * testing.T ) {
11221188 // Start the receive workflow
11231189 receiveHandle , err := receiveStructWf (context .Background (), "struct-topic" )
0 commit comments