Skip to content

Commit 87e6569

Browse files
committed
always do a sleep in recv even if the key exists
1 parent 28f1253 commit 87e6569

File tree

2 files changed

+15
-4
lines changed

2 files changed

+15
-4
lines changed

dbos/system_database.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1782,6 +1782,17 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
17821782
s.logger.Warn("Recv() context cancelled", "payload", payload, "cause", context.Cause(ctx))
17831783
return nil, ctx.Err()
17841784
}
1785+
} else {
1786+
// Do a "zero" sleep to record the sleep step in the workflow history
1787+
// This is to handle cases were consecutive Recv calls for the same key are made
1788+
// The key would exist and without this, the step IDs would be: 0 recv, 1 sleep, 2 recv, 4 recv, etc
1789+
_, err := s.sleep(ctx, sleepInput{
1790+
duration: 0,
1791+
stepID: &sleepStepID,
1792+
})
1793+
if err != nil {
1794+
return nil, fmt.Errorf("failed to sleep before recv timeout: %w", err)
1795+
}
17851796
}
17861797

17871798
// Find the oldest message and delete it atomically

dbos/workflows_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,10 +1535,10 @@ func TestSendRecv(t *testing.T) {
15351535
// Verify step counting for receive workflow (receiveWorkflow calls Recv 3 times)
15361536
receiveSteps, err := GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID())
15371537
require.NoError(t, err, "failed to get workflow steps for receive workflow")
1538-
require.Len(t, receiveSteps, 4, "expected 4 steps in receive workflow (3 Recv calls + 1 sleep call during the first recv), got %d", len(receiveSteps))
1538+
require.Len(t, receiveSteps, 6, "expected 4 steps in receive workflow (3 Recv calls + 1 sleep call during the first recv), got %d", len(receiveSteps))
15391539
for i, step := range receiveSteps {
15401540
require.Equal(t, i, step.StepID, "expected step %d to have correct StepID", i)
1541-
if i == 1 {
1541+
if i%2 == 1 {
15421542
require.Equal(t, "DBOS.sleep", step.StepName, "expected step %d to have StepName 'DBOS.sleep'", i)
15431543
} else {
15441544
require.Equal(t, "DBOS.recv", step.StepName, "expected step %d to have StepName 'DBOS.recv'", i)
@@ -1654,10 +1654,10 @@ func TestSendRecv(t *testing.T) {
16541654
// Verify step counting for receive workflow (calls Recv 3 times, each with sleep)
16551655
receiveSteps, err := GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID())
16561656
require.NoError(t, err, "failed to get workflow steps for receive workflow")
1657-
require.Len(t, receiveSteps, 4, "expected 4 steps in receive workflow (3 Recv calls + 1 sleep calls), got %d", len(receiveSteps))
1657+
require.Len(t, receiveSteps, 6, "expected 4 steps in receive workflow (3 Recv calls + 1 sleep calls), got %d", len(receiveSteps))
16581658
for i, step := range receiveSteps {
16591659
require.Equal(t, i, step.StepID, "expected step %d to have correct StepID", i)
1660-
if i == 1 {
1660+
if i%2 == 1 {
16611661
require.Equal(t, "DBOS.sleep", step.StepName, "expected recv step %d to have StepName 'DBOS.sleep'", i)
16621662
} else {
16631663
require.Equal(t, "DBOS.recv", step.StepName, "expected recv step %d to have StepName 'DBOS.recv'", i)

0 commit comments

Comments
 (0)