Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ func TestWorkerConcurrencyXRecovery(t *testing.T) {
// Start the first workflow and wait for it to start
workerConcurrencyRecoveryStartEvent1.Wait()
workerConcurrencyRecoveryStartEvent1.Clear()
// Wait for a few seconds to let the queue runner loop
time.Sleep(2 * time.Second)

// Ensure the 2nd workflow is still ENQUEUED
status2, err := handle2.GetStatus()
Expand Down
2 changes: 1 addition & 1 deletion dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro

// Check if operation was already executed and do nothing if so
checkInput := CheckOperationExecutionDBInput{
workflowID: input.DestinationID,
workflowID: workflowState.WorkflowID,
operationID: stepID,
functionName: functionName,
tx: tx,
Expand Down
116 changes: 107 additions & 9 deletions dbos/workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,16 @@ func stepRetryAlwaysFailsStep(ctx context.Context, input string) (string, error)
return "", fmt.Errorf("always fails - attempt %d", stepRetryAttemptCount)
}

var stepIdempotencyCounter int

func stepIdempotencyTest(ctx context.Context, input string) (string, error) {
stepIdempotencyCounter++
fmt.Println("Executing idempotency step:", stepIdempotencyCounter)
return input, nil
}

func stepRetryWorkflow(ctx context.Context, input string) (string, error) {
RunAsStep(ctx, stepIdempotencyTest, input)
return RunAsStep(ctx, stepRetryAlwaysFailsStep, input,
WithStepMaxRetries(5),
WithBackoffFactor(2.0),
Expand Down Expand Up @@ -400,8 +409,9 @@ func TestSteps(t *testing.T) {
})

t.Run("StepRetryWithExponentialBackoff", func(t *testing.T) {
// Reset the global counter before test
// Reset the global counters before test
stepRetryAttemptCount = 0
stepIdempotencyCounter = 0

// Execute the workflow
handle, err := stepRetryWf(context.Background(), "test")
Expand Down Expand Up @@ -449,19 +459,24 @@ func TestSteps(t *testing.T) {
t.Fatal("failed to get workflow steps:", err)
}

if len(steps) != 1 {
t.Fatalf("expected 1 recorded step, got %d", len(steps))
if len(steps) != 2 {
t.Fatalf("expected 2 recorded step, got %d", len(steps))
}

// Verify the recorded step has the error
step := steps[0]
// Verify the second step has the error
step := steps[1]
if step.Error == nil {
t.Fatal("expected error in recorded step, got none")
}

if step.Error.Error() != dbosErr.Error() {
t.Fatalf("expected recorded step error to match joined error, got '%s', expected '%s'", step.Error.Error(), dbosErr.Error())
}

// Verify the idempotency step was executed only once
if stepIdempotencyCounter != 1 {
t.Fatalf("expected idempotency step to be executed only once, got %d", stepIdempotencyCounter)
}
})
}

Expand Down Expand Up @@ -980,10 +995,15 @@ func TestScheduledWorkflows(t *testing.T) {
}

var (
sendWf = WithWorkflow(sendWorkflow)
receiveWf = WithWorkflow(receiveWorkflow)
sendStructWf = WithWorkflow(sendStructWorkflow)
receiveStructWf = WithWorkflow(receiveStructWorkflow)
sendWf = WithWorkflow(sendWorkflow)
receiveWf = WithWorkflow(receiveWorkflow)
sendStructWf = WithWorkflow(sendStructWorkflow)
receiveStructWf = WithWorkflow(receiveStructWorkflow)
sendIdempotencyWf = WithWorkflow(sendIdempotencyWorkflow)
sendIdempotencyEvent = NewEvent()
recvIdempotencyWf = WithWorkflow(receiveIdempotencyWorkflow)
receiveIdempotencyStartEvent = NewEvent()
receiveIdempotencyStopEvent = NewEvent()
)

type sendWorkflowInput struct {
Expand Down Expand Up @@ -1035,6 +1055,25 @@ func receiveStructWorkflow(ctx context.Context, topic string) (sendRecvType, err
return Recv[sendRecvType](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second})
}

func sendIdempotencyWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) {
err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "m1"})
if err != nil {
return "", err
}
sendIdempotencyEvent.Wait()
return "idempotent-send-completed", nil
}

func receiveIdempotencyWorkflow(ctx context.Context, topic string) (string, error) {
msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second})
if err != nil {
return "", err
}
receiveIdempotencyStartEvent.Set()
receiveIdempotencyStopEvent.Wait()
return msg, nil
}

type sendRecvType struct {
Value string
}
Expand Down Expand Up @@ -1245,5 +1284,64 @@ func TestSendRecv(t *testing.T) {
t.Fatalf("expected error message to contain %q, but got %q", expectedMessagePart, err.Error())
}
})
t.Run("SendRecvIdempotency", func(t *testing.T) {
// Start the receive workflow and wait for it to be ready
receiveHandle, err := recvIdempotencyWf(context.Background(), "idempotency-topic")
if err != nil {
t.Fatalf("failed to start receive idempotency workflow: %v", err)
}

// Send the message to the receive workflow
sendHandle, err := sendIdempotencyWf(context.Background(), sendWorkflowInput{
DestinationID: receiveHandle.GetWorkflowID(),
Topic: "idempotency-topic",
})
if err != nil {
t.Fatalf("failed to send idempotency message: %v", err)
}

// Wait for the receive workflow to have received the message
receiveIdempotencyStartEvent.Wait()

// Attempt recovering both workflows. There should be only 2 steps recorded after recovery.
recoveredHandles, err := recoverPendingWorkflows(context.Background(), []string{"local"})
if err != nil {
t.Fatalf("failed to recover pending workflows: %v", err)
}
if len(recoveredHandles) != 2 {
t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles))
}
steps, err := getExecutor().systemDB.GetWorkflowSteps(context.Background(), sendHandle.GetWorkflowID())
if err != nil {
t.Fatalf("failed to get steps for send idempotency workflow: %v", err)
}
if len(steps) != 1 {
t.Fatalf("expected 1 step in send idempotency workflow, got %d", len(steps))
}
steps, err = getExecutor().systemDB.GetWorkflowSteps(context.Background(), receiveHandle.GetWorkflowID())
if err != nil {
t.Fatalf("failed to get steps for receive idempotency workflow: %v", err)
}
if len(steps) != 1 {
t.Fatalf("expected 1 step in receive idempotency workflow, got %d", len(steps))
}

// Unblock the workflows to complete
receiveIdempotencyStopEvent.Set()
result, err := receiveHandle.GetResult(context.Background())
if err != nil {
t.Fatalf("failed to get result from receive idempotency workflow: %v", err)
}
if result != "m1" {
t.Fatalf("expected result to be 'm1', got '%s'", result)
}
sendIdempotencyEvent.Set()
result, err = sendHandle.GetResult(context.Background())
if err != nil {
t.Fatalf("failed to get result from send idempotency workflow: %v", err)
}
if result != "idempotent-send-completed" {
t.Fatalf("expected result to be 'idempotent-send-completed', got '%s'", result)
}
})
}
Loading