Skip to content

Commit ad46326

Browse files
committed
more exhaustive recovery test
1 parent 503d46c commit ad46326

File tree

3 files changed

+131
-40
lines changed

3 files changed

+131
-40
lines changed

dbos/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func TestEnqueue(t *testing.T) {
235235

236236
// After first workflow completes, we should be able to enqueue with same deduplication ID
237237
handle5, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"},
238-
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
238+
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
239239
WithEnqueueDeduplicationID(dedupID), // Same deduplication ID as first workflow
240240
WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion()))
241241
require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion")

dbos/serialization_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func TestSetEventSerialize(t *testing.T) {
236236
assert.Equal(t, "user-defined-event-set", result)
237237

238238
// Retrieve the event to verify it was properly serialized and can be deserialized
239-
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3 * time.Second)
239+
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3*time.Second)
240240
require.NoError(t, err)
241241

242242
// Verify the retrieved data matches what we set
@@ -273,7 +273,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,
273273

274274
func recvUserDefinedTypeWorkflow(ctx DBOSContext, input string) (UserDefinedEventData, error) {
275275
// Receive the user-defined type message
276-
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3 * time.Second)
276+
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3*time.Second)
277277
return result, err
278278
}
279279

dbos/workflows_test.go

Lines changed: 128 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,57 +1009,148 @@ func TestWorkflowIdempotency(t *testing.T) {
10091009

10101010
func TestWorkflowRecovery(t *testing.T) {
10111011
dbosCtx := setupDBOS(t, true, true)
1012-
RegisterWorkflow(dbosCtx, idempotencyWorkflowWithStep)
1013-
t.Run("RecoveryResumeWhereItLeftOff", func(t *testing.T) {
1014-
// Reset the global counter
1015-
idempotencyCounter = 0
10161012

1017-
// First execution - run the workflow once
1018-
input := "recovery-test"
1019-
idempotencyWorkflowWithStepEvent = NewEvent()
1020-
blockingStepStopEvent = NewEvent()
1021-
handle1, err := RunAsWorkflow(dbosCtx, idempotencyWorkflowWithStep, input)
1022-
require.NoError(t, err, "failed to execute workflow first time")
1013+
var (
1014+
recoveryCounters []int64
1015+
recoveryEvents []*Event
1016+
blockingEvents []*Event
1017+
)
1018+
1019+
recoveryWorkflow := func(dbosCtx DBOSContext, index int) (int64, error) {
1020+
// First step with custom name - increments the counter
1021+
_, err := RunAsStep(dbosCtx, func(ctx context.Context) (int64, error) {
1022+
recoveryCounters[index]++
1023+
return recoveryCounters[index], nil
1024+
}, WithStepName(fmt.Sprintf("IncrementStep-%d", index)))
1025+
if err != nil {
1026+
return 0, err
1027+
}
1028+
1029+
// Signal that first step is complete
1030+
recoveryEvents[index].Set()
1031+
1032+
// Second step with custom name - blocks until signaled
1033+
_, err = RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
1034+
blockingEvents[index].Wait()
1035+
return fmt.Sprintf("completed-%d", index), nil
1036+
}, WithStepName(fmt.Sprintf("BlockingStep-%d", index)))
1037+
if err != nil {
1038+
return 0, err
1039+
}
1040+
1041+
return recoveryCounters[index], nil
1042+
}
1043+
1044+
RegisterWorkflow(dbosCtx, recoveryWorkflow)
1045+
1046+
t.Run("WorkflowRecovery", func(t *testing.T) {
1047+
const numWorkflows = 5
1048+
1049+
// Initialize slices for multiple workflows
1050+
recoveryCounters = make([]int64, numWorkflows)
1051+
recoveryEvents = make([]*Event, numWorkflows)
1052+
blockingEvents = make([]*Event, numWorkflows)
1053+
1054+
// Create events for each workflow
1055+
for i := range numWorkflows {
1056+
recoveryEvents[i] = NewEvent()
1057+
blockingEvents[i] = NewEvent()
1058+
}
1059+
1060+
// Start all workflows
1061+
handles := make([]WorkflowHandle[int64], numWorkflows)
1062+
for i := range numWorkflows {
1063+
handle, err := RunAsWorkflow(dbosCtx, recoveryWorkflow, i, WithWorkflowID(fmt.Sprintf("recovery-test-%d", i)))
1064+
require.NoError(t, err, "failed to start workflow %d", i)
1065+
handles[i] = handle
1066+
}
1067+
1068+
// Wait for all first steps to complete
1069+
for i := range numWorkflows {
1070+
recoveryEvents[i].Wait()
1071+
}
1072+
1073+
// Verify step states before recovery
1074+
for i := range numWorkflows {
1075+
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handles[i].GetWorkflowID())
1076+
require.NoError(t, err, "failed to get steps for workflow %d", i)
1077+
require.Len(t, steps, 1, "expected 1 completed step for workflow %d before recovery", i)
10231078

1024-
idempotencyWorkflowWithStepEvent.Wait() // Wait for the first step to complete. The second spins forever.
1079+
// Verify first step has custom name and completed
1080+
assert.Equal(t, fmt.Sprintf("IncrementStep-%d", i), steps[0].StepName, "workflow %d first step name mismatch", i)
1081+
assert.Equal(t, 0, steps[0].StepID, "workflow %d first step ID should be 0", i)
1082+
assert.NotNil(t, steps[0].Output, "workflow %d first step should have output", i)
1083+
assert.Nil(t, steps[0].Error, "workflow %d first step should not have error", i)
1084+
}
1085+
1086+
// Verify counters are all 1 (executed once)
1087+
for i := range numWorkflows {
1088+
require.Equal(t, int64(1), recoveryCounters[i], "workflow %d counter should be 1 before recovery", i)
1089+
}
10251090

1026-
// Run recovery for pending workflows with "local" executor
1091+
// Run recovery
10271092
recoveredHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
10281093
require.NoError(t, err, "failed to recover pending workflows")
1094+
require.Len(t, recoveredHandles, numWorkflows, "expected %d recovered handles, got %d", numWorkflows, len(recoveredHandles))
10291095

1030-
// Check that we have a single handle in the return list
1031-
require.Len(t, recoveredHandles, 1, "expected 1 recovered handle, got %d", len(recoveredHandles))
1096+
// Create a map for easy lookup of recovered handles
1097+
recoveredMap := make(map[string]WorkflowHandle[any])
1098+
for _, h := range recoveredHandles {
1099+
recoveredMap[h.GetWorkflowID()] = h
1100+
}
10321101

1033-
// Check that the workflow ID from the handle is the same as the first handle
1034-
recoveredHandle := recoveredHandles[0]
1035-
_, ok := recoveredHandle.(*workflowPollingHandle[any])
1036-
require.True(t, ok, "expected handle to be of type workflowPollingHandle, got %T", recoveredHandle)
1037-
require.Equal(t, handle1.GetWorkflowID(), recoveredHandle.GetWorkflowID())
1102+
// Verify all original workflows were recovered
1103+
for i := range numWorkflows {
1104+
originalID := handles[i].GetWorkflowID()
1105+
recoveredHandle, found := recoveredMap[originalID]
1106+
require.True(t, found, "workflow %d with ID %s not found in recovered handles", i, originalID)
1107+
1108+
_, ok := recoveredHandle.(*workflowPollingHandle[any])
1109+
require.True(t, ok, "recovered handle %d should be of type workflowPollingHandle, got %T", i, recoveredHandle)
1110+
}
10381111

1039-
idempotencyWorkflowWithStepEvent.Clear()
1040-
idempotencyWorkflowWithStepEvent.Wait()
1112+
// Verify first steps were NOT re-executed (counters should still be 1)
1113+
for i := range numWorkflows {
1114+
require.Equal(t, int64(1), recoveryCounters[i], "workflow %d counter should remain 1 after recovery (idempotent)", i)
1115+
}
10411116

1042-
// Check that the first step was *not* re-executed (idempotency counter is still 1)
1043-
require.Equal(t, int64(1), idempotencyCounter, "expected counter to remain 1 after recovery (idempotent)")
1117+
// Verify workflow attempts increased to 2
1118+
for i := range numWorkflows {
1119+
workflows, err := dbosCtx.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{
1120+
workflowIDs: []string{handles[i].GetWorkflowID()},
1121+
})
1122+
require.NoError(t, err, "failed to list workflow %d", i)
1123+
require.Len(t, workflows, 1, "expected 1 workflow entry for workflow %d", i)
1124+
assert.Equal(t, 2, workflows[0].Attempts, "workflow %d should have 2 attempts after recovery", i)
1125+
}
10441126

1045-
// Using ListWorkflows, retrieve the status of the workflow
1046-
workflows, err := dbosCtx.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{
1047-
workflowIDs: []string{handle1.GetWorkflowID()},
1048-
})
1049-
require.NoError(t, err, "failed to list workflows")
1127+
// Unblock all workflows and verify they complete
1128+
for i := range numWorkflows {
1129+
blockingEvents[i].Set()
1130+
}
10501131

1051-
require.Len(t, workflows, 1, "expected 1 workflow, got %d", len(workflows))
1132+
// Get results from all recovered workflows
1133+
for i := range numWorkflows {
1134+
recoveredHandle := recoveredMap[handles[i].GetWorkflowID()]
1135+
result, err := recoveredHandle.GetResult()
1136+
require.NoError(t, err, "failed to get result from recovered workflow %d", i)
10521137

1053-
workflow := workflows[0]
1138+
// Result should be the counter value (1)
1139+
require.Equal(t, int64(1), result, "workflow %d result should be 1", i)
1140+
}
10541141

1055-
// Ensure its number of attempts is 2
1056-
require.Equal(t, 2, workflow.Attempts)
1142+
// Final verification of step states
1143+
for i := range numWorkflows {
1144+
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handles[i].GetWorkflowID())
1145+
require.NoError(t, err, "failed to get final steps for workflow %d", i)
1146+
require.Len(t, steps, 2, "expected 2 steps for workflow %d", i)
10571147

1058-
// unlock the workflow & wait for result
1059-
blockingStepStopEvent.Set() // This will allow the blocking step to complete
1060-
result, err := recoveredHandle.GetResult()
1061-
require.NoError(t, err, "failed to get result from recovered handle")
1062-
require.Equal(t, idempotencyCounter, result)
1148+
// Both steps should now be completed
1149+
assert.NotNil(t, steps[0].Output, "workflow %d first step should have output", i)
1150+
assert.NotNil(t, steps[1].Output, "workflow %d second step should have output", i)
1151+
assert.Nil(t, steps[0].Error, "workflow %d first step should not have error", i)
1152+
assert.Nil(t, steps[1].Error, "workflow %d second step should not have error", i)
1153+
}
10631154
})
10641155
}
10651156

0 commit comments

Comments
 (0)