Skip to content

Commit 08d96eb

Browse files
committed
queue filter in sql
1 parent 65baa06 commit 08d96eb

File tree

5 files changed

+272
-88
lines changed

5 files changed

+272
-88
lines changed

dbos/admin_server.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -378,27 +378,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
378378
}
379379
}
380380

381-
req.Status = "" // We are not expecting a filter here but clear just in case
382381
filters := req.toListWorkflowsOptions()
383-
filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending}))
382+
if len(req.Status) == 0 {
383+
filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending}))
384+
}
385+
filters = append(filters, WithQueuesOnly())
384386
workflows, err := ListWorkflows(ctx, filters...)
385387
if err != nil {
386388
ctx.logger.Error("Failed to list queued workflows", "error", err)
387389
http.Error(w, fmt.Sprintf("Failed to list queued workflows: %v", err), http.StatusInternalServerError)
388390
return
389391
}
390392

391-
// If not queue was specified, filter out non-queued workflows
392-
if req.QueueName == nil {
393-
filtered := make([]WorkflowStatus, 0, len(workflows))
394-
for _, wf := range workflows {
395-
if len(wf.QueueName) > 0 {
396-
filtered = append(filtered, wf)
397-
}
398-
}
399-
workflows = filtered
400-
}
401-
402393
// Transform to UNIX timestamps before encoding
403394
responseWorkflows := make([]map[string]any, len(workflows))
404395
for i, wf := range workflows {

dbos/admin_server_test.go

Lines changed: 243 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ func TestAdminServer(t *testing.T) {
312312
// Should have exactly 3 workflows
313313
assert.Equal(t, 3, len(workflows), "Expected exactly 3 workflows")
314314

315-
// Verify each workflow's input/output marshaling
315+
// Verify each workflow's input/output marshalling
316316
for _, wf := range workflows {
317317
wfID := wf["WorkflowUUID"].(string)
318318

@@ -530,74 +530,256 @@ func TestAdminServer(t *testing.T) {
530530
assert.True(t, foundIDs4[workflowID2], "Expected to find second workflow ID in empty body results")
531531
})
532532

533-
t.Run("TestDeactivate", func(t *testing.T) {
534-
t.Run("Deactivate stops workflow scheduler", func(t *testing.T) {
535-
resetTestDatabase(t, databaseURL)
536-
ctx, err := NewDBOSContext(Config{
537-
DatabaseURL: databaseURL,
538-
AppName: "test-app",
539-
AdminServer: true,
540-
})
541-
require.NoError(t, err)
533+
t.Run("ListQueuedWorkflows", func(t *testing.T) {
534+
resetTestDatabase(t, databaseURL)
535+
ctx, err := NewDBOSContext(Config{
536+
DatabaseURL: databaseURL,
537+
AppName: "test-app",
538+
AdminServer: true,
539+
})
540+
require.NoError(t, err)
542541

543-
// Track scheduled workflow executions
544-
var executionCount atomic.Int32
542+
// Create a workflow queue with limited concurrency to keep workflows enqueued
543+
queue := NewWorkflowQueue(ctx, "test-queue", WithGlobalConcurrency(1))
545544

546-
// Register a scheduled workflow that runs every second
547-
RegisterWorkflow(ctx, func(dbosCtx DBOSContext, scheduledTime time.Time) (string, error) {
548-
executionCount.Add(1)
549-
return fmt.Sprintf("executed at %v", scheduledTime), nil
550-
}, WithSchedule("* * * * * *")) // Every second
545+
// Define a blocking workflow that will hold up the queue
546+
startEvent := NewEvent()
547+
blockingChan := make(chan struct{})
548+
blockingWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
549+
startEvent.Set()
550+
<-blockingChan // Block until channel is closed
551+
return "blocked-" + input, nil
552+
}
553+
RegisterWorkflow(ctx, blockingWorkflow)
551554

552-
err = ctx.Launch()
555+
// Define a regular non-blocking workflow
556+
regularWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
557+
return "regular-" + input, nil
558+
}
559+
RegisterWorkflow(ctx, regularWorkflow)
560+
561+
err = ctx.Launch()
562+
require.NoError(t, err)
563+
564+
// Ensure cleanup
565+
defer func() {
566+
close(blockingChan) // Unblock any blocked workflows
567+
if ctx != nil {
568+
ctx.Shutdown(1 * time.Minute)
569+
}
570+
}()
571+
572+
client := &http.Client{Timeout: 5 * time.Second}
573+
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_QUEUED_WORKFLOWS_PATTERN, "POST /"))
574+
575+
/// Create a workflow that will not block the queue
576+
h1, err := RunWorkflow(ctx, regularWorkflow, "regular", WithQueue(queue.Name))
577+
require.NoError(t, err)
578+
_, err = h1.GetResult()
579+
require.NoError(t, err)
580+
581+
// Create the first queued workflow that will start processing and block
582+
firstQueueHandle, err := RunWorkflow(ctx, blockingWorkflow, "blocking", WithQueue(queue.Name))
583+
require.NoError(t, err)
584+
585+
startEvent.Wait()
586+
587+
// Create additional queued workflows that will remain in ENQUEUED status
588+
var enqueuedHandles []WorkflowHandle[string]
589+
for i := range 3 {
590+
handle, err := RunWorkflow(ctx, blockingWorkflow, fmt.Sprintf("queued-%d", i), WithQueue(queue.Name))
553591
require.NoError(t, err)
592+
enqueuedHandles = append(enqueuedHandles, handle)
593+
}
554594

555-
client := &http.Client{Timeout: 5 * time.Second}
595+
// Create non-queued workflows that should NOT appear in queues-only results
596+
var regularHandles []WorkflowHandle[string]
597+
for i := range 2 {
598+
handle, err := RunWorkflow(ctx, regularWorkflow, fmt.Sprintf("regular-%d", i))
599+
require.NoError(t, err)
600+
regularHandles = append(regularHandles, handle)
601+
}
556602

557-
// Ensure cleanup
558-
defer func() {
559-
if ctx != nil {
560-
ctx.Shutdown(1 * time.Minute)
561-
}
562-
if client.Transport != nil {
563-
client.Transport.(*http.Transport).CloseIdleConnections()
564-
}
565-
}()
566-
567-
// Wait for 2-3 executions to verify scheduler is running
568-
require.Eventually(t, func() bool {
569-
return executionCount.Load() >= 2
570-
}, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions")
571-
572-
// Call deactivate endpoint
573-
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
574-
req, err := http.NewRequest("GET", endpoint, nil)
575-
require.NoError(t, err, "Failed to create deactivate request")
576-
577-
resp, err := client.Do(req)
578-
require.NoError(t, err, "Failed to call deactivate endpoint")
579-
defer resp.Body.Close()
580-
581-
// Verify endpoint returned 200 OK
582-
assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from deactivate endpoint")
583-
584-
// Verify response body
585-
body, err := io.ReadAll(resp.Body)
586-
require.NoError(t, err, "Failed to read response body")
587-
assert.Equal(t, "deactivated", string(body), "Expected 'deactivated' response body")
588-
589-
// Record count after deactivate and wait
590-
countAfterDeactivate := executionCount.Load()
591-
time.Sleep(4 * time.Second) // Wait long enough for multiple executions if scheduler was still running
592-
593-
// Verify no new executions occurred
594-
finalCount := executionCount.Load()
595-
assert.LessOrEqual(t, finalCount, countAfterDeactivate+1,
596-
"Expected no new scheduled workflows after deactivate (had %d before, %d after)",
597-
countAfterDeactivate, finalCount)
598-
})
603+
// Wait for regular workflows to complete
604+
for _, h := range regularHandles {
605+
_, err := h.GetResult()
606+
require.NoError(t, err)
607+
}
608+
609+
// Test 1: Query with empty body (should get all enqueued/pending queue workflows)
610+
reqQueuesOnly, err := http.NewRequest(http.MethodPost, endpoint, nil)
611+
require.NoError(t, err, "Failed to create queues_only request")
612+
reqQueuesOnly.Header.Set("Content-Type", "application/json")
613+
614+
respQueuesOnly, err := client.Do(reqQueuesOnly)
615+
require.NoError(t, err, "Failed to make queues_only request")
616+
defer respQueuesOnly.Body.Close()
617+
618+
assert.Equal(t, http.StatusOK, respQueuesOnly.StatusCode)
619+
620+
var queuesOnlyWorkflows []map[string]any
621+
err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows)
622+
require.NoError(t, err, "Failed to decode queues_only workflows response")
623+
624+
// Should have exactly 3 enqueued workflows and 1 pending workflow
625+
assert.Equal(t, 4, len(queuesOnlyWorkflows), "Expected exactly 4 workflows")
626+
627+
// Verify all returned workflows are from the queue and have ENQUEUED/PENDING status
628+
for _, wf := range queuesOnlyWorkflows {
629+
status, ok := wf["Status"].(string)
630+
require.True(t, ok, "Status should be a string")
631+
assert.True(t, status == "ENQUEUED" || status == "PENDING",
632+
"Expected status to be ENQUEUED or PENDING, got %s", status)
633+
634+
queueName, ok := wf["QueueName"].(string)
635+
require.True(t, ok, "QueueName should be a string")
636+
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
637+
}
638+
639+
// Verify that the enqueued workflow IDs match
640+
enqueuedIDs := make(map[string]bool)
641+
enqueuedIDs[firstQueueHandle.GetWorkflowID()] = true
642+
for _, h := range enqueuedHandles {
643+
enqueuedIDs[h.GetWorkflowID()] = true
644+
}
645+
646+
for _, wf := range queuesOnlyWorkflows {
647+
id, ok := wf["WorkflowUUID"].(string)
648+
require.True(t, ok, "WorkflowUUID should be a string")
649+
assert.True(t, enqueuedIDs[id], "Expected workflow ID %s to be in enqueued list", id)
650+
}
651+
652+
// Test 2: Query with queue_name filter (should get only workflows from specific queue)
653+
reqBodyQueueName := map[string]any{
654+
"queue_name": queue.Name,
655+
}
656+
reqQueueName, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBodyQueueName)))
657+
require.NoError(t, err, "Failed to create queue_name request")
658+
reqQueueName.Header.Set("Content-Type", "application/json")
659+
660+
respQueueName, err := client.Do(reqQueueName)
661+
require.NoError(t, err, "Failed to make queue_name request")
662+
defer respQueueName.Body.Close()
663+
664+
assert.Equal(t, http.StatusOK, respQueueName.StatusCode)
665+
666+
var queueNameWorkflows []map[string]any
667+
err = json.NewDecoder(respQueueName.Body).Decode(&queueNameWorkflows)
668+
require.NoError(t, err, "Failed to decode queue_name workflows response")
669+
670+
// Should have 4 workflows from the queue (1 blocking running, 3 enqueued)
671+
assert.Equal(t, 4, len(queueNameWorkflows), "Expected exactly 4 workflows from test-queue")
672+
673+
// All should have the queue name set
674+
for _, wf := range queueNameWorkflows {
675+
queueName, ok := wf["QueueName"].(string)
676+
require.True(t, ok, "QueueName should be a string")
677+
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
678+
id, ok := wf["WorkflowUUID"].(string)
679+
require.True(t, ok, "WorkflowUUID should be a string")
680+
assert.True(t, enqueuedIDs[id], "Expected workflow ID %s to be in enqueued list", id)
681+
}
682+
683+
// Test 3: Query with status filter for PENDING (should get only the running workflow)
684+
reqBodyPending := map[string]any{
685+
"status": "PENDING",
686+
}
687+
reqPending, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBodyPending)))
688+
require.NoError(t, err, "Failed to create pending status request")
689+
reqPending.Header.Set("Content-Type", "application/json")
690+
691+
respPending, err := client.Do(reqPending)
692+
require.NoError(t, err, "Failed to make pending status request")
693+
defer respPending.Body.Close()
694+
695+
assert.Equal(t, http.StatusOK, respPending.StatusCode)
696+
697+
var pendingWorkflows []map[string]any
698+
err = json.NewDecoder(respPending.Body).Decode(&pendingWorkflows)
699+
require.NoError(t, err, "Failed to decode pending workflows response")
700+
701+
// Should have exactly 1 PENDING workflow (the first blocking workflow that's running)
702+
assert.Equal(t, 1, len(pendingWorkflows), "Expected exactly 1 PENDING workflow")
703+
704+
// Verify it's the first workflow with PENDING status
705+
status, ok := pendingWorkflows[0]["Status"].(string)
706+
require.True(t, ok, "Status should be a string")
707+
assert.Equal(t, "PENDING", status, "Expected status to be PENDING")
708+
709+
id, ok := pendingWorkflows[0]["WorkflowUUID"].(string)
710+
require.True(t, ok, "WorkflowUUID should be a string")
711+
assert.Equal(t, firstQueueHandle.GetWorkflowID(), id, "Expected the PENDING workflow to be the first blocking workflow")
712+
713+
queueName, ok := pendingWorkflows[0]["QueueName"].(string)
714+
require.True(t, ok, "QueueName should be a string")
715+
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
599716
})
600717

718+
t.Run("TestDeactivate", func(t *testing.T) {
719+
resetTestDatabase(t, databaseURL)
720+
ctx, err := NewDBOSContext(Config{
721+
DatabaseURL: databaseURL,
722+
AppName: "test-app",
723+
AdminServer: true,
724+
})
725+
require.NoError(t, err)
726+
727+
// Track scheduled workflow executions
728+
var executionCount atomic.Int32
729+
730+
// Register a scheduled workflow that runs every second
731+
RegisterWorkflow(ctx, func(dbosCtx DBOSContext, scheduledTime time.Time) (string, error) {
732+
executionCount.Add(1)
733+
return fmt.Sprintf("executed at %v", scheduledTime), nil
734+
}, WithSchedule("* * * * * *")) // Every second
735+
736+
err = ctx.Launch()
737+
require.NoError(t, err)
738+
739+
client := &http.Client{Timeout: 5 * time.Second}
740+
741+
// Ensure cleanup
742+
defer func() {
743+
if ctx != nil {
744+
ctx.Shutdown(1 * time.Minute)
745+
}
746+
if client.Transport != nil {
747+
client.Transport.(*http.Transport).CloseIdleConnections()
748+
}
749+
}()
750+
751+
// Wait for 2-3 executions to verify scheduler is running
752+
require.Eventually(t, func() bool {
753+
return executionCount.Load() >= 2
754+
}, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions")
755+
756+
// Call deactivate endpoint
757+
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
758+
req, err := http.NewRequest("GET", endpoint, nil)
759+
require.NoError(t, err, "Failed to create deactivate request")
760+
761+
resp, err := client.Do(req)
762+
require.NoError(t, err, "Failed to call deactivate endpoint")
763+
defer resp.Body.Close()
764+
765+
// Verify endpoint returned 200 OK
766+
assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from deactivate endpoint")
767+
768+
// Verify response body
769+
body, err := io.ReadAll(resp.Body)
770+
require.NoError(t, err, "Failed to read response body")
771+
assert.Equal(t, "deactivated", string(body), "Expected 'deactivated' response body")
772+
773+
// Record count after deactivate and wait
774+
countAfterDeactivate := executionCount.Load()
775+
time.Sleep(4 * time.Second) // Wait long enough for multiple executions if scheduler was still running
776+
777+
// Verify no new executions occurred
778+
finalCount := executionCount.Load()
779+
assert.LessOrEqual(t, finalCount, countAfterDeactivate+1,
780+
"Expected no new scheduled workflows after deactivate (had %d before, %d after)",
781+
countAfterDeactivate, finalCount)
782+
})
601783
}
602784

603785
func mustMarshal(v any) []byte {

dbos/conductor.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,7 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
639639
opts = append(opts, WithLoadInput(req.Body.LoadInput))
640640
opts = append(opts, WithLoadOutput(false)) // Don't load output for queued workflows
641641
opts = append(opts, WithSortDesc(req.Body.SortDesc))
642+
opts = append(opts, WithQueuesOnly()) // Only include workflows that are in queues
642643

643644
// Add status filter for queued workflows
644645
queuedStatuses := make([]WorkflowStatusType, 0)
@@ -691,21 +692,9 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
691692
return c.sendResponse(response, string(listQueuedWorkflowsMessage))
692693
}
693694

694-
// If no queue name was specified, only include workflows that have a queue name
695-
var filteredWorkflows []WorkflowStatus
696-
if req.Body.QueueName == nil {
697-
for _, wf := range workflows {
698-
if wf.QueueName != "" {
699-
filteredWorkflows = append(filteredWorkflows, wf)
700-
}
701-
}
702-
} else {
703-
filteredWorkflows = workflows
704-
}
705-
706695
// Prepare response payload
707-
formattedWorkflows := make([]listWorkflowsConductorResponseBody, len(filteredWorkflows))
708-
for i, wf := range filteredWorkflows {
696+
formattedWorkflows := make([]listWorkflowsConductorResponseBody, len(workflows))
697+
for i, wf := range workflows {
709698
formattedWorkflows[i] = formatListWorkflowsResponseBody(wf)
710699
}
711700

0 commit comments

Comments
 (0)