diff --git a/dbos/admin_server.go b/dbos/admin_server.go index 3169dddc..5696daed 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -378,9 +378,11 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { } } - req.Status = "" // We are not expecting a filter here but clear just in case filters := req.toListWorkflowsOptions() - filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending})) + if len(req.Status) == 0 { + filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending})) + } + filters = append(filters, WithQueuesOnly()) workflows, err := ListWorkflows(ctx, filters...) if err != nil { ctx.logger.Error("Failed to list queued workflows", "error", err) @@ -388,17 +390,6 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { return } - // If not queue was specified, filter out non-queued workflows - if req.QueueName == nil { - filtered := make([]WorkflowStatus, 0, len(workflows)) - for _, wf := range workflows { - if len(wf.QueueName) > 0 { - filtered = append(filtered, wf) - } - } - workflows = filtered - } - // Transform to UNIX timestamps before encoding responseWorkflows := make([]map[string]any, len(workflows)) for i, wf := range workflows { diff --git a/dbos/admin_server_test.go b/dbos/admin_server_test.go index 1c39622a..32434681 100644 --- a/dbos/admin_server_test.go +++ b/dbos/admin_server_test.go @@ -312,7 +312,7 @@ func TestAdminServer(t *testing.T) { // Should have exactly 3 workflows assert.Equal(t, 3, len(workflows), "Expected exactly 3 workflows") - // Verify each workflow's input/output marshaling + // Verify each workflow's input/output marshalling for _, wf := range workflows { wfID := wf["WorkflowUUID"].(string) @@ -530,74 +530,256 @@ func TestAdminServer(t *testing.T) { assert.True(t, foundIDs4[workflowID2], "Expected to find second workflow ID in empty body results") }) - t.Run("TestDeactivate", func(t *testing.T) { - t.Run("Deactivate stops workflow scheduler", func(t *testing.T) { - resetTestDatabase(t, databaseURL) - ctx, err := NewDBOSContext(Config{ - DatabaseURL: databaseURL, - AppName: "test-app", - AdminServer: true, - }) - require.NoError(t, err) + t.Run("ListQueuedWorkflows", func(t *testing.T) { + resetTestDatabase(t, databaseURL) + ctx, err := NewDBOSContext(Config{ + DatabaseURL: databaseURL, + AppName: "test-app", + AdminServer: true, + }) + require.NoError(t, err) - // Track scheduled workflow executions - var executionCount atomic.Int32 + // Create a workflow queue with limited concurrency to keep workflows enqueued + queue := NewWorkflowQueue(ctx, "test-queue", WithGlobalConcurrency(1)) - // Register a scheduled workflow that runs every second - RegisterWorkflow(ctx, func(dbosCtx DBOSContext, scheduledTime time.Time) (string, error) { - executionCount.Add(1) - return fmt.Sprintf("executed at %v", scheduledTime), nil - }, WithSchedule("* * * * * *")) // Every second + // Define a blocking workflow that will hold up the queue + startEvent := NewEvent() + blockingChan := make(chan struct{}) + blockingWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + startEvent.Set() + <-blockingChan // Block until channel is closed + return "blocked-" + input, nil + } + RegisterWorkflow(ctx, blockingWorkflow) - err = ctx.Launch() + // Define a regular non-blocking workflow + regularWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + return "regular-" + input, nil + } + RegisterWorkflow(ctx, regularWorkflow) + + err = ctx.Launch() + require.NoError(t, err) + + // Ensure cleanup + defer func() { + close(blockingChan) // Unblock any blocked workflows + if ctx != nil { + ctx.Shutdown(1 * time.Minute) + } + }() + + client := &http.Client{Timeout: 5 * time.Second} + endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_QUEUED_WORKFLOWS_PATTERN, "POST /")) + + /// Create a workflow that will not block the queue + h1, err := RunWorkflow(ctx, regularWorkflow, "regular", WithQueue(queue.Name)) + require.NoError(t, err) + _, err = h1.GetResult() + require.NoError(t, err) + + // Create the first queued workflow that will start processing and block + firstQueueHandle, err := RunWorkflow(ctx, blockingWorkflow, "blocking", WithQueue(queue.Name)) + require.NoError(t, err) + + startEvent.Wait() + + // Create additional queued workflows that will remain in ENQUEUED status + var enqueuedHandles []WorkflowHandle[string] + for i := range 3 { + handle, err := RunWorkflow(ctx, blockingWorkflow, fmt.Sprintf("queued-%d", i), WithQueue(queue.Name)) require.NoError(t, err) + enqueuedHandles = append(enqueuedHandles, handle) + } - client := &http.Client{Timeout: 5 * time.Second} + // Create non-queued workflows that should NOT appear in queues-only results + var regularHandles []WorkflowHandle[string] + for i := range 2 { + handle, err := RunWorkflow(ctx, regularWorkflow, fmt.Sprintf("regular-%d", i)) + require.NoError(t, err) + regularHandles = append(regularHandles, handle) + } - // Ensure cleanup - defer func() { - if ctx != nil { - ctx.Shutdown(1 * time.Minute) - } - if client.Transport != nil { - client.Transport.(*http.Transport).CloseIdleConnections() - } - }() - - // Wait for 2-3 executions to verify scheduler is running - require.Eventually(t, func() bool { - return executionCount.Load() >= 2 - }, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions") - - // Call deactivate endpoint - endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /")) - req, err := http.NewRequest("GET", endpoint, nil) - require.NoError(t, err, "Failed to create deactivate request") - - resp, err := client.Do(req) - require.NoError(t, err, "Failed to call deactivate endpoint") - defer resp.Body.Close() - - // Verify endpoint returned 200 OK - assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from deactivate endpoint") - - // Verify response body - body, err := io.ReadAll(resp.Body) - require.NoError(t, err, "Failed to read response body") - assert.Equal(t, "deactivated", string(body), "Expected 'deactivated' response body") - - // Record count after deactivate and wait - countAfterDeactivate := executionCount.Load() - time.Sleep(4 * time.Second) // Wait long enough for multiple executions if scheduler was still running - - // Verify no new executions occurred - finalCount := executionCount.Load() - assert.LessOrEqual(t, finalCount, countAfterDeactivate+1, - "Expected no new scheduled workflows after deactivate (had %d before, %d after)", - countAfterDeactivate, finalCount) - }) + // Wait for regular workflows to complete + for _, h := range regularHandles { + _, err := h.GetResult() + require.NoError(t, err) + } + + // Test 1: Query with empty body (should get all enqueued/pending queue workflows) + reqQueuesOnly, err := http.NewRequest(http.MethodPost, endpoint, nil) + require.NoError(t, err, "Failed to create queues_only request") + reqQueuesOnly.Header.Set("Content-Type", "application/json") + + respQueuesOnly, err := client.Do(reqQueuesOnly) + require.NoError(t, err, "Failed to make queues_only request") + defer respQueuesOnly.Body.Close() + + assert.Equal(t, http.StatusOK, respQueuesOnly.StatusCode) + + var queuesOnlyWorkflows []map[string]any + err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows) + require.NoError(t, err, "Failed to decode queues_only workflows response") + + // Should have exactly 3 enqueued workflows and 1 pending workflow + assert.Equal(t, 4, len(queuesOnlyWorkflows), "Expected exactly 4 workflows") + + // Verify all returned workflows are from the queue and have ENQUEUED/PENDING status + for _, wf := range queuesOnlyWorkflows { + status, ok := wf["Status"].(string) + require.True(t, ok, "Status should be a string") + assert.True(t, status == "ENQUEUED" || status == "PENDING", + "Expected status to be ENQUEUED or PENDING, got %s", status) + + queueName, ok := wf["QueueName"].(string) + require.True(t, ok, "QueueName should be a string") + assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'") + } + + // Verify that the enqueued workflow IDs match + enqueuedIDs := make(map[string]bool) + enqueuedIDs[firstQueueHandle.GetWorkflowID()] = true + for _, h := range enqueuedHandles { + enqueuedIDs[h.GetWorkflowID()] = true + } + + for _, wf := range queuesOnlyWorkflows { + id, ok := wf["WorkflowUUID"].(string) + require.True(t, ok, "WorkflowUUID should be a string") + assert.True(t, enqueuedIDs[id], "Expected workflow ID %s to be in enqueued list", id) + } + + // Test 2: Query with queue_name filter (should get only workflows from specific queue) + reqBodyQueueName := map[string]any{ + "queue_name": queue.Name, + } + reqQueueName, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBodyQueueName))) + require.NoError(t, err, "Failed to create queue_name request") + reqQueueName.Header.Set("Content-Type", "application/json") + + respQueueName, err := client.Do(reqQueueName) + require.NoError(t, err, "Failed to make queue_name request") + defer respQueueName.Body.Close() + + assert.Equal(t, http.StatusOK, respQueueName.StatusCode) + + var queueNameWorkflows []map[string]any + err = json.NewDecoder(respQueueName.Body).Decode(&queueNameWorkflows) + require.NoError(t, err, "Failed to decode queue_name workflows response") + + // Should have 4 workflows from the queue (1 blocking running, 3 enqueued) + assert.Equal(t, 4, len(queueNameWorkflows), "Expected exactly 4 workflows from test-queue") + + // All should have the queue name set + for _, wf := range queueNameWorkflows { + queueName, ok := wf["QueueName"].(string) + require.True(t, ok, "QueueName should be a string") + assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'") + id, ok := wf["WorkflowUUID"].(string) + require.True(t, ok, "WorkflowUUID should be a string") + assert.True(t, enqueuedIDs[id], "Expected workflow ID %s to be in enqueued list", id) + } + + // Test 3: Query with status filter for PENDING (should get only the running workflow) + reqBodyPending := map[string]any{ + "status": "PENDING", + } + reqPending, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBodyPending))) + require.NoError(t, err, "Failed to create pending status request") + reqPending.Header.Set("Content-Type", "application/json") + + respPending, err := client.Do(reqPending) + require.NoError(t, err, "Failed to make pending status request") + defer respPending.Body.Close() + + assert.Equal(t, http.StatusOK, respPending.StatusCode) + + var pendingWorkflows []map[string]any + err = json.NewDecoder(respPending.Body).Decode(&pendingWorkflows) + require.NoError(t, err, "Failed to decode pending workflows response") + + // Should have exactly 1 PENDING workflow (the first blocking workflow that's running) + assert.Equal(t, 1, len(pendingWorkflows), "Expected exactly 1 PENDING workflow") + + // Verify it's the first workflow with PENDING status + status, ok := pendingWorkflows[0]["Status"].(string) + require.True(t, ok, "Status should be a string") + assert.Equal(t, "PENDING", status, "Expected status to be PENDING") + + id, ok := pendingWorkflows[0]["WorkflowUUID"].(string) + require.True(t, ok, "WorkflowUUID should be a string") + assert.Equal(t, firstQueueHandle.GetWorkflowID(), id, "Expected the PENDING workflow to be the first blocking workflow") + + queueName, ok := pendingWorkflows[0]["QueueName"].(string) + require.True(t, ok, "QueueName should be a string") + assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'") }) + t.Run("TestDeactivate", func(t *testing.T) { + resetTestDatabase(t, databaseURL) + ctx, err := NewDBOSContext(Config{ + DatabaseURL: databaseURL, + AppName: "test-app", + AdminServer: true, + }) + require.NoError(t, err) + + // Track scheduled workflow executions + var executionCount atomic.Int32 + + // Register a scheduled workflow that runs every second + RegisterWorkflow(ctx, func(dbosCtx DBOSContext, scheduledTime time.Time) (string, error) { + executionCount.Add(1) + return fmt.Sprintf("executed at %v", scheduledTime), nil + }, WithSchedule("* * * * * *")) // Every second + + err = ctx.Launch() + require.NoError(t, err) + + client := &http.Client{Timeout: 5 * time.Second} + + // Ensure cleanup + defer func() { + if ctx != nil { + ctx.Shutdown(1 * time.Minute) + } + if client.Transport != nil { + client.Transport.(*http.Transport).CloseIdleConnections() + } + }() + + // Wait for 2-3 executions to verify scheduler is running + require.Eventually(t, func() bool { + return executionCount.Load() >= 2 + }, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions") + + // Call deactivate endpoint + endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /")) + req, err := http.NewRequest("GET", endpoint, nil) + require.NoError(t, err, "Failed to create deactivate request") + + resp, err := client.Do(req) + require.NoError(t, err, "Failed to call deactivate endpoint") + defer resp.Body.Close() + + // Verify endpoint returned 200 OK + assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from deactivate endpoint") + + // Verify response body + body, err := io.ReadAll(resp.Body) + require.NoError(t, err, "Failed to read response body") + assert.Equal(t, "deactivated", string(body), "Expected 'deactivated' response body") + + // Record count after deactivate and wait + countAfterDeactivate := executionCount.Load() + time.Sleep(4 * time.Second) // Wait long enough for multiple executions if scheduler was still running + + // Verify no new executions occurred + finalCount := executionCount.Load() + assert.LessOrEqual(t, finalCount, countAfterDeactivate+1, + "Expected no new scheduled workflows after deactivate (had %d before, %d after)", + countAfterDeactivate, finalCount) + }) } func mustMarshal(v any) []byte { diff --git a/dbos/conductor.go b/dbos/conductor.go index a89f77fc..7e73e2fd 100644 --- a/dbos/conductor.go +++ b/dbos/conductor.go @@ -639,6 +639,7 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri opts = append(opts, WithLoadInput(req.Body.LoadInput)) opts = append(opts, WithLoadOutput(false)) // Don't load output for queued workflows opts = append(opts, WithSortDesc(req.Body.SortDesc)) + opts = append(opts, WithQueuesOnly()) // Only include workflows that are in queues // Add status filter for queued workflows queuedStatuses := make([]WorkflowStatusType, 0) @@ -691,21 +692,9 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri return c.sendResponse(response, string(listQueuedWorkflowsMessage)) } - // If no queue name was specified, only include workflows that have a queue name - var filteredWorkflows []WorkflowStatus - if req.Body.QueueName == nil { - for _, wf := range workflows { - if wf.QueueName != "" { - filteredWorkflows = append(filteredWorkflows, wf) - } - } - } else { - filteredWorkflows = workflows - } - // Prepare response payload - formattedWorkflows := make([]listWorkflowsConductorResponseBody, len(filteredWorkflows)) - for i, wf := range filteredWorkflows { + formattedWorkflows := make([]listWorkflowsConductorResponseBody, len(workflows)) + for i, wf := range workflows { formattedWorkflows[i] = formatListWorkflowsResponseBody(wf) } diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 6552b6f9..1b84c1bf 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -17,24 +17,6 @@ import ( "github.com/stretchr/testify/require" ) -/** -This suite tests -[x] Normal wf with a step -[x] enqueued workflow starts a child workflow -[x] workflow enqueues another workflow -[x] recover queued workflow -[x] queued workflow DLQ -[x] global concurrency (one at a time with a single queue and a single worker) -[x] worker concurrency (2 at a time across two "workers") -[x] worker concurrency X recovery -[x] rate limiter -[x] conflicting workflow on different queues -[x] queue deduplication -[x] queue priority -[x] queued workflow times out -[] scheduled workflow enqueues another workflow -*/ - func queueWorkflow(ctx DBOSContext, input string) (string, error) { step1, err := RunAsStep(ctx, func(context context.Context) (string, error) { return queueStep(context, input) @@ -1227,3 +1209,107 @@ func TestPriorityQueue(t *testing.T) { require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test") } + +func TestListQueuedWorkflows(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + // Simple test workflow that completes immediately + testWorkflow := func(ctx DBOSContext, input string) (string, error) { + return "completed-" + input, nil + } + + // Blocking workflow for testing pending/enqueued workflows + startEvent := NewEvent() + blockEvent := NewEvent() + blockingWorkflow := func(ctx DBOSContext, input string) (string, error) { + startEvent.Set() + blockEvent.Wait() + return "blocked-" + input, nil + } + + RegisterWorkflow(dbosCtx, testWorkflow) + RegisterWorkflow(dbosCtx, blockingWorkflow) + + // Create queue for testing + testQueue1 := NewWorkflowQueue(dbosCtx, "list-test-queue", WithGlobalConcurrency(1)) + testQueue2 := NewWorkflowQueue(dbosCtx, "list-test-queue2", WithGlobalConcurrency(1)) + + err := dbosCtx.Launch() + require.NoError(t, err, "failed to launch DBOS") + + t.Run("WithQueuesOnly", func(t *testing.T) { + blockEvent.Clear() + startEvent.Clear() + // Create a non-queued workflow (completed) - this should NOT appear in WithQueuesOnly results + nonQueuedHandle, err := RunWorkflow(dbosCtx, testWorkflow, "non-queued-test1") + require.NoError(t, err, "failed to start non-queued workflow") + _, err = nonQueuedHandle.GetResult() + require.NoError(t, err, "failed to complete non-queued workflow") + + // Create queued workflows that will be pending/enqueued + queuedHandle1, err := RunWorkflow(dbosCtx, blockingWorkflow, "queued-1-test1", WithQueue(testQueue1.Name)) + require.NoError(t, err, "failed to start queued workflow 1") + + queuedHandle2, err := RunWorkflow(dbosCtx, blockingWorkflow, "queued-2-test1", WithQueue(testQueue1.Name)) + require.NoError(t, err, "failed to start queued workflow 2") + + startEvent.Wait() + + // List workflows with WithQueuesOnly - should only return queued workflows + queuedWorkflows, err := ListWorkflows(dbosCtx, WithQueuesOnly()) + require.NoError(t, err, "failed to list queued workflows") + + // Verify all returned workflows are in a queue and have pending/enqueued status + require.Equal(t, 2, len(queuedWorkflows), "expected 2 queued workflows to be returned") + for _, wf := range queuedWorkflows { + require.NotEmpty(t, wf.QueueName, "workflow %s should have a queue name", wf.ID) + require.True(t, wf.Status == WorkflowStatusPending || wf.Status == WorkflowStatusEnqueued, + "workflow %s status should be PENDING or ENQUEUED, got %s", wf.ID, wf.Status) + require.True(t, wf.ID == queuedHandle1.GetWorkflowID() || wf.ID == queuedHandle2.GetWorkflowID()) + } + + // Unblock the workflows for cleanup + blockEvent.Set() + _, err = queuedHandle1.GetResult() + require.NoError(t, err, "failed to complete queued workflow 1") + _, err = queuedHandle2.GetResult() + require.NoError(t, err, "failed to complete queued workflow 2") + require.True(t, queueEntriesAreCleanedUp(dbosCtx), "queue entries should be cleaned up") + }) + + t.Run("WithQueuesOnlyAndStatusFilter", func(t *testing.T) { + blockEvent.Clear() + startEvent.Clear() + // Create queued workflow that will complete with SUCCESS status + completedQueuedHandle, err := RunWorkflow(dbosCtx, testWorkflow, "queued-completed", WithQueue(testQueue2.Name)) + require.NoError(t, err, "failed to start queued workflow for completion") + + // Wait for it to complete + _, err = completedQueuedHandle.GetResult() + require.NoError(t, err, "failed to complete queued workflow") + + // Create pending queued workflows that will NOT have SUCCESS status + pendingHandle1, err := RunWorkflow(dbosCtx, blockingWorkflow, "queued-pending-1", WithQueue(testQueue2.Name)) + require.NoError(t, err, "failed to start pending queued workflow 1") + + pendingHandle2, err := RunWorkflow(dbosCtx, blockingWorkflow, "queued-pending-2", WithQueue(testQueue2.Name)) + require.NoError(t, err, "failed to start pending queued workflow 2") + + startEvent.Wait() + + // List queued workflows with SUCCESS status filter + successWorkflows, err := ListWorkflows(dbosCtx, WithQueuesOnly(), WithStatus([]WorkflowStatusType{WorkflowStatusSuccess}), WithQueueName(testQueue2.Name)) + require.NoError(t, err, "failed to list queued workflows with SUCCESS status") + + require.Equal(t, 1, len(successWorkflows), "expected 1 queued workflow with SUCCESS status") + require.True(t, successWorkflows[0].ID == completedQueuedHandle.GetWorkflowID(), "our queued workflow should be found in the results") + + // Unblock the pending workflows for cleanup + blockEvent.Set() + _, err = pendingHandle1.GetResult() + require.NoError(t, err, "failed to complete pending workflow 1") + _, err = pendingHandle2.GetResult() + require.NoError(t, err, "failed to complete pending workflow 2") + require.True(t, queueEntriesAreCleanedUp(dbosCtx), "queue entries should be cleaned up") + }) +} diff --git a/dbos/system_database.go b/dbos/system_database.go index 5cde8188..661bb909 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -477,6 +477,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt type listWorkflowsDBInput struct { workflowName string queueName string + queuesOnly bool workflowIDPrefix string workflowIDs []string authenticatedUser string @@ -521,6 +522,9 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( if input.queueName != "" { qb.addWhere("queue_name", input.queueName) } + if input.queuesOnly { + qb.addWhereIsNotNull("queue_name") + } if input.workflowIDPrefix != "" { qb.addWhereLike("workflow_uuid", input.workflowIDPrefix+"%") } @@ -2286,6 +2290,10 @@ func (qb *queryBuilder) addWhere(column string, value any) { qb.args = append(qb.args, value) } +func (qb *queryBuilder) addWhereIsNotNull(column string) { + qb.whereClauses = append(qb.whereClauses, fmt.Sprintf("%s IS NOT NULL", column)) +} + func (qb *queryBuilder) addWhereLike(column string, value any) { qb.argCounter++ qb.whereClauses = append(qb.whereClauses, fmt.Sprintf("%s LIKE $%d", column, qb.argCounter)) diff --git a/dbos/workflow.go b/dbos/workflow.go index 3c6d4aa7..0b5475cc 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1729,6 +1729,7 @@ type ListWorkflowsOptions struct { loadInput bool loadOutput bool queueName string + queuesOnly bool executorIDs []string } @@ -1905,6 +1906,18 @@ func WithQueueName(queueName string) ListWorkflowsOption { } } +// WithQueuesOnly filters to only return workflows that are in a queue. +// +// Example: +// +// workflows, err := dbos.ListWorkflows(ctx, +// dbos.WithQueuesOnly()) +func WithQueuesOnly() ListWorkflowsOption { + return func(p *ListWorkflowsOptions) { + p.queuesOnly = true + } +} + // WithExecutorIDs filters workflows by the specified executor IDs. // // Example: @@ -1974,6 +1987,11 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) opt(params) } + // If we are asked to retrieve only queue workflows with no status, only fetch ENQUEUED and PENDING tasks + if params.queuesOnly && len(params.status) == 0 { + params.status = []WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending} + } + // Convert to system database input structure dbInput := listWorkflowsDBInput{ workflowIDs: params.workflowIDs, @@ -1990,6 +2008,7 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) loadInput: params.loadInput, loadOutput: params.loadOutput, queueName: params.queueName, + queuesOnly: params.queuesOnly, executorIDs: params.executorIDs, }