Skip to content

Commit 291f010

Browse files
authored
Use dequeue args and setup within tree instead of making pre-dequeue updates the responsibility of the tree caller (#9190)
* Use dequeue args and setup within tree instead of making pre-dequeue updates the responsibility of the tree caller
1 parent dbcf9a8 commit 291f010

12 files changed

+173
-166
lines changed

pkg/scheduler/queue/multi_queuing_algorithm_tree_queue.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,17 @@ const localQueueIndex = -1
1515
type Tree interface {
1616
EnqueueFrontByPath(QueuePath, any) error
1717
EnqueueBackByPath(QueuePath, any) error
18-
Dequeue() (QueuePath, any)
18+
Dequeue(dequeueArgs *DequeueArgs) (QueuePath, any)
1919
ItemCount() int
2020
IsEmpty() bool
2121
}
2222

23+
type DequeueArgs struct {
24+
querierID QuerierID
25+
workerID int
26+
lastTenantIndex int
27+
}
28+
2329
// MultiQueuingAlgorithmTreeQueue holds metadata and a pointer to the root node of a hierarchical queue implementation.
2430
// The root Node maintains a localQueue and an arbitrary number of child nodes (which themselves
2531
// may have local queues and children). Each Node in MultiQueuingAlgorithmTreeQueue uses a QueuingAlgorithm (determined by
@@ -61,17 +67,23 @@ func (t *MultiQueuingAlgorithmTreeQueue) IsEmpty() bool {
6167
return t.rootNode.IsEmpty()
6268
}
6369

64-
// Dequeue removes and returns an item from the front of the next appropriate Node in the MultiQueuingAlgorithmTreeQueue, as
65-
// well as the path to the Node which that item was dequeued from.
70+
// Dequeue removes and returns an item from the front of the next appropriate Node in the MultiQueuingAlgorithmTreeQueue,
71+
// as well as the path to the Node which that item was dequeued from. If DequeueArgs are passed,
72+
// each QueuingAlgorithm's setup function is called with DequeueArgs to update its state. If DequeueArgs is nil,
73+
// the dequeue operation will proceed without setting up QueuingAlgorithm state.
6674
//
67-
// Either the root/self node or a child node is chosen according to the Node's QueuingAlgorithm. If
68-
// the root node is chosen, an item will be dequeued from the front of its localQueue. If a child
69-
// node is chosen, it is recursively dequeued from until a node selects its localQueue.
75+
// Either the root/self node or a child node is chosen according to the Node's QueuingAlgorithm.
76+
// If the root node is chosen, an item will be dequeued from the front of its localQueue. If a child node is chosen,
77+
// it is recursively dequeued from until a node selects its localQueue.
7078
//
71-
// Nodes that empty down to the leaf after being dequeued from (or which are found to be empty leaf
72-
// nodes during the dequeue operation) are deleted as the recursion returns up the stack. This
73-
// maintains structural guarantees relied upon to make IsEmpty() non-recursive.
74-
func (t *MultiQueuingAlgorithmTreeQueue) Dequeue() (QueuePath, any) {
79+
// Nodes that satisfy IsEmpty after a dequeue operation are deleted as the recursion returns up the stack.
80+
// This maintains structural guarantees relied upon to make IsEmpty() non-recursive.
81+
func (t *MultiQueuingAlgorithmTreeQueue) Dequeue(dequeueArgs *DequeueArgs) (QueuePath, any) {
82+
if dequeueArgs != nil {
83+
for _, qa := range t.algosByDepth {
84+
qa.setup(dequeueArgs)
85+
}
86+
}
7587
path, v := t.rootNode.dequeue()
7688
// The returned node dequeue path includes the root node; exclude
7789
// this so that the return path can be used if needed to enqueue.

pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) {
483483
testCaseReports[testCaseName] = report
484484

485485
// ensure everything was dequeued
486-
path, val := tree.tree.Dequeue()
486+
path, val := tree.tree.Dequeue(&DequeueArgs{querierID: tqa.currentQuerier})
487487
assert.Nil(t, val)
488488
assert.Equal(t, path, QueuePath{})
489489
})

pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_test.go

Lines changed: 44 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,10 @@ func Test_EnqueueFrontByPath(t *testing.T) {
159159
require.NoError(t, err)
160160

161161
for _, expectedVal := range tt.expected {
162-
_, v := tree.Dequeue()
162+
_, v := tree.Dequeue(&DequeueArgs{querierID: someQuerier})
163163
require.Equal(t, expectedVal, v)
164164
}
165-
_, v := tree.Dequeue()
165+
_, v := tree.Dequeue(&DequeueArgs{querierID: someQuerier})
166166
require.Nil(t, v)
167167
})
168168
}
@@ -173,31 +173,39 @@ func Test_Dequeue_RootNode(t *testing.T) {
173173
name string
174174
rootAlgo QueuingAlgorithm
175175
enqueueToRoot []any
176+
dequeueArgs *DequeueArgs
177+
expected []any
176178
}{
177179
{
178180
name: "dequeue from empty round-robin root node",
179181
rootAlgo: &roundRobinState{},
180182
},
181183
{
182-
name: "dequeue from empty tenant-querier root node",
183-
rootAlgo: newTenantQuerierAssignments(0),
184+
name: "dequeue from empty tenant-querier root node",
185+
rootAlgo: newTenantQuerierAssignments(0),
186+
dequeueArgs: &DequeueArgs{querierID: "placeholder", lastTenantIndex: localQueueIndex},
184187
},
185188
{
186189
name: "dequeue from non-empty round-robin root node",
187190
rootAlgo: &roundRobinState{},
188191
enqueueToRoot: []any{"something-in-root"},
192+
expected: []any{"something-in-root"},
189193
},
190194
{
191195
name: "dequeue from non-empty tenant-querier root node",
192196
rootAlgo: newTenantQuerierAssignments(0),
193197
enqueueToRoot: []any{"something-else-in-root"},
198+
expected: []any{"something-else-in-root"},
199+
dequeueArgs: &DequeueArgs{querierID: "placeholder", lastTenantIndex: localQueueIndex},
200+
},
201+
{
202+
name: "dequeue from non-empty tenant-querier root node with no current querier",
203+
rootAlgo: newTenantQuerierAssignments(0),
204+
enqueueToRoot: []any{"something-in-root"},
194205
},
195206
}
196207
for _, tt := range tests {
197208
t.Run(tt.name, func(t *testing.T) {
198-
if tqa, ok := tt.rootAlgo.(*tenantQuerierAssignments); ok {
199-
tqa.updateQueuingAlgorithmState("placeholder", localQueueIndex)
200-
}
201209
tree, err := NewTree(tt.rootAlgo)
202210
require.NoError(t, err)
203211

@@ -208,13 +216,18 @@ func Test_Dequeue_RootNode(t *testing.T) {
208216
}
209217

210218
for _, elt := range tt.enqueueToRoot {
211-
dequeuePath, v := tree.Dequeue()
212-
require.Equal(t, path, dequeuePath)
213-
require.Equal(t, elt, v)
219+
dequeuePath, v := tree.Dequeue(tt.dequeueArgs)
220+
if tt.expected != nil {
221+
require.Equal(t, path, dequeuePath)
222+
require.Equal(t, elt, v)
223+
} else {
224+
require.Equal(t, path, dequeuePath)
225+
require.Nil(t, v)
226+
}
214227

215228
}
216229

217-
dequeuePath, v := tree.Dequeue()
230+
dequeuePath, v := tree.Dequeue(tt.dequeueArgs)
218231
require.Equal(t, path, dequeuePath)
219232
require.Nil(t, v)
220233

@@ -276,7 +289,7 @@ func Test_RoundRobinDequeue(t *testing.T) {
276289
}
277290

278291
for _, expected := range tt.expected {
279-
_, val := tree.Dequeue()
292+
_, val := tree.Dequeue(nil)
280293
v, ok := val.(string)
281294
require.True(t, ok)
282295
require.Equal(t, expected, v)
@@ -352,7 +365,7 @@ func Test_DequeueOrderAfterEnqueue(t *testing.T) {
352365
require.NoError(t, err)
353366
}
354367
if operation.kind == dequeue {
355-
path, obj := tree.Dequeue()
368+
path, obj := tree.Dequeue(&DequeueArgs{querierID: placeholderQuerier})
356369
require.Equal(t, operation.path, path)
357370
require.Equal(t, operation.obj, obj)
358371
}
@@ -533,13 +546,6 @@ func Test_TenantQuerierAssignmentsDequeue(t *testing.T) {
533546

534547
for _, tt := range tests {
535548
t.Run(tt.name, func(t *testing.T) {
536-
tqas := make([]*tenantQuerierAssignments, 0)
537-
for _, da := range tt.treeAlgosByDepth {
538-
if tqa, ok := da.(*tenantQuerierAssignments); ok {
539-
tqas = append(tqas, tqa)
540-
}
541-
}
542-
543549
tree, err := NewTree(tt.treeAlgosByDepth...)
544550
require.NoError(t, err)
545551

@@ -550,10 +556,7 @@ func Test_TenantQuerierAssignmentsDequeue(t *testing.T) {
550556
// currQuerier at position i is used to dequeue the expected result at position i
551557
require.Equal(t, len(tt.currQuerier), len(tt.expected))
552558
for i := 0; i < len(tt.expected); i++ {
553-
for _, tqa := range tqas {
554-
tqa.updateQueuingAlgorithmState(tt.currQuerier[i], i-1)
555-
}
556-
_, v := tree.Dequeue()
559+
_, v := tree.Dequeue(&DequeueArgs{querierID: tt.currQuerier[i], lastTenantIndex: i - 1})
557560
require.Equal(t, tt.expected[i], v)
558561
}
559562
})
@@ -593,32 +596,29 @@ func Test_ChangeTenantQuerierAssignments(t *testing.T) {
593596
querier2 := QuerierID("querier-2")
594597
querier3 := QuerierID("querier-3")
595598

596-
// set state to querier-2 should dequeue query-2
597-
tqa.updateQueuingAlgorithmState(querier2, -1)
598-
_, v := tree.Dequeue()
599+
// dequeue for querier-2 should dequeue query-2
600+
_, v := tree.Dequeue(&DequeueArgs{querierID: querier2, lastTenantIndex: -1})
599601
require.Equal(t, "query-2", v)
600602

601-
// update tqa to querier-1 should dequeue query-1
602-
tqa.updateQueuingAlgorithmState(querier1, -1)
603-
_, v = tree.Dequeue()
603+
// dequeue for querier-1 should dequeue query-1
604+
_, v = tree.Dequeue(&DequeueArgs{querierID: querier1, lastTenantIndex: -1})
604605
require.Equal(t, "query-1", v)
605606

606-
// update tqa map to add querier-3 as assigned to tenant-2, then set tqa to querier-3 should dequeue query-3
607+
// update tqa map to add querier-3 as assigned to tenant-2, then dequeue for querier-3 should dequeue query-3
607608
tqa.tenantQuerierIDs["tenant-2"]["querier-3"] = struct{}{}
608-
tqa.updateQueuingAlgorithmState(querier3, -1)
609-
_, v = tree.Dequeue()
609+
_, v = tree.Dequeue(&DequeueArgs{querierID: querier3, lastTenantIndex: -1})
610610
require.Equal(t, "query-3", v)
611611

612612
// during reshuffle, we only ever reassign tenant values, we don't assign an entirely new map value
613613
// to tenantQuerierIDs. Reassign tenant-2 to an empty map value, and query-5 (tenant-3), which can be handled
614614
// by any querier, should be dequeued,
615615
tqa.tenantQuerierIDs["tenant-2"] = map[QuerierID]struct{}{}
616-
_, v = tree.Dequeue()
616+
_, v = tree.Dequeue(&DequeueArgs{querierID: querier3})
617617
require.Equal(t, "query-5", v)
618618

619619
// then we should not be able to dequeue query-4
620620
tqa.tenantQuerierIDs["tenant-2"] = map[QuerierID]struct{}{}
621-
_, v = tree.Dequeue()
621+
_, v = tree.Dequeue(&DequeueArgs{querierID: querier3})
622622
require.Nil(t, v)
623623

624624
}
@@ -680,7 +680,7 @@ func Test_DequeueBalancedRoundRobinTree(t *testing.T) {
680680
dequeuedPathCache := make([]QueuePath, rotationsBeforeRepeat)
681681

682682
for !tree.IsEmpty() {
683-
dequeuedPath, _ := tree.Dequeue()
683+
dequeuedPath, _ := tree.Dequeue(nil)
684684

685685
// require dequeued path has not repeated before the expected number of rotations
686686
require.NotContains(t, dequeuedPathCache, dequeuedPath)
@@ -716,7 +716,7 @@ func Test_DequeueUnbalancedRoundRobinTree(t *testing.T) {
716716
}
717717

718718
for _, expected := range expectedVals {
719-
_, v := tree.Dequeue()
719+
_, v := tree.Dequeue(nil)
720720
require.Equal(t, expected, v)
721721
}
722722

@@ -759,17 +759,17 @@ func Test_EnqueueDuringDequeueRespectsRoundRobin(t *testing.T) {
759759
require.Equal(t, []string{"0", "1", "2"}, root.queueOrder)
760760

761761
// dequeue first item
762-
dequeuedPath, _ := tree.Dequeue()
762+
dequeuedPath, _ := tree.Dequeue(nil)
763763
require.Equal(t, QueuePath{"0"}, dequeuedPath)
764764

765765
// dequeue second item; root:1 is now exhausted and deleted
766-
dequeuedPath, _ = tree.Dequeue()
766+
dequeuedPath, _ = tree.Dequeue(nil)
767767
require.Equal(t, QueuePath{"1"}, dequeuedPath)
768768
require.Nil(t, root.getNode(QueuePath{"1"}))
769769
require.Equal(t, []string{"0", "2"}, root.queueOrder)
770770

771771
// dequeue third item
772-
dequeuedPath, _ = tree.Dequeue()
772+
dequeuedPath, _ = tree.Dequeue(nil)
773773
require.Equal(t, QueuePath{"2"}, dequeuedPath)
774774

775775
// root:1 was previously exhausted; root:0, then root:2 will be next in the rotation
@@ -782,16 +782,16 @@ func Test_EnqueueDuringDequeueRespectsRoundRobin(t *testing.T) {
782782

783783
// dequeue fourth item; the newly-enqueued root:1 item
784784
// has not jumped the line in front of root:0
785-
dequeuedPath, _ = tree.Dequeue()
785+
dequeuedPath, _ = tree.Dequeue(nil)
786786
require.Equal(t, QueuePath{"0"}, dequeuedPath)
787787

788788
// dequeue fifth item; the newly-enqueued root:1 item
789789
// has not jumped the line in front of root:2
790-
dequeuedPath, _ = tree.Dequeue()
790+
dequeuedPath, _ = tree.Dequeue(nil)
791791
require.Equal(t, QueuePath{"2"}, dequeuedPath)
792792

793793
// dequeue sixth item; verifying the order 0->2->1 is being followed
794-
dequeuedPath, _ = tree.Dequeue()
794+
dequeuedPath, _ = tree.Dequeue(nil)
795795
require.Equal(t, QueuePath{"1"}, dequeuedPath)
796796

797797
// all items have been dequeued
@@ -818,7 +818,7 @@ func Test_NodeCannotDeleteItself(t *testing.T) {
818818
require.NoError(t, err)
819819
require.NotNil(t, tree)
820820

821-
_, _ = tree.Dequeue()
821+
_, _ = tree.Dequeue(nil)
822822

823823
require.NotNil(t, tree.rootNode)
824824
require.Zero(t, tree.rootNode.getLocalQueue().Len())
@@ -885,18 +885,6 @@ func makeUnbalancedRoundRobinTree(t *testing.T) *MultiQueuingAlgorithmTreeQueue
885885
require.Equal(t, 3, tree.rootNode.nodeCount())
886886
require.Equal(t, 1, tree.rootNode.ItemCount())
887887

888-
//// enqueue two items to root:1
889-
//childPath = QueuePath{"1"}
890-
//item = makeItemForChildQueue(tree.rootNode, childPath, cache)
891-
//require.NoError(t, tree.EnqueueBackByPath(childPath, item))
892-
//require.Equal(t, 3, tree.rootNode.nodeCount())
893-
//require.Equal(t, 2, tree.rootNode.ItemCount())
894-
//
895-
//item = makeItemForChildQueue(tree.rootNode, childPath, cache)
896-
//require.NoError(t, tree.EnqueueBackByPath(childPath, item))
897-
//require.Equal(t, 3, tree.rootNode.nodeCount())
898-
//require.Equal(t, 3, tree.rootNode.ItemCount())
899-
900888
// enqueue two items to root:1:a
901889
childPath = QueuePath{"1", "a"}
902890
item = makeItemForChildQueue(tree.rootNode, childPath, cache)

pkg/scheduler/queue/queue.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -419,16 +419,8 @@ func (q *RequestQueue) enqueueRequestInternal(r requestToEnqueue) error {
419419
// a) a query request which was successfully dequeued for the querier, or
420420
// b) an ErrQuerierShuttingDown indicating the querier has been placed in a graceful shutdown state.
421421
func (q *RequestQueue) trySendNextRequestForQuerier(dequeueReq *QuerierWorkerDequeueRequest) (done bool) {
422-
// TODO: This is a temporary solution to set the current querier-worker for the QuerierWorkerQueuePriorityAlgo.
423-
if itq, ok := q.queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok {
424-
for _, algoState := range itq.algosByDepth {
425-
if qwpAlgo, ok := algoState.(*QuerierWorkerQueuePriorityAlgo); ok {
426-
qwpAlgo.SetCurrentQuerierWorker(dequeueReq.WorkerID)
427-
}
428-
}
429-
}
430422

431-
req, tenant, idx, err := q.queueBroker.dequeueRequestForQuerier(dequeueReq.lastTenantIndex.last, dequeueReq.QuerierID)
423+
req, tenant, idx, err := q.queueBroker.dequeueRequestForQuerier(dequeueReq)
432424
if err != nil {
433425
// If this querier has told us it's shutting down, terminate AwaitRequestForQuerier with an error now...
434426
dequeueReq.sendError(err)
@@ -676,7 +668,7 @@ func NewQuerierWorkerDequeueRequest(querierWorkerConn *QuerierWorkerConn, lastTe
676668
}
677669

678670
// querierWorkerDequeueResponse is the response for a QuerierWorkerDequeueRequest,
679-
// to be written to the dequeue requests 's receiver channel.
671+
// to be written to the dequeue request's receiver channel.
680672
// Errors are embedded in this response rather than written to a separate error channel
681673
// so that lastTenantIndex can still be returned back to the querier connection.
682674
type querierWorkerDequeueResponse struct {

pkg/scheduler/queue/tenant_querier_assignment.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,11 @@ func (tqa *tenantQuerierAssignments) shuffleTenantQueriers(tenantID TenantID, sc
434434
return true
435435
}
436436

437+
func (tqa *tenantQuerierAssignments) setup(dequeueArgs *DequeueArgs) {
438+
tqa.currentQuerier = dequeueArgs.querierID
439+
tqa.tenantOrderIndex = dequeueArgs.lastTenantIndex
440+
}
441+
437442
// dequeueSelectNode chooses the next node to dequeue from based on tenantIDOrder and tenantOrderIndex, which are
438443
// shared across all nodes to maintain an O(n) (where n = # tenants) time-to-dequeue for each tenant.
439444
// If tenant order were maintained by individual nodes, we would end up with O(mn) (where m = # query components)
@@ -583,11 +588,3 @@ func (tqa *tenantQuerierAssignments) addChildNode(parent, child *Node) {
583588
// if we get here, we didn't find any empty elements in tenantIDOrder; append
584589
tqa.tenantIDOrder = append(tqa.tenantIDOrder, TenantID(childName))
585590
}
586-
587-
// updateQueuingAlgorithmState should be called before attempting to dequeue, and updates inputs required by this
588-
// QueuingAlgorithm to dequeue the appropriate value for the given querier. In some test cases, it need not be called
589-
// before consecutive dequeues for the same querier, but in all operating cases, it should be called ahead of a dequeue.
590-
func (tqa *tenantQuerierAssignments) updateQueuingAlgorithmState(querierID QuerierID, tenantOrderIndex int) {
591-
tqa.currentQuerier = querierID
592-
tqa.tenantOrderIndex = tenantOrderIndex
593-
}

0 commit comments

Comments
 (0)