Skip to content

Commit 3341f03

Browse files
committed
neutrino + query: Priority index implementation
- The workmanager is made to check if a request has a priority index before assigning a query index. - Job index type is also changed to float64 in this commit for flexibility. Signed-off-by: Maureen Ononiwu <[email protected]>
1 parent ff7ea80 commit 3341f03

File tree

8 files changed

+175
-35
lines changed

8 files changed

+175
-35
lines changed

blockmanager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,7 @@ type checkpointedCFHeadersQuery struct {
820820
type encodedQuery struct {
821821
message wire.Message
822822
encoding wire.MessageEncoding
823-
priorityIndex uint64
823+
priorityIndex float64
824824
}
825825

826826
// Message returns the wire.Message of encodedQuery's struct.
@@ -830,7 +830,7 @@ func (e *encodedQuery) Message() wire.Message {
830830

831831
// PriorityIndex returns the specified priority the caller wants
832832
// the request to take.
833-
func (e *encodedQuery) PriorityIndex() uint64 {
833+
func (e *encodedQuery) PriorityIndex() float64 {
834834
return e.priorityIndex
835835
}
836836

query/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ type Request struct {
150150

151151
type ReqMessage interface {
152152
Message() wire.Message
153-
PriorityIndex() uint64
153+
PriorityIndex() float64
154154
}
155155

156156
// WorkManager defines an API for a manager that dispatches queries to bitcoin

query/worker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ var (
2323
// addition to some information about the query.
2424
type queryJob struct {
2525
tries uint8
26-
index uint64
26+
index float64
2727
timeout time.Duration
2828
cancelChan <-chan struct{}
2929
*Request
@@ -36,7 +36,7 @@ var _ Task = (*queryJob)(nil)
3636
// Index returns the queryJob's index within the work queue.
3737
//
3838
// NOTE: Part of the Task interface.
39-
func (q *queryJob) Index() uint64 {
39+
func (q *queryJob) Index() float64 {
4040
return q.index
4141
}
4242

query/worker_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ import (
1212
type mockQueryEncoded struct {
1313
message *wire.MsgGetData
1414
encoding wire.MessageEncoding
15-
index uint64
15+
index float64
1616
}
1717

1818
func (m *mockQueryEncoded) Message() wire.Message {
1919
return m.message
2020
}
2121

22-
func (m *mockQueryEncoded) PriorityIndex() uint64 {
22+
func (m *mockQueryEncoded) PriorityIndex() float64 {
2323
return m.index
2424
}
2525

query/workmanager.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ func (w *peerWorkManager) workDispatcher() {
199199
// We set up a counter that we'll increase with each incoming query,
200200
// and will serve as the priority of each. In addition we map each
201201
// query to the batch they are part of.
202-
queryIndex := uint64(0)
203-
currentQueries := make(map[uint64]uint64)
202+
queryIndex := float64(0)
203+
currentQueries := make(map[float64]uint64)
204204

205205
workers := make(map[string]*activeWorker)
206206

@@ -435,14 +435,25 @@ Loop:
435435
"work queue", batchIndex, len(batch.requests))
436436

437437
for _, q := range batch.requests {
438+
idx := queryIndex
439+
440+
// If priority index is set, use that index.
441+
if q.Req.PriorityIndex() != 0 {
442+
idx = q.Req.PriorityIndex()
443+
}
438444
heap.Push(work, &queryJob{
439-
index: queryIndex,
445+
index: idx,
440446
timeout: minQueryTimeout,
441447
cancelChan: batch.options.cancelChan,
442448
Request: q,
443449
})
444-
currentQueries[queryIndex] = batchIndex
445-
queryIndex++
450+
currentQueries[idx] = batchIndex
451+
452+
// Only increment queryIndex if it was
453+
// assigned to this job.
454+
if q.Req.PriorityIndex() == 0 {
455+
queryIndex++
456+
}
446457
}
447458

448459
currentBatches[batchIndex] = &batchProgress{

query/workmanager_test.go

Lines changed: 142 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,10 @@ func TestWorkManagerWorkDispatcherSingleWorker(t *testing.T) {
131131
// Schedule a batch of queries.
132132
var queries []*Request
133133
for i := 0; i < numQueries; i++ {
134-
q := &Request{}
134+
q := &Request{
135+
Req: &mockQueryEncoded{},
136+
}
137+
135138
queries = append(queries, q)
136139
}
137140

@@ -141,7 +144,7 @@ func TestWorkManagerWorkDispatcherSingleWorker(t *testing.T) {
141144

142145
// Each query should be sent on the nextJob queue, in the order they
143146
// had in their batch.
144-
for i := uint64(0); i < numQueries; i++ {
147+
for i := float64(0); i < numQueries; i++ {
145148
var job *queryJob
146149
select {
147150
case job = <-wk.nextJob:
@@ -199,7 +202,9 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
199202
var scheduledJobs [numQueries]chan sched
200203
var queries [numQueries]*Request
201204
for i := 0; i < numQueries; i++ {
202-
q := &Request{}
205+
q := &Request{
206+
Req: &mockQueryEncoded{},
207+
}
203208
queries[i] = q
204209
scheduledJobs[i] = make(chan sched)
205210
}
@@ -212,7 +217,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
212217
go func() {
213218
for {
214219
job := <-wk.nextJob
215-
scheduledJobs[job.index] <- sched{
220+
scheduledJobs[int(job.index)] <- sched{
216221
wk: wk,
217222
job: job,
218223
}
@@ -224,11 +229,11 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
224229
errChan := wm.Query(queries[:])
225230

226231
var jobs [numQueries]sched
227-
for i := uint64(0); i < numQueries; i++ {
232+
for i := 0; i < numQueries; i++ {
228233
var s sched
229234
select {
230235
case s = <-scheduledJobs[i]:
231-
if s.job.index != i {
236+
if s.job.index != float64(i) {
232237
t.Fatalf("wrong index")
233238
}
234239

@@ -238,7 +243,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
238243
t.Fatalf("next job not received")
239244
}
240245

241-
jobs[s.job.index] = s
246+
jobs[int(s.job.index)] = s
242247
}
243248

244249
// Go backwards, and fail half of them.
@@ -262,10 +267,10 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
262267

263268
// Finally, make sure the failed jobs are being retried, in the same
264269
// order as they were originally scheduled.
265-
for i := uint64(0); i < numQueries; i += 2 {
270+
for i := float64(0); i < numQueries; i += 2 {
266271
var s sched
267272
select {
268-
case s = <-scheduledJobs[i]:
273+
case s = <-scheduledJobs[int(i)]:
269274
if s.job.index != i {
270275
t.Fatalf("wrong index")
271276
}
@@ -309,7 +314,9 @@ func TestWorkManagerCancelBatch(t *testing.T) {
309314
// Schedule a batch of queries.
310315
var queries []*Request
311316
for i := 0; i < numQueries; i++ {
312-
q := &Request{}
317+
q := &Request{
318+
Req: &mockQueryEncoded{},
319+
}
313320
queries = append(queries, q)
314321
}
315322

@@ -399,7 +406,9 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
399406
// Schedule a batch of queries.
400407
var queries []*Request
401408
for i := 0; i < numQueries; i++ {
402-
q := &Request{}
409+
q := &Request{
410+
Req: &mockQueryEncoded{},
411+
}
403412
queries = append(queries, q)
404413
}
405414

@@ -411,7 +420,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
411420
for i := 0; i < numQueries; i++ {
412421
select {
413422
case job := <-workers[i].nextJob:
414-
if job.index != uint64(i) {
423+
if job.index != float64(i) {
415424
t.Fatalf("unexpected job")
416425
}
417426
jobs = append(jobs, job)
@@ -462,7 +471,9 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
462471
// Send a new set of queries.
463472
queries = nil
464473
for i := 0; i < numQueries; i++ {
465-
q := &Request{}
474+
q := &Request{
475+
Req: &mockQueryEncoded{},
476+
}
466477
queries = append(queries, q)
467478
}
468479
_ = wm.Query(queries)
@@ -476,3 +487,121 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
476487
}
477488
}
478489
}
490+
491+
// TestWorkManagerSchedulePriorityIndex tests that the workmanager acknowledges
492+
// priority index.
493+
func TestWorkManagerSchedulePriorityIndex(t *testing.T) {
494+
const numQueries = 3
495+
496+
// Start work manager with as many workers as queries. This is not very
497+
// realistic, but makes the work manager able to schedule all queries
498+
// concurrently.
499+
wm, workers := startWorkManager(t, numQueries)
500+
501+
// When the jobs gets scheduled, keep track of which worker was
502+
// assigned the job.
503+
type sched struct {
504+
wk *mockWorker
505+
job *queryJob
506+
}
507+
508+
// Schedule a batch of queries.
509+
var scheduledJobs [5]chan sched
510+
var queries [numQueries]*Request
511+
for i := 0; i < numQueries; i++ {
512+
var q *Request
513+
idx := i
514+
if i == 0 {
515+
q = &Request{
516+
Req: &mockQueryEncoded{},
517+
}
518+
} else {
519+
// Assign priority index.
520+
idx = i + 2
521+
q = &Request{
522+
Req: &mockQueryEncoded{
523+
index: float64(idx),
524+
},
525+
}
526+
}
527+
queries[i] = q
528+
scheduledJobs[idx] = make(chan sched)
529+
}
530+
531+
// Fot each worker, spin up a goroutine that will forward the job it
532+
// got to our slice of scheduled jobs, such that we can handle them in
533+
// order.
534+
for i := 0; i < len(workers); i++ {
535+
wk := workers[i]
536+
go func() {
537+
for {
538+
job := <-wk.nextJob
539+
scheduledJobs[int(job.index)] <- sched{
540+
wk: wk,
541+
job: job,
542+
}
543+
}
544+
}()
545+
}
546+
547+
// Send the batch, and Retrieve all jobs immediately.
548+
errChan := wm.Query(queries[:])
549+
550+
var jobs [numQueries]sched
551+
for i := uint64(0); i < numQueries; i++ {
552+
var expectedIndex float64
553+
554+
if i == 0 {
555+
expectedIndex = float64(0)
556+
} else {
557+
expectedIndex = float64(i + 2)
558+
}
559+
var s sched
560+
select {
561+
case s = <-scheduledJobs[int(expectedIndex)]:
562+
563+
if s.job.index != expectedIndex {
564+
t.Fatalf("wrong index: Got %v but expected %v", s.job.index,
565+
expectedIndex)
566+
}
567+
case <-errChan:
568+
t.Fatalf("did not expect an errChan")
569+
case <-time.After(time.Second):
570+
t.Fatalf("next job not received")
571+
}
572+
573+
jobs[i] = s
574+
}
575+
576+
// Go backwards send results for job.
577+
for i := numQueries - 1; i >= 0; i-- {
578+
select {
579+
case jobs[i].wk.results <- &jobResult{
580+
job: jobs[i].job,
581+
}:
582+
case <-errChan:
583+
t.Fatalf("did not expect on errChan")
584+
case <-time.After(time.Second):
585+
t.Fatalf("result not handled")
586+
}
587+
}
588+
589+
// Finally, make sure no jobs are retried.
590+
for i := uint64(0); i < numQueries; i++ {
591+
select {
592+
case <-scheduledJobs[i]:
593+
t.Fatalf("did not expect a retried job")
594+
case <-time.After(time.Second):
595+
}
596+
}
597+
598+
// The query should ultimately succeed.
599+
select {
600+
case err := <-errChan:
601+
if err != nil {
602+
t.Fatalf("got error: %v", err)
603+
}
604+
case <-time.After(time.Second):
605+
t.Fatalf("nothing received on errChan")
606+
}
607+
}

query/workqueue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ package query
44
// work queue.
55
type Task interface {
66
// Index returns this Task's index in the work queue.
7-
Index() uint64
7+
Index() float64
88
}
99

1010
// workQueue is struct implementing the heap interface, and is used to keep a

0 commit comments

Comments
 (0)