Skip to content

Commit 2097221

Browse files
committed
test ListQueues + set default statuses when WithQueuesOnly is given
1 parent 08d96eb commit 2097221

File tree

2 files changed

+110
-19
lines changed

2 files changed

+110
-19
lines changed

dbos/queues_test.go

Lines changed: 104 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,6 @@ import (
1717
"github.com/stretchr/testify/require"
1818
)
1919

20-
/**
21-
This suite tests
22-
[x] Normal wf with a step
23-
[x] enqueued workflow starts a child workflow
24-
[x] workflow enqueues another workflow
25-
[x] recover queued workflow
26-
[x] queued workflow DLQ
27-
[x] global concurrency (one at a time with a single queue and a single worker)
28-
[x] worker concurrency (2 at a time across two "workers")
29-
[x] worker concurrency X recovery
30-
[x] rate limiter
31-
[x] conflicting workflow on different queues
32-
[x] queue deduplication
33-
[x] queue priority
34-
[x] queued workflow times out
35-
[] scheduled workflow enqueues another workflow
36-
*/
37-
3820
func queueWorkflow(ctx DBOSContext, input string) (string, error) {
3921
step1, err := RunAsStep(ctx, func(context context.Context) (string, error) {
4022
return queueStep(context, input)
@@ -1227,3 +1209,107 @@ func TestPriorityQueue(t *testing.T) {
12271209

12281210
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test")
12291211
}
1212+
1213+
func TestListQueuedWorkflows(t *testing.T) {
1214+
dbosCtx := setupDBOS(t, true, true)
1215+
1216+
// Simple test workflow that completes immediately
1217+
testWorkflow := func(ctx DBOSContext, input string) (string, error) {
1218+
return "completed-" + input, nil
1219+
}
1220+
1221+
// Blocking workflow for testing pending/enqueued workflows
1222+
startEvent := NewEvent()
1223+
blockEvent := NewEvent()
1224+
blockingWorkflow := func(ctx DBOSContext, input string) (string, error) {
1225+
startEvent.Set()
1226+
blockEvent.Wait()
1227+
return "blocked-" + input, nil
1228+
}
1229+
1230+
RegisterWorkflow(dbosCtx, testWorkflow)
1231+
RegisterWorkflow(dbosCtx, blockingWorkflow)
1232+
1233+
// Create queue for testing
1234+
testQueue1 := NewWorkflowQueue(dbosCtx, "list-test-queue", WithGlobalConcurrency(1))
1235+
testQueue2 := NewWorkflowQueue(dbosCtx, "list-test-queue2", WithGlobalConcurrency(1))
1236+
1237+
err := dbosCtx.Launch()
1238+
require.NoError(t, err, "failed to launch DBOS")
1239+
1240+
t.Run("WithQueuesOnly", func(t *testing.T) {
1241+
blockEvent.Clear()
1242+
startEvent.Clear()
1243+
// Create a non-queued workflow (completed) - this should NOT appear in WithQueuesOnly results
1244+
nonQueuedHandle, err := RunWorkflow(dbosCtx, testWorkflow, "non-queued-test1")
1245+
require.NoError(t, err, "failed to start non-queued workflow")
1246+
_, err = nonQueuedHandle.GetResult()
1247+
require.NoError(t, err, "failed to complete non-queued workflow")
1248+
1249+
// Create queued workflows that will be pending/enqueued
1250+
queuedHandle1, err := RunWorkflow(dbosCtx, blockingWorkflow, "queued-1-test1", WithQueue(testQueue1.Name))
1251+
require.NoError(t, err, "failed to start queued workflow 1")
1252+
1253+
queuedHandle2, err := RunWorkflow(dbosCtx, blockingWorkflow, "queued-2-test1", WithQueue(testQueue1.Name))
1254+
require.NoError(t, err, "failed to start queued workflow 2")
1255+
1256+
startEvent.Wait()
1257+
1258+
// List workflows with WithQueuesOnly - should only return queued workflows
1259+
queuedWorkflows, err := ListWorkflows(dbosCtx, WithQueuesOnly())
1260+
require.NoError(t, err, "failed to list queued workflows")
1261+
1262+
// Verify all returned workflows are in a queue and have pending/enqueued status
1263+
require.Equal(t, 2, len(queuedWorkflows), "expected 2 queued workflows to be returned")
1264+
for _, wf := range queuedWorkflows {
1265+
require.NotEmpty(t, wf.QueueName, "workflow %s should have a queue name", wf.ID)
1266+
require.True(t, wf.Status == WorkflowStatusPending || wf.Status == WorkflowStatusEnqueued,
1267+
"workflow %s status should be PENDING or ENQUEUED, got %s", wf.ID, wf.Status)
1268+
require.True(t, wf.ID == queuedHandle1.GetWorkflowID() || wf.ID == queuedHandle2.GetWorkflowID())
1269+
}
1270+
1271+
// Unblock the workflows for cleanup
1272+
blockEvent.Set()
1273+
_, err = queuedHandle1.GetResult()
1274+
require.NoError(t, err, "failed to complete queued workflow 1")
1275+
_, err = queuedHandle2.GetResult()
1276+
require.NoError(t, err, "failed to complete queued workflow 2")
1277+
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "queue entries should be cleaned up")
1278+
})
1279+
1280+
t.Run("WithQueuesOnlyAndStatusFilter", func(t *testing.T) {
1281+
blockEvent.Clear()
1282+
startEvent.Clear()
1283+
// Create queued workflow that will complete with SUCCESS status
1284+
completedQueuedHandle, err := RunWorkflow(dbosCtx, testWorkflow, "queued-completed", WithQueue(testQueue2.Name))
1285+
require.NoError(t, err, "failed to start queued workflow for completion")
1286+
1287+
// Wait for it to complete
1288+
_, err = completedQueuedHandle.GetResult()
1289+
require.NoError(t, err, "failed to complete queued workflow")
1290+
1291+
// Create pending queued workflows that will NOT have SUCCESS status
1292+
pendingHandle1, err := RunWorkflow(dbosCtx, blockingWorkflow, "queued-pending-1", WithQueue(testQueue2.Name))
1293+
require.NoError(t, err, "failed to start pending queued workflow 1")
1294+
1295+
pendingHandle2, err := RunWorkflow(dbosCtx, blockingWorkflow, "queued-pending-2", WithQueue(testQueue2.Name))
1296+
require.NoError(t, err, "failed to start pending queued workflow 2")
1297+
1298+
startEvent.Wait()
1299+
1300+
// List queued workflows with SUCCESS status filter
1301+
successWorkflows, err := ListWorkflows(dbosCtx, WithQueuesOnly(), WithStatus([]WorkflowStatusType{WorkflowStatusSuccess}), WithQueueName(testQueue2.Name))
1302+
require.NoError(t, err, "failed to list queued workflows with SUCCESS status")
1303+
1304+
require.Equal(t, 1, len(successWorkflows), "expected 1 queued workflow with SUCCESS status")
1305+
require.True(t, successWorkflows[0].ID == completedQueuedHandle.GetWorkflowID(), "our queued workflow should be found in the results")
1306+
1307+
// Unblock the pending workflows for cleanup
1308+
blockEvent.Set()
1309+
_, err = pendingHandle1.GetResult()
1310+
require.NoError(t, err, "failed to complete pending workflow 1")
1311+
_, err = pendingHandle2.GetResult()
1312+
require.NoError(t, err, "failed to complete pending workflow 2")
1313+
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "queue entries should be cleaned up")
1314+
})
1315+
}

dbos/workflow.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1906,7 +1906,7 @@ func WithQueueName(queueName string) ListWorkflowsOption {
19061906
}
19071907
}
19081908

1909-
// WithQueuesOnly filters to only return workflows that are in a queue (queue_name is not NULL).
1909+
// WithQueuesOnly filters to only return workflows that are in a queue.
19101910
//
19111911
// Example:
19121912
//
@@ -1987,6 +1987,11 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption)
19871987
opt(params)
19881988
}
19891989

1990+
// If we are asked to retrieve only queue workflows with no status, only fetch ENQUEUED and PENDING tasks
1991+
if params.queuesOnly && len(params.status) == 0 {
1992+
params.status = []WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending}
1993+
}
1994+
19901995
// Convert to system database input structure
19911996
dbInput := listWorkflowsDBInput{
19921997
workflowIDs: params.workflowIDs,

0 commit comments

Comments
 (0)