Skip to content

Commit ee8f8e9

Browse files
authored
Fix data race in user list of a queue (#6160)
1 parent 6509fb2 commit ee8f8e9

File tree

2 files changed

+11
-12
lines changed

2 files changed

+11
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
5353
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
5454
* [BUGFIX] Ingester: Fix issue with the minimize token generator where it was not taking in consideration the current ownerhip of an instance when generating extra tokens. #6062
55-
* [BUGFIX] Scheduler: Fix user queue in scheduler that was not thread-safe. #6077
55+
* [BUGFIX] Scheduler: Fix user queue in scheduler that was not thread-safe. #6077 #6160
5656
* [BUGFIX] Ingester: Include out-of-order head compaction when compacting TSDB head. #6108
5757
* [BUGFIX] Ingester: Fix `cortex_ingester_tsdb_mmap_chunks_total` metric. #6134
5858
* [BUGFIX] Query Frontend: Fix query rejection bug for metadata queries. #6143

pkg/scheduler/queue/user_queues.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,12 @@ type querier struct {
3838
// This struct holds user queues for pending requests. It also keeps track of connected queriers,
3939
// and mapping between users and queriers.
4040
type queues struct {
41-
userQueues map[string]*userQueue
42-
userQueuesMx sync.RWMutex
43-
4441
// List of all users with queues, used for iteration when searching for next queue to handle.
4542
// Users removed from the middle are replaced with "". To avoid skipping users during iteration, we only shrink
4643
// this list when there are ""'s at the end of it.
47-
users []string
44+
users []string
45+
userQueues map[string]*userQueue
46+
queuesMx sync.RWMutex
4847

4948
// How long to wait before removing a querier which has got disconnected
5049
// but hasn't notified about a graceful shutdown.
@@ -102,8 +101,8 @@ func (q *queues) len() int {
102101
}
103102

104103
func (q *queues) deleteQueue(userID string) {
105-
q.userQueuesMx.Lock()
106-
defer q.userQueuesMx.Unlock()
104+
q.queuesMx.Lock()
105+
defer q.queuesMx.Unlock()
107106

108107
uq := q.userQueues[userID]
109108
if uq == nil {
@@ -134,8 +133,8 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue
134133
maxQueriers = 0
135134
}
136135

137-
q.userQueuesMx.Lock()
138-
defer q.userQueuesMx.Unlock()
136+
q.queuesMx.Lock()
137+
defer q.queuesMx.Unlock()
139138

140139
uq := q.userQueues[userID]
141140
priorityEnabled := q.limits.QueryPriority(userID).Enabled
@@ -222,6 +221,9 @@ func (q *queues) createUserRequestQueue(userID string) userRequestQueue {
222221
func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (userRequestQueue, string, int) {
223222
uid := lastUserIndex
224223

224+
q.queuesMx.RLock()
225+
defer q.queuesMx.RUnlock()
226+
225227
for iters := 0; iters < len(q.users); iters++ {
226228
uid = uid + 1
227229

@@ -236,9 +238,6 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (us
236238
continue
237239
}
238240

239-
q.userQueuesMx.RLock()
240-
defer q.userQueuesMx.RUnlock()
241-
242241
uq := q.userQueues[u]
243242

244243
if uq.queriers != nil {

0 commit comments

Comments
 (0)