Skip to content

Commit 0f05e7c

Browse files
committed
actually check DLQ
1 parent a680f8f commit 0f05e7c

File tree

2 files changed

+300
-826
lines changed

2 files changed

+300
-826
lines changed

dbos/queues_test.go

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -217,16 +217,32 @@ func TestWorkflowQueues(t *testing.T) {
217217
recoveryHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
218218
require.NoError(t, err, "failed to recover pending workflows")
219219
assert.Len(t, recoveryHandles, 1, "expected 1 handle")
220+
t.Logf("Recovered handle: %v, index: %d", recoveryHandles[0].GetWorkflowID(), i)
220221
dlqStartEvent.Wait()
221222
dlqStartEvent.Clear()
222223
handle := recoveryHandles[0]
223224
handles = append(handles, handle)
224225
status, err := handle.GetStatus()
225226
require.NoError(t, err, "failed to get status of recovered workflow handle")
226-
if i == dlqMaxRetries {
227-
// On the last retry, the workflow should be in DLQ
228-
assert.Equal(t, WorkflowStatusRetriesExceeded, status.Status, "expected workflow status to be %s", WorkflowStatusRetriesExceeded)
227+
assert.Equal(t, WorkflowStatusPending, status.Status, "expected workflow to be in PENDING status after recovery")
228+
}
229+
230+
dlqHandle, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"})
231+
require.NoError(t, err, "failed to recover pending workflows")
232+
assert.Len(t, dlqHandle, 1, "expected 1 handle in DLQ")
233+
retries := 0
234+
for {
235+
dlqStatus, err := dlqHandle[0].GetStatus()
236+
require.NoError(t, err, "failed to get status of DLQ workflow handle")
237+
if dlqStatus.Status != WorkflowStatusRetriesExceeded && retries < 10 {
238+
time.Sleep(1 * time.Second) // Wait a bit before checking again
239+
retries++
240+
continue
229241
}
242+
require.NoError(t, err, "failed to get status of DLQ workflow handle")
243+
assert.Equal(t, WorkflowStatusRetriesExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded")
244+
handles = append(handles, dlqHandle[0])
245+
break
230246
}
231247

232248
// Check the workflow completes
@@ -258,8 +274,8 @@ func TestWorkflowQueues(t *testing.T) {
258274
require.Error(t, err, "expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none")
259275

260276
// Check that it's the correct error type
261-
dbosErr, ok := err.(*DBOSError)
262-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
277+
var dbosErr *DBOSError
278+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
263279

264280
assert.Equal(t, ConflictingWorkflowError, dbosErr.Code, "expected error code to be ConflictingWorkflowError")
265281

@@ -301,8 +317,8 @@ func TestWorkflowQueues(t *testing.T) {
301317
require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID")
302318

303319
// Check that it's the correct error type and message
304-
dbosErr, ok := err.(*DBOSError)
305-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
320+
var dbosErr *DBOSError
321+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
306322
assert.Equal(t, QueueDeduplicated, dbosErr.Code, "expected error code to be QueueDeduplicated")
307323

308324
expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, dedupQueue.Name, dedupID)
@@ -416,14 +432,14 @@ func TestQueueRecovery(t *testing.T) {
416432
castedResult, ok := result.([]int)
417433
require.True(t, ok, "expected result to be of type []int for root workflow, got %T", result)
418434
expectedResult := []int{0, 1, 2, 3, 4}
419-
assert.True(t, equal(castedResult, expectedResult), "expected result %v, got %v", expectedResult, castedResult)
435+
assert.Equal(t, expectedResult, castedResult, "expected result %v, got %v", expectedResult, castedResult)
420436
}
421437
}
422438

423439
result, err := handle.GetResult()
424440
require.NoError(t, err, "failed to get result from original handle")
425441
expectedResult := []int{0, 1, 2, 3, 4}
426-
assert.True(t, equal(result, expectedResult), "expected result %v, got %v", expectedResult, result)
442+
assert.Equal(t, expectedResult, result, "expected result %v, got %v", expectedResult, result)
427443

428444
assert.Equal(t, int64(queuedSteps*2), atomic.LoadInt64(&recoveryStepCounter), "expected recoveryStepCounter to be %d", queuedSteps*2)
429445

@@ -432,7 +448,7 @@ func TestQueueRecovery(t *testing.T) {
432448
require.NoError(t, err, "failed to rerun workflow")
433449
rerunResult, err := rerunHandle.GetResult()
434450
require.NoError(t, err, "failed to get result from rerun handle")
435-
assert.True(t, equal(rerunResult, expectedResult), "expected result %v, got %v", expectedResult, rerunResult)
451+
assert.Equal(t, expectedResult, rerunResult, "expected result %v, got %v", expectedResult, rerunResult)
436452

437453
assert.Equal(t, int64(queuedSteps*2), atomic.LoadInt64(&recoveryStepCounter), "expected recoveryStepCounter to remain %d", queuedSteps*2)
438454

@@ -794,9 +810,7 @@ func TestQueueTimeouts(t *testing.T) {
794810
queuedWaitForCancelWorkflow := func(ctx DBOSContext, _ string) (string, error) {
795811
// This workflow will wait indefinitely until it is cancelled
796812
<-ctx.Done()
797-
if !errors.Is(ctx.Err(), context.Canceled) && !errors.Is(ctx.Err(), context.DeadlineExceeded) {
798-
assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded), "workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
799-
}
813+
assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded), "workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
800814
return "", ctx.Err()
801815
}
802816
RegisterWorkflow(dbosCtx, queuedWaitForCancelWorkflow)
@@ -808,8 +822,8 @@ func TestQueueTimeouts(t *testing.T) {
808822
// Workflow should get AwaitedWorkflowCancelled DBOSError
809823
_, err = handle.GetResult()
810824
require.Error(t, err, "expected error when waiting for enqueued workflow to complete, but got none")
811-
dbosErr, ok := err.(*DBOSError)
812-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
825+
var dbosErr *DBOSError
826+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
813827
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
814828

815829
// enqueud workflow should have been cancelled
@@ -877,8 +891,8 @@ func TestQueueTimeouts(t *testing.T) {
877891
require.Error(t, err, "expected error but got none")
878892

879893
// Check the error type
880-
dbosErr, ok := err.(*DBOSError)
881-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
894+
var dbosErr *DBOSError
895+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
882896

883897
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
884898

@@ -905,8 +919,8 @@ func TestQueueTimeouts(t *testing.T) {
905919
require.Error(t, err, "expected error but got none")
906920

907921
// Check the error type
908-
dbosErr, ok := err.(*DBOSError)
909-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
922+
var dbosErr *DBOSError
923+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
910924

911925
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
912926

@@ -934,8 +948,8 @@ func TestQueueTimeouts(t *testing.T) {
934948
require.Error(t, err, "expected error but got none")
935949

936950
// Check the error type
937-
dbosErr, ok := err.(*DBOSError)
938-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
951+
var dbosErr *DBOSError
952+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
939953

940954
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
941955

0 commit comments

Comments
 (0)