Skip to content

Commit 9d87c70

Browse files
committed
merge conflict
1 parent b8255d6 commit 9d87c70

File tree

1 file changed

+0
-83
lines changed

1 file changed

+0
-83
lines changed

dbos/queues_test.go

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,86 +1072,3 @@ func TestPriorityQueue(t *testing.T) {
10721072

10731073
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test")
10741074
}
1075-
1076-
func TestPriorityQueue(t *testing.T) {
1077-
dbosCtx := setupDBOS(t, true, true)
1078-
1079-
// Create priority-enabled queue with max concurrency of 1
1080-
priorityQueue := NewWorkflowQueue(dbosCtx, "test_queue_priority", WithGlobalConcurrency(1), WithPriorityEnabled(true))
1081-
childQueue := NewWorkflowQueue(dbosCtx, "test_queue_child")
1082-
1083-
workflowEvent := NewEvent()
1084-
var wfPriorityList []int
1085-
var mu sync.Mutex
1086-
1087-
childWorkflow := func(ctx DBOSContext, p int) (int, error) {
1088-
workflowEvent.Wait()
1089-
return p, nil
1090-
}
1091-
RegisterWorkflow(dbosCtx, childWorkflow)
1092-
1093-
testWorkflow := func(ctx DBOSContext, priority int) (int, error) {
1094-
mu.Lock()
1095-
wfPriorityList = append(wfPriorityList, priority)
1096-
mu.Unlock()
1097-
1098-
childHandle, err := RunAsWorkflow(ctx, childWorkflow, priority, WithQueue(childQueue.Name))
1099-
if err != nil {
1100-
return 0, fmt.Errorf("failed to enqueue child workflow: %v", err)
1101-
}
1102-
workflowEvent.Wait()
1103-
result, err := childHandle.GetResult()
1104-
if err != nil {
1105-
return 0, fmt.Errorf("failed to get child result: %v", err)
1106-
}
1107-
return result + priority, nil
1108-
}
1109-
RegisterWorkflow(dbosCtx, testWorkflow)
1110-
1111-
err := dbosCtx.Launch()
1112-
require.NoError(t, err)
1113-
1114-
var wfHandles []WorkflowHandle[int]
1115-
1116-
// First, enqueue a workflow without priority (default to priority 0)
1117-
handle, err := RunAsWorkflow(dbosCtx, testWorkflow, 0, WithQueue(priorityQueue.Name))
1118-
require.NoError(t, err)
1119-
wfHandles = append(wfHandles, handle)
1120-
1121-
// Then, enqueue workflows with priority 5 to 1
1122-
reversedPriorityHandles := make([]WorkflowHandle[int], 0, 5)
1123-
for i := 5; i > 0; i-- {
1124-
handle, err := RunAsWorkflow(dbosCtx, testWorkflow, i, WithQueue(priorityQueue.Name), WithPriority(uint(i)))
1125-
require.NoError(t, err)
1126-
reversedPriorityHandles = append(reversedPriorityHandles, handle)
1127-
}
1128-
for i := 0; i < len(reversedPriorityHandles); i++ {
1129-
wfHandles = append(wfHandles, reversedPriorityHandles[len(reversedPriorityHandles)-i-1])
1130-
}
1131-
1132-
// Finally, enqueue two workflows without priority again (default priority 0)
1133-
handle6, err := RunAsWorkflow(dbosCtx, testWorkflow, 6, WithQueue(priorityQueue.Name))
1134-
require.NoError(t, err)
1135-
wfHandles = append(wfHandles, handle6)
1136-
1137-
handle7, err := RunAsWorkflow(dbosCtx, testWorkflow, 7, WithQueue(priorityQueue.Name))
1138-
require.NoError(t, err)
1139-
wfHandles = append(wfHandles, handle7)
1140-
1141-
// The finish sequence should be 0, 6, 7, 1, 2, 3, 4, 5
1142-
// (lower priority numbers execute first, same priority follows FIFO)
1143-
workflowEvent.Set()
1144-
1145-
for i, handle := range wfHandles {
1146-
result, err := handle.GetResult()
1147-
require.NoError(t, err, "failed to get result from workflow %d", i)
1148-
assert.Equal(t, i*2, result, "expected result %d for workflow %d", i*2, i)
1149-
}
1150-
1151-
mu.Lock()
1152-
expectedOrder := []int{0, 6, 7, 1, 2, 3, 4, 5}
1153-
assert.Equal(t, expectedOrder, wfPriorityList, "expected workflow execution order %v, got %v", expectedOrder, wfPriorityList)
1154-
mu.Unlock()
1155-
1156-
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test")
1157-
}

0 commit comments

Comments
 (0)