Skip to content

Commit 197df14

Browse files
committed
query: change queued jobs to a map
Changing queued jobs to a map allows for multiple job queuing. However the queued jobs is still limited to a single job for now.
1 parent 81d6cd2 commit 197df14

File tree

1 file changed

+7
-7
lines changed

1 file changed

+7
-7
lines changed

query/workmanager.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ type PeerRanking interface {
7777
// TODO(halseth): support more than one active job at a time.
7878
type activeWorker struct {
7979
w Worker
80-
activeJob *queryJob
80+
activeJobs map[uint64]*queryJob
8181
onExit chan struct{}
8282
}
8383

@@ -220,7 +220,7 @@ Loop:
220220
for p, r := range workers {
221221
// Only one active job at a time is currently
222222
// supported.
223-
if r.activeJob != nil {
223+
if len(r.activeJobs) >= 1 {
224224
continue
225225
}
226226

@@ -242,7 +242,7 @@ Loop:
242242
log.Tracef("Sent job %v to worker %v",
243243
next.Index(), p)
244244
heap.Pop(work)
245-
r.activeJob = next
245+
r.activeJobs[next.Index()] = next
246246

247247
// Go back to start of loop, to check
248248
// if there are more jobs to
@@ -278,9 +278,9 @@ Loop:
278278
// remove it from our set of active workers.
279279
onExit := make(chan struct{})
280280
workers[peer.Addr()] = &activeWorker{
281-
w: r,
282-
activeJob: nil,
283-
onExit: onExit,
281+
w: r,
282+
activeJobs: make(map[uint64]*queryJob),
283+
onExit: onExit,
284284
}
285285

286286
w.cfg.Ranking.AddPeer(peer.Addr())
@@ -302,7 +302,7 @@ Loop:
302302
// Delete the job from the worker's active job, such
303303
// that the slot gets opened for more work.
304304
r := workers[result.peer.Addr()]
305-
r.activeJob = nil
305+
delete(r.activeJobs, result.job.Index())
306306

307307
// Get the index of this query's batch, and delete it
308308
// from the map of current queries, since we don't have

0 commit comments

Comments
 (0)