@@ -1491,6 +1491,7 @@ var (
14911491 sendIdempotencyEvent = NewEvent ()
14921492 receiveIdempotencyStartEvent = NewEvent ()
14931493 receiveIdempotencyStopEvent = NewEvent ()
1494+ sendRecvSyncEvent = NewEvent () // Event to synchronize send/recv in tests
14941495 numConcurrentRecvWfs = 5
14951496 concurrentRecvReadyEvents = make ([]* Event , numConcurrentRecvWfs )
14961497 concurrentRecvStartEvent = NewEvent ()
@@ -1518,6 +1519,9 @@ func sendWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) {
15181519}
15191520
15201521func receiveWorkflow (ctx DBOSContext , topic string ) (string , error ) {
1522+ // Wait for the test to signal it's ready
1523+ sendRecvSyncEvent .Wait ()
1524+
15211525 msg1 , err := Recv [string ](ctx , topic , 2 * time .Second )
15221526 if err != nil {
15231527 return "" , err
@@ -1651,27 +1655,34 @@ func TestSendRecv(t *testing.T) {
16511655 Launch (dbosCtx )
16521656
16531657 t .Run ("SendRecvSuccess" , func (t * testing.T ) {
1654- // Start the receive workflow
1658+ // Clear the sync event before starting
1659+ sendRecvSyncEvent .Clear ()
1660+
1661+ // Start the receive workflow - it will wait for sendRecvSyncEvent before calling Recv
16551662 receiveHandle , err := RunWorkflow (dbosCtx , receiveWorkflow , "test-topic" )
16561663 require .NoError (t , err , "failed to start receive workflow" )
16571664
1658- time .Sleep (500 * time .Millisecond ) // Ensure receive workflow is waiting so we don't miss the notification
1659-
1660- // Send a message to the receive workflow
1661- handle , err := RunWorkflow (dbosCtx , sendWorkflow , sendWorkflowInput {
1665+ // Send messages to the receive workflow
1666+ sendHandle , err := RunWorkflow (dbosCtx , sendWorkflow , sendWorkflowInput {
16621667 DestinationID : receiveHandle .GetWorkflowID (),
16631668 Topic : "test-topic" ,
16641669 })
16651670 require .NoError (t , err , "failed to send message" )
1666- _ , err = handle .GetResult ()
1671+
1672+ // Wait for send workflow to complete
1673+ _ , err = sendHandle .GetResult ()
16671674 require .NoError (t , err , "failed to get result from send workflow" )
16681675
1676+ // Now that the send workflow has completed, signal the receive workflow to proceed
1677+ sendRecvSyncEvent .Set ()
1678+
1679+ // Wait for receive workflow to complete
16691680 result , err := receiveHandle .GetResult ()
16701681 require .NoError (t , err , "failed to get result from receive workflow" )
16711682 require .Equal (t , "message1-message2-message3" , result )
16721683
16731684 // Verify step counting for send workflow (sendWorkflow calls Send 3 times)
1674- sendSteps , err := GetWorkflowSteps (dbosCtx , handle .GetWorkflowID ())
1685+ sendSteps , err := GetWorkflowSteps (dbosCtx , sendHandle .GetWorkflowID ())
16751686 require .NoError (t , err , "failed to get workflow steps for send workflow" )
16761687 require .Len (t , sendSteps , 3 , "expected 3 steps in send workflow (3 Send calls), got %d" , len (sendSteps ))
16771688 for i , step := range sendSteps {
@@ -1682,12 +1693,10 @@ func TestSendRecv(t *testing.T) {
16821693 // Verify step counting for receive workflow (receiveWorkflow calls Recv 3 times)
16831694 receiveSteps , err := GetWorkflowSteps (dbosCtx , receiveHandle .GetWorkflowID ())
16841695 require .NoError (t , err , "failed to get workflow steps for receive workflow" )
1685- require .Len (t , receiveSteps , 4 , "expected 4 steps in receive workflow (3 Recv calls + 1 sleep call during the first recv), got %d" , len (receiveSteps ))
1686- // Steps 0, 2 and 4 are recv
1696+ require .Len (t , receiveSteps , 3 , "expected 3 steps in receive workflow (3 Recv calls), got %d" , len (receiveSteps ))
16871697 require .Equal (t , "DBOS.recv" , receiveSteps [0 ].StepName , "expected step 0 to have StepName 'DBOS.recv'" )
1688- require .Equal (t , "DBOS.sleep " , receiveSteps [1 ].StepName , "expected step 1 to have StepName 'DBOS.sleep '" )
1698+ require .Equal (t , "DBOS.recv " , receiveSteps [1 ].StepName , "expected step 1 to have StepName 'DBOS.recv '" )
16891699 require .Equal (t , "DBOS.recv" , receiveSteps [2 ].StepName , "expected step 2 to have StepName 'DBOS.recv'" )
1690- require .Equal (t , "DBOS.recv" , receiveSteps [3 ].StepName , "expected step 3 to have StepName 'DBOS.recv'" )
16911700 })
16921701
16931702 t .Run ("SendRecvCustomStruct" , func (t * testing.T ) {
@@ -1754,12 +1763,25 @@ func TestSendRecv(t *testing.T) {
17541763 })
17551764
17561765 t .Run ("RecvTimeout" , func (t * testing.T ) {
1766+ // Set the event so the receive workflow can proceed immediately
1767+ sendRecvSyncEvent .Set ()
1768+
17571769 // Create a receive workflow that tries to receive a message but no send happens
17581770 receiveHandle , err := RunWorkflow (dbosCtx , receiveWorkflow , "timeout-test-topic" )
17591771 require .NoError (t , err , "failed to start receive workflow" )
17601772 result , err := receiveHandle .GetResult ()
17611773 require .NoError (t , err , "expected no error on timeout" )
17621774 assert .Equal (t , "--" , result , "expected -- result on timeout" )
1775+ // Check that six steps were recorded: recv, sleep, recv, sleep, recv, sleep
1776+ steps , err := GetWorkflowSteps (dbosCtx , receiveHandle .GetWorkflowID ())
1777+ require .NoError (t , err , "failed to get workflow steps" )
1778+ require .Len (t , steps , 6 , "expected 6 steps in receive workflow, got %d" , len (steps ))
1779+ require .Equal (t , "DBOS.recv" , steps [0 ].StepName , "expected step 0 to have StepName 'DBOS.recv'" )
1780+ require .Equal (t , "DBOS.sleep" , steps [1 ].StepName , "expected step 1 to have StepName 'DBOS.sleep'" )
1781+ require .Equal (t , "DBOS.recv" , steps [2 ].StepName , "expected step 2 to have StepName 'DBOS.recv'" )
1782+ require .Equal (t , "DBOS.sleep" , steps [3 ].StepName , "expected step 3 to have StepName 'DBOS.sleep'" )
1783+ require .Equal (t , "DBOS.recv" , steps [4 ].StepName , "expected step 4 to have StepName 'DBOS.recv'" )
1784+ require .Equal (t , "DBOS.sleep" , steps [5 ].StepName , "expected step 5 to have StepName 'DBOS.sleep'" )
17631785 })
17641786
17651787 t .Run ("RecvMustRunInsideWorkflows" , func (t * testing.T ) {
@@ -1778,6 +1800,9 @@ func TestSendRecv(t *testing.T) {
17781800 })
17791801
17801802 t .Run ("SendOutsideWorkflow" , func (t * testing.T ) {
1803+ // Set the event so the receive workflow can proceed immediately
1804+ sendRecvSyncEvent .Set ()
1805+
17811806 // Start a receive workflow to have a valid destination
17821807 receiveHandle , err := RunWorkflow (dbosCtx , receiveWorkflow , "outside-workflow-topic" )
17831808 require .NoError (t , err , "failed to start receive workflow" )
@@ -1852,6 +1877,9 @@ func TestSendRecv(t *testing.T) {
18521877 })
18531878
18541879 t .Run ("SendCannotBeCalledWithinStep" , func (t * testing.T ) {
1880+ // Set the event so the receive workflow can proceed immediately
1881+ sendRecvSyncEvent .Set ()
1882+
18551883 // Start a receive workflow to have a valid destination
18561884 receiveHandle , err := RunWorkflow (dbosCtx , receiveWorkflow , "send-within-step-topic" )
18571885 require .NoError (t , err , "failed to start receive workflow" )
0 commit comments