Skip to content

Commit 063ee3a

Browse files
authored
Merge branch 'main' into bag-o-stuff
2 parents 3466a54 + 584e8b2 commit 063ee3a

File tree

1 file changed

+83
-0
lines changed

1 file changed

+83
-0
lines changed

dbos/queues_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,3 +1072,86 @@ func TestPriorityQueue(t *testing.T) {
10721072

10731073
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test")
10741074
}
1075+
1076+
func TestPriorityQueue(t *testing.T) {
1077+
dbosCtx := setupDBOS(t, true, true)
1078+
1079+
// Create priority-enabled queue with max concurrency of 1
1080+
priorityQueue := NewWorkflowQueue(dbosCtx, "test_queue_priority", WithGlobalConcurrency(1), WithPriorityEnabled(true))
1081+
childQueue := NewWorkflowQueue(dbosCtx, "test_queue_child")
1082+
1083+
workflowEvent := NewEvent()
1084+
var wfPriorityList []int
1085+
var mu sync.Mutex
1086+
1087+
childWorkflow := func(ctx DBOSContext, p int) (int, error) {
1088+
workflowEvent.Wait()
1089+
return p, nil
1090+
}
1091+
RegisterWorkflow(dbosCtx, childWorkflow)
1092+
1093+
testWorkflow := func(ctx DBOSContext, priority int) (int, error) {
1094+
mu.Lock()
1095+
wfPriorityList = append(wfPriorityList, priority)
1096+
mu.Unlock()
1097+
1098+
childHandle, err := RunAsWorkflow(ctx, childWorkflow, priority, WithQueue(childQueue.Name))
1099+
if err != nil {
1100+
return 0, fmt.Errorf("failed to enqueue child workflow: %v", err)
1101+
}
1102+
workflowEvent.Wait()
1103+
result, err := childHandle.GetResult()
1104+
if err != nil {
1105+
return 0, fmt.Errorf("failed to get child result: %v", err)
1106+
}
1107+
return result + priority, nil
1108+
}
1109+
RegisterWorkflow(dbosCtx, testWorkflow)
1110+
1111+
err := dbosCtx.Launch()
1112+
require.NoError(t, err)
1113+
1114+
var wfHandles []WorkflowHandle[int]
1115+
1116+
// First, enqueue a workflow without priority (default to priority 0)
1117+
handle, err := RunAsWorkflow(dbosCtx, testWorkflow, 0, WithQueue(priorityQueue.Name))
1118+
require.NoError(t, err)
1119+
wfHandles = append(wfHandles, handle)
1120+
1121+
// Then, enqueue workflows with priority 5 to 1
1122+
reversedPriorityHandles := make([]WorkflowHandle[int], 0, 5)
1123+
for i := 5; i > 0; i-- {
1124+
handle, err := RunAsWorkflow(dbosCtx, testWorkflow, i, WithQueue(priorityQueue.Name), WithPriority(uint(i)))
1125+
require.NoError(t, err)
1126+
reversedPriorityHandles = append(reversedPriorityHandles, handle)
1127+
}
1128+
for i := 0; i < len(reversedPriorityHandles); i++ {
1129+
wfHandles = append(wfHandles, reversedPriorityHandles[len(reversedPriorityHandles)-i-1])
1130+
}
1131+
1132+
// Finally, enqueue two workflows without priority again (default priority 0)
1133+
handle6, err := RunAsWorkflow(dbosCtx, testWorkflow, 6, WithQueue(priorityQueue.Name))
1134+
require.NoError(t, err)
1135+
wfHandles = append(wfHandles, handle6)
1136+
1137+
handle7, err := RunAsWorkflow(dbosCtx, testWorkflow, 7, WithQueue(priorityQueue.Name))
1138+
require.NoError(t, err)
1139+
wfHandles = append(wfHandles, handle7)
1140+
1141+
// The finish sequence should be 0, 6, 7, 1, 2, 3, 4, 5
1142+
// (lower priority numbers execute first, same priority follows FIFO)
1143+
workflowEvent.Set()
1144+
1145+
for i, handle := range wfHandles {
1146+
result, err := handle.GetResult()
1147+
require.NoError(t, err, "failed to get result from workflow %d", i)
1148+
assert.Equal(t, i*2, result, "expected result %d for workflow %d", i*2, i)
1149+
}
1150+
1151+
mu.Lock()
1152+
expectedOrder := []int{0, 6, 7, 1, 2, 3, 4, 5}
1153+
assert.Equal(t, expectedOrder, wfPriorityList, "expected workflow execution order %v, got %v", expectedOrder, wfPriorityList)
1154+
mu.Unlock()
1155+
1156+
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test")
1157+
}

0 commit comments

Comments
 (0)