Skip to content

Commit 19efa48

Browse files
committed
list workflows
1 parent 6eafe2b commit 19efa48

File tree

3 files changed

+630
-22
lines changed

3 files changed

+630
-22
lines changed

dbos/client_test.go

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,3 +632,306 @@ func TestForkWorkflow(t *testing.T) {
632632
t.Fatal("expected queue entries to be cleaned up after fork workflow tests")
633633
}
634634
}
635+
636+
func TestListWorkflows(t *testing.T) {
637+
// Setup server context
638+
serverCtx := setupDBOS(t, true, true)
639+
640+
// Create queue for communication
641+
queue := NewWorkflowQueue(serverCtx, "list-workflows-queue")
642+
643+
// Simple test workflow
644+
type testInput struct {
645+
Value int
646+
ID string
647+
}
648+
649+
simpleWorkflow := func(ctx DBOSContext, input testInput) (string, error) {
650+
if input.Value < 0 {
651+
return "", fmt.Errorf("negative value: %d", input.Value)
652+
}
653+
return fmt.Sprintf("result-%d-%s", input.Value, input.ID), nil
654+
}
655+
RegisterWorkflow(serverCtx, simpleWorkflow, WithWorkflowName("SimpleWorkflow"))
656+
657+
// Launch server
658+
err := serverCtx.Launch()
659+
if err != nil {
660+
t.Fatalf("failed to launch server DBOS instance: %v", err)
661+
}
662+
663+
// Setup client context
664+
clientCtx := setupDBOS(t, false, false)
665+
666+
t.Run("ListWorkflowsFiltering", func(t *testing.T) {
667+
var workflowIDs []string
668+
var handles []WorkflowHandle[string]
669+
670+
// Record start time for filtering tests
671+
testStartTime := time.Now()
672+
673+
// Start 10 workflows at 100ms intervals with different patterns
674+
for i := range 10 {
675+
var workflowID string
676+
var handle WorkflowHandle[string]
677+
678+
if i < 5 {
679+
// First 5 workflows: use prefix "test-batch-" and succeed
680+
workflowID = fmt.Sprintf("test-batch-%d", i)
681+
handle, err = Enqueue[testInput, string](clientCtx, GenericEnqueueOptions[testInput]{
682+
WorkflowName: "SimpleWorkflow",
683+
QueueName: queue.Name,
684+
WorkflowID: workflowID,
685+
WorkflowInput: testInput{Value: i, ID: fmt.Sprintf("success-%d", i)},
686+
ApplicationVersion: serverCtx.GetApplicationVersion(),
687+
})
688+
} else {
689+
// Last 5 workflows: use prefix "test-other-" and some will fail
690+
workflowID = fmt.Sprintf("test-other-%d", i)
691+
value := i
692+
if i >= 8 {
693+
value = -i // These will fail
694+
}
695+
handle, err = Enqueue[testInput, string](clientCtx, GenericEnqueueOptions[testInput]{
696+
WorkflowName: "SimpleWorkflow",
697+
QueueName: queue.Name,
698+
WorkflowID: workflowID,
699+
WorkflowInput: testInput{Value: value, ID: fmt.Sprintf("test-%d", i)},
700+
ApplicationVersion: serverCtx.GetApplicationVersion(),
701+
})
702+
}
703+
704+
if err != nil {
705+
t.Fatalf("failed to enqueue workflow %d: %v", i, err)
706+
}
707+
708+
workflowIDs = append(workflowIDs, workflowID)
709+
handles = append(handles, handle)
710+
711+
// Wait 100ms between workflow starts
712+
time.Sleep(100 * time.Millisecond)
713+
}
714+
715+
// Wait for all workflows to complete
716+
for i, handle := range handles {
717+
_, err := handle.GetResult()
718+
if i < 8 {
719+
// First 8 should succeed
720+
if err != nil {
721+
t.Fatalf("workflow %d should have succeeded but got error: %v", i, err)
722+
}
723+
} else {
724+
// Last 2 should fail
725+
if err == nil {
726+
t.Fatalf("workflow %d should have failed but succeeded", i)
727+
}
728+
}
729+
}
730+
731+
// Test 1: List all workflows (no filters)
732+
allWorkflows, err := ListWorkflows(clientCtx)
733+
if err != nil {
734+
t.Fatalf("failed to list all workflows: %v", err)
735+
}
736+
if len(allWorkflows) < 10 {
737+
t.Fatalf("expected at least 10 workflows, got %d", len(allWorkflows))
738+
}
739+
740+
// Test 2: Filter by workflow IDs
741+
expectedIDs := workflowIDs[:3]
742+
specificWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDs(expectedIDs))
743+
if err != nil {
744+
t.Fatalf("failed to list workflows by IDs: %v", err)
745+
}
746+
if len(specificWorkflows) != 3 {
747+
t.Fatalf("expected 3 workflows, got %d", len(specificWorkflows))
748+
}
749+
// Verify returned workflow IDs match expected
750+
returnedIDs := make(map[string]bool)
751+
for _, wf := range specificWorkflows {
752+
returnedIDs[wf.ID] = true
753+
}
754+
for _, expectedID := range expectedIDs {
755+
if !returnedIDs[expectedID] {
756+
t.Fatalf("expected workflow ID %s not found in results", expectedID)
757+
}
758+
}
759+
760+
// Test 3: Filter by workflow ID prefix
761+
batchWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-batch-"))
762+
if err != nil {
763+
t.Fatalf("failed to list workflows by prefix: %v", err)
764+
}
765+
if len(batchWorkflows) != 5 {
766+
t.Fatalf("expected 5 batch workflows, got %d", len(batchWorkflows))
767+
}
768+
// Verify all returned workflow IDs have the correct prefix
769+
for _, wf := range batchWorkflows {
770+
if !strings.HasPrefix(wf.ID, "test-batch-") {
771+
t.Fatalf("workflow ID %s does not have expected prefix 'test-batch-'", wf.ID)
772+
}
773+
}
774+
775+
// Test 4: Filter by status - SUCCESS
776+
successWorkflows, err := ListWorkflows(clientCtx,
777+
WithWorkflowIDPrefix("test-"), // Only our test workflows
778+
WithStatus([]WorkflowStatusType{WorkflowStatusSuccess}))
779+
if err != nil {
780+
t.Fatalf("failed to list successful workflows: %v", err)
781+
}
782+
if len(successWorkflows) != 8 {
783+
t.Fatalf("expected 8 successful workflows, got %d", len(successWorkflows))
784+
}
785+
// Verify all returned workflows have SUCCESS status
786+
for _, wf := range successWorkflows {
787+
if wf.Status != WorkflowStatusSuccess {
788+
t.Fatalf("workflow %s has status %s, expected SUCCESS", wf.ID, wf.Status)
789+
}
790+
}
791+
792+
// Test 5: Filter by status - ERROR
793+
errorWorkflows, err := ListWorkflows(clientCtx,
794+
WithWorkflowIDPrefix("test-"),
795+
WithStatus([]WorkflowStatusType{WorkflowStatusError}))
796+
if err != nil {
797+
t.Fatalf("failed to list error workflows: %v", err)
798+
}
799+
if len(errorWorkflows) != 2 {
800+
t.Fatalf("expected 2 error workflows, got %d", len(errorWorkflows))
801+
}
802+
// Verify all returned workflows have ERROR status
803+
for _, wf := range errorWorkflows {
804+
if wf.Status != WorkflowStatusError {
805+
t.Fatalf("workflow %s has status %s, expected ERROR", wf.ID, wf.Status)
806+
}
807+
}
808+
809+
// Test 6: Filter by time range - first 5 workflows (start to start+500ms)
810+
firstHalfTime := testStartTime.Add(500 * time.Millisecond)
811+
firstHalfWorkflows, err := ListWorkflows(clientCtx,
812+
WithWorkflowIDPrefix("test-"),
813+
WithEndTime(firstHalfTime))
814+
if err != nil {
815+
t.Fatalf("failed to list first half workflows by time range: %v", err)
816+
}
817+
if len(firstHalfWorkflows) != 5 {
818+
t.Fatalf("expected 5 workflows in first half time range, got %d", len(firstHalfWorkflows))
819+
}
820+
821+
// Test 6b: Filter by time range - last 5 workflows (start+500ms to end)
822+
secondHalfWorkflows, err := ListWorkflows(clientCtx,
823+
WithWorkflowIDPrefix("test-"),
824+
WithStartTime(firstHalfTime))
825+
if err != nil {
826+
t.Fatalf("failed to list second half workflows by time range: %v", err)
827+
}
828+
if len(secondHalfWorkflows) != 5 {
829+
t.Fatalf("expected 5 workflows in second half time range, got %d", len(secondHalfWorkflows))
830+
}
831+
832+
// Test 7: Test sorting order (ascending - default)
833+
ascWorkflows, err := ListWorkflows(clientCtx,
834+
WithWorkflowIDPrefix("test-"),
835+
WithSortDesc(false))
836+
if err != nil {
837+
t.Fatalf("failed to list workflows ascending: %v", err)
838+
}
839+
840+
// Test 8: Test sorting order (descending)
841+
descWorkflows, err := ListWorkflows(clientCtx,
842+
WithWorkflowIDPrefix("test-"),
843+
WithSortDesc(true))
844+
if err != nil {
845+
t.Fatalf("failed to list workflows descending: %v", err)
846+
}
847+
848+
// Verify sorting - workflows should be ordered by creation time
849+
// First workflow in desc should be last in asc (latest created)
850+
if ascWorkflows[len(ascWorkflows)-1].ID != descWorkflows[0].ID {
851+
t.Fatalf("sorting verification failed: asc last (%s) != desc first (%s)",
852+
ascWorkflows[len(ascWorkflows)-1].ID, descWorkflows[0].ID)
853+
}
854+
// Last workflow in desc should be first in asc (earliest created)
855+
if ascWorkflows[0].ID != descWorkflows[len(descWorkflows)-1].ID {
856+
t.Fatalf("sorting verification failed: asc first (%s) != desc last (%s)",
857+
ascWorkflows[0].ID, descWorkflows[len(descWorkflows)-1].ID)
858+
}
859+
860+
// Verify ascending order: each workflow should be created at or after the previous
861+
for i := 1; i < len(ascWorkflows); i++ {
862+
if ascWorkflows[i].CreatedAt.Before(ascWorkflows[i-1].CreatedAt) {
863+
t.Fatalf("ascending order violation: workflow at index %d created before previous", i)
864+
}
865+
}
866+
867+
// Verify descending order: each workflow should be created at or before the previous
868+
for i := 1; i < len(descWorkflows); i++ {
869+
if descWorkflows[i].CreatedAt.After(descWorkflows[i-1].CreatedAt) {
870+
t.Fatalf("descending order violation: workflow at index %d created after previous", i)
871+
}
872+
}
873+
874+
// Test 9: Test limit and offset
875+
limitedWorkflows, err := ListWorkflows(clientCtx,
876+
WithWorkflowIDPrefix("test-"),
877+
WithLimit(5))
878+
if err != nil {
879+
t.Fatalf("failed to list workflows with limit: %v", err)
880+
}
881+
if len(limitedWorkflows) != 5 {
882+
t.Fatalf("expected 5 workflows with limit, got %d", len(limitedWorkflows))
883+
}
884+
// Verify we got the first 5 workflows (earliest created)
885+
expectedFirstFive := ascWorkflows[:5]
886+
for i, wf := range limitedWorkflows {
887+
if wf.ID != expectedFirstFive[i].ID {
888+
t.Fatalf("limited workflow at index %d: expected %s, got %s", i, expectedFirstFive[i].ID, wf.ID)
889+
}
890+
}
891+
892+
offsetWorkflows, err := ListWorkflows(clientCtx,
893+
WithWorkflowIDPrefix("test-"),
894+
WithOffset(5),
895+
WithLimit(3))
896+
if err != nil {
897+
t.Fatalf("failed to list workflows with offset: %v", err)
898+
}
899+
if len(offsetWorkflows) != 3 {
900+
t.Fatalf("expected 3 workflows with offset, got %d", len(offsetWorkflows))
901+
}
902+
// Verify we got workflows 5, 6, 7 from the ascending list
903+
expectedOffsetThree := ascWorkflows[5:8]
904+
for i, wf := range offsetWorkflows {
905+
if wf.ID != expectedOffsetThree[i].ID {
906+
t.Fatalf("offset workflow at index %d: expected %s, got %s", i, expectedOffsetThree[i].ID, wf.ID)
907+
}
908+
}
909+
910+
// Test 10: Test input/output loading
911+
noDataWorkflows, err := ListWorkflows(clientCtx,
912+
WithWorkflowIDs(workflowIDs[:2]),
913+
WithLoadInput(false),
914+
WithLoadOutput(false))
915+
if err != nil {
916+
t.Fatalf("failed to list workflows without data: %v", err)
917+
}
918+
if len(noDataWorkflows) != 2 {
919+
t.Fatalf("expected 2 workflows without data, got %d", len(noDataWorkflows))
920+
}
921+
922+
// Verify input/output are not loaded
923+
for _, wf := range noDataWorkflows {
924+
if wf.Input != nil {
925+
t.Fatalf("expected input to be nil when LoadInput=false, got %v", wf.Input)
926+
}
927+
if wf.Output != nil {
928+
t.Fatalf("expected output to be nil when LoadOutput=false, got %v", wf.Output)
929+
}
930+
}
931+
})
932+
933+
// Verify all queue entries are cleaned up
934+
if !queueEntriesAreCleanedUp(serverCtx) {
935+
t.Fatal("expected queue entries to be cleaned up after list workflows tests")
936+
}
937+
}

0 commit comments

Comments
 (0)