Skip to content

Commit bcfd555

Browse files
committed
Revert "more exhaustive recovery test"
This reverts commit 630532d.
1 parent 630532d commit bcfd555

File tree

3 files changed

+40
-131
lines changed

3 files changed

+40
-131
lines changed

dbos/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func TestEnqueue(t *testing.T) {
235235

236236
// After first workflow completes, we should be able to enqueue with same deduplication ID
237237
handle5, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"},
238-
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
238+
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
239239
WithEnqueueDeduplicationID(dedupID), // Same deduplication ID as first workflow
240240
WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion()))
241241
require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion")

dbos/serialization_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func TestSetEventSerialize(t *testing.T) {
236236
assert.Equal(t, "user-defined-event-set", result)
237237

238238
// Retrieve the event to verify it was properly serialized and can be deserialized
239-
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3*time.Second)
239+
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3 * time.Second)
240240
require.NoError(t, err)
241241

242242
// Verify the retrieved data matches what we set
@@ -273,7 +273,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,
273273

274274
func recvUserDefinedTypeWorkflow(ctx DBOSContext, input string) (UserDefinedEventData, error) {
275275
// Receive the user-defined type message
276-
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3*time.Second)
276+
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3 * time.Second)
277277
return result, err
278278
}
279279

dbos/workflows_test.go

Lines changed: 37 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,148 +1009,57 @@ func TestWorkflowIdempotency(t *testing.T) {
10091009

10101010
func TestWorkflowRecovery(t *testing.T) {
10111011
dbosCtx := setupDBOS(t, true, true)
1012+
RegisterWorkflow(dbosCtx, idempotencyWorkflowWithStep)
1013+
t.Run("RecoveryResumeWhereItLeftOff", func(t *testing.T) {
1014+
// Reset the global counter
1015+
idempotencyCounter = 0
10121016

1013-
var (
1014-
recoveryCounters []int64
1015-
recoveryEvents []*Event
1016-
blockingEvents []*Event
1017-
)
1018-
1019-
recoveryWorkflow := func(dbosCtx DBOSContext, index int) (int64, error) {
1020-
// First step with custom name - increments the counter
1021-
_, err := RunAsStep(dbosCtx, func(ctx context.Context) (int64, error) {
1022-
recoveryCounters[index]++
1023-
return recoveryCounters[index], nil
1024-
}, WithStepName(fmt.Sprintf("IncrementStep-%d", index)))
1025-
if err != nil {
1026-
return 0, err
1027-
}
1028-
1029-
// Signal that first step is complete
1030-
recoveryEvents[index].Set()
1031-
1032-
// Second step with custom name - blocks until signaled
1033-
_, err = RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
1034-
blockingEvents[index].Wait()
1035-
return fmt.Sprintf("completed-%d", index), nil
1036-
}, WithStepName(fmt.Sprintf("BlockingStep-%d", index)))
1037-
if err != nil {
1038-
return 0, err
1039-
}
1040-
1041-
return recoveryCounters[index], nil
1042-
}
1043-
1044-
RegisterWorkflow(dbosCtx, recoveryWorkflow)
1045-
1046-
t.Run("WorkflowRecovery", func(t *testing.T) {
1047-
const numWorkflows = 5
1048-
1049-
// Initialize slices for multiple workflows
1050-
recoveryCounters = make([]int64, numWorkflows)
1051-
recoveryEvents = make([]*Event, numWorkflows)
1052-
blockingEvents = make([]*Event, numWorkflows)
1053-
1054-
// Create events for each workflow
1055-
for i := range numWorkflows {
1056-
recoveryEvents[i] = NewEvent()
1057-
blockingEvents[i] = NewEvent()
1058-
}
1059-
1060-
// Start all workflows
1061-
handles := make([]WorkflowHandle[int64], numWorkflows)
1062-
for i := range numWorkflows {
1063-
handle, err := RunAsWorkflow(dbosCtx, recoveryWorkflow, i, WithWorkflowID(fmt.Sprintf("recovery-test-%d", i)))
1064-
require.NoError(t, err, "failed to start workflow %d", i)
1065-
handles[i] = handle
1066-
}
1067-
1068-
// Wait for all first steps to complete
1069-
for i := range numWorkflows {
1070-
recoveryEvents[i].Wait()
1071-
}
1072-
1073-
// Verify step states before recovery
1074-
for i := range numWorkflows {
1075-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handles[i].GetWorkflowID())
1076-
require.NoError(t, err, "failed to get steps for workflow %d", i)
1077-
require.Len(t, steps, 1, "expected 1 completed step for workflow %d before recovery", i)
1078-
1079-
// Verify first step has custom name and completed
1080-
assert.Equal(t, fmt.Sprintf("IncrementStep-%d", i), steps[0].StepName, "workflow %d first step name mismatch", i)
1081-
assert.Equal(t, 0, steps[0].StepID, "workflow %d first step ID should be 0", i)
1082-
assert.NotNil(t, steps[0].Output, "workflow %d first step should have output", i)
1083-
assert.Nil(t, steps[0].Error, "workflow %d first step should not have error", i)
1084-
}
1017+
// First execution - run the workflow once
1018+
input := "recovery-test"
1019+
idempotencyWorkflowWithStepEvent = NewEvent()
1020+
blockingStepStopEvent = NewEvent()
1021+
handle1, err := RunAsWorkflow(dbosCtx, idempotencyWorkflowWithStep, input)
1022+
require.NoError(t, err, "failed to execute workflow first time")
10851023

1086-
// Verify counters are all 1 (executed once)
1087-
for i := range numWorkflows {
1088-
require.Equal(t, int64(1), recoveryCounters[i], "workflow %d counter should be 1 before recovery", i)
1089-
}
1024+
idempotencyWorkflowWithStepEvent.Wait() // Wait for the first step to complete. The second spins forever.
10901025

1091-
// Run recovery
1026+
// Run recovery for pending workflows with "local" executor
10921027
recoveredHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
10931028
require.NoError(t, err, "failed to recover pending workflows")
1094-
require.Len(t, recoveredHandles, numWorkflows, "expected %d recovered handles, got %d", numWorkflows, len(recoveredHandles))
10951029

1096-
// Create a map for easy lookup of recovered handles
1097-
recoveredMap := make(map[string]WorkflowHandle[any])
1098-
for _, h := range recoveredHandles {
1099-
recoveredMap[h.GetWorkflowID()] = h
1100-
}
1101-
1102-
// Verify all original workflows were recovered
1103-
for i := range numWorkflows {
1104-
originalID := handles[i].GetWorkflowID()
1105-
recoveredHandle, found := recoveredMap[originalID]
1106-
require.True(t, found, "workflow %d with ID %s not found in recovered handles", i, originalID)
1030+
// Check that we have a single handle in the return list
1031+
require.Len(t, recoveredHandles, 1, "expected 1 recovered handle, got %d", len(recoveredHandles))
11071032

1108-
_, ok := recoveredHandle.(*workflowPollingHandle[any])
1109-
require.True(t, ok, "recovered handle %d should be of type workflowPollingHandle, got %T", i, recoveredHandle)
1110-
}
1033+
// Check that the workflow ID from the handle is the same as the first handle
1034+
recoveredHandle := recoveredHandles[0]
1035+
_, ok := recoveredHandle.(*workflowPollingHandle[any])
1036+
require.True(t, ok, "expected handle to be of type workflowPollingHandle, got %T", recoveredHandle)
1037+
require.Equal(t, handle1.GetWorkflowID(), recoveredHandle.GetWorkflowID())
11111038

1112-
// Verify first steps were NOT re-executed (counters should still be 1)
1113-
for i := range numWorkflows {
1114-
require.Equal(t, int64(1), recoveryCounters[i], "workflow %d counter should remain 1 after recovery (idempotent)", i)
1115-
}
1039+
idempotencyWorkflowWithStepEvent.Clear()
1040+
idempotencyWorkflowWithStepEvent.Wait()
11161041

1117-
// Verify workflow attempts increased to 2
1118-
for i := range numWorkflows {
1119-
workflows, err := dbosCtx.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{
1120-
workflowIDs: []string{handles[i].GetWorkflowID()},
1121-
})
1122-
require.NoError(t, err, "failed to list workflow %d", i)
1123-
require.Len(t, workflows, 1, "expected 1 workflow entry for workflow %d", i)
1124-
assert.Equal(t, 2, workflows[0].Attempts, "workflow %d should have 2 attempts after recovery", i)
1125-
}
1042+
// Check that the first step was *not* re-executed (idempotency counter is still 1)
1043+
require.Equal(t, int64(1), idempotencyCounter, "expected counter to remain 1 after recovery (idempotent)")
11261044

1127-
// Unblock all workflows and verify they complete
1128-
for i := range numWorkflows {
1129-
blockingEvents[i].Set()
1130-
}
1045+
// Using ListWorkflows, retrieve the status of the workflow
1046+
workflows, err := dbosCtx.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{
1047+
workflowIDs: []string{handle1.GetWorkflowID()},
1048+
})
1049+
require.NoError(t, err, "failed to list workflows")
11311050

1132-
// Get results from all recovered workflows
1133-
for i := range numWorkflows {
1134-
recoveredHandle := recoveredMap[handles[i].GetWorkflowID()]
1135-
result, err := recoveredHandle.GetResult()
1136-
require.NoError(t, err, "failed to get result from recovered workflow %d", i)
1051+
require.Len(t, workflows, 1, "expected 1 workflow, got %d", len(workflows))
11371052

1138-
// Result should be the counter value (1)
1139-
require.Equal(t, int64(1), result, "workflow %d result should be 1", i)
1140-
}
1053+
workflow := workflows[0]
11411054

1142-
// Final verification of step states
1143-
for i := range numWorkflows {
1144-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handles[i].GetWorkflowID())
1145-
require.NoError(t, err, "failed to get final steps for workflow %d", i)
1146-
require.Len(t, steps, 2, "expected 2 steps for workflow %d", i)
1055+
// Ensure its number of attempts is 2
1056+
require.Equal(t, 2, workflow.Attempts)
11471057

1148-
// Both steps should now be completed
1149-
assert.NotNil(t, steps[0].Output, "workflow %d first step should have output", i)
1150-
assert.NotNil(t, steps[1].Output, "workflow %d second step should have output", i)
1151-
assert.Nil(t, steps[0].Error, "workflow %d first step should not have error", i)
1152-
assert.Nil(t, steps[1].Error, "workflow %d second step should not have error", i)
1153-
}
1058+
// unlock the workflow & wait for result
1059+
blockingStepStopEvent.Set() // This will allow the blocking step to complete
1060+
result, err := recoveredHandle.GetResult()
1061+
require.NoError(t, err, "failed to get result from recovered handle")
1062+
require.Equal(t, idempotencyCounter, result)
11541063
})
11551064
}
11561065

0 commit comments

Comments
 (0)