Skip to content

Commit 2748d37

Browse files
committed
query: ErrQueryTimeout does not exit worker feedback loop.
After a timeout, the worker keeps waiting for the response (a response is guaranteed as long as a request was sent), but the job is also rescheduled on another worker so the batch is not slowed down by one slow worker. Signed-off-by: Maureen Ononiwu <[email protected]>
1 parent 3341f03 commit 2748d37

File tree

5 files changed

+118
-3
lines changed

5 files changed

+118
-3
lines changed

query.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ type cfiltersQuery struct {
430430
headerIndex map[chainhash.Hash]int
431431
targetHash chainhash.Hash
432432
targetFilter *gcs.Filter
433+
mtx sync.Mutex
433434
}
434435

435436
// request couples a query message with the handler to be used for the response
@@ -498,6 +499,8 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
498499

499500
// If this filter is for a block not in our index, we can ignore it, as
500501
// we either already got it, or it is out of our queried range.
502+
q.mtx.Lock()
503+
defer q.mtx.Unlock()
501504
i, ok := q.headerIndex[response.BlockHash]
502505
if !ok {
503506
return noProgress

query/worker.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ nexJobLoop:
134134
err := job.SendQuery(peer, job.Req)
135135

136136
// If any error occurs while sending query send a message to the worker's feedback Chan
137-
// which would be handled by the "LOOP" below.
137+
// which would be handled by the "feedbackLoop" below.
138138
if err != nil {
139139
select {
140140
case results <- &jobResult{
@@ -268,6 +268,16 @@ nexJobLoop:
268268
return
269269
}
270270

271+
// If the error is a timeout, still wait for the response as we are assured a response as long as there was a
272+
// request but reschedule on another worker to quickly fetch a response so as not to be slowed down by this
273+
// worker. We either get a response or the peer stalls (i.e. disconnects due to an extended period without
274+
// a response)
275+
if jobErr == ErrQueryTimeout {
276+
jobErr = nil
277+
278+
goto feedbackLoop
279+
}
280+
271281
// If the peer disconnected, we can exit immediately.
272282
if jobErr == ErrPeerDisconnected {
273283
return

query/worker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,8 @@ func TestWorkerTimeout(t *testing.T) {
303303
// It will immediately attempt to fetch another task.
304304
select {
305305
case ctx.nextJob <- task:
306+
t.Fatalf("worker still in feedback loop picked up job")
306307
case <-time.After(1 * time.Second):
307-
t.Fatalf("did not pick up job")
308308
}
309309
}
310310

query/workmanager.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,9 @@ Loop:
298298
// Delete the job from the worker's active job, such
299299
// that the slot gets opened for more work.
300300
r := workers[result.peer.Addr()]
301-
r.activeJob = nil
301+
if result.err != ErrQueryTimeout {
302+
r.activeJob = nil
303+
}
302304

303305
// Get the index of this query's batch, and delete it
304306
// from the map of current queries, since we don't have

query/workmanager_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,106 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
302302
}
303303
}
304304

305+
// TestWorkManagerErrQueryTimeout tests that the workers that return query
306+
// timeout are not sent jobs until they return a different error.
307+
func TestWorkManagerErrQueryTimeout(t *testing.T) {
308+
const numQueries = 2
309+
const numWorkers = 1
310+
311+
// Start work manager.
312+
wm, workers := startWorkManager(t, numWorkers)
313+
314+
// When the jobs get scheduled, keep track of which worker was
315+
// assigned the job.
316+
type sched struct {
317+
wk *mockWorker
318+
job *queryJob
319+
}
320+
321+
// Schedule a batch of queries.
322+
var scheduledJobs [numQueries]chan sched
323+
var queries [numQueries]*Request
324+
for i := 0; i < numQueries; i++ {
325+
q := &Request{
326+
Req: &mockQueryEncoded{},
327+
}
328+
queries[i] = q
329+
scheduledJobs[i] = make(chan sched)
330+
}
331+
332+
// For each worker, spin up a goroutine that will forward the job it
333+
// got to our slice of scheduled jobs, such that we can handle them in
334+
// order.
335+
for i := 0; i < len(workers); i++ {
336+
wk := workers[i]
337+
go func() {
338+
for {
339+
job := <-wk.nextJob
340+
scheduledJobs[int(job.index)] <- sched{
341+
wk: wk,
342+
job: job,
343+
}
344+
}
345+
}()
346+
}
347+
348+
// Send the batch, and retrieve all jobs immediately.
349+
errChan := wm.Query(queries[:])
350+
351+
var iter int
352+
var s sched
353+
for i := 0; i < numQueries; i++ {
354+
select {
355+
case s = <-scheduledJobs[i]:
356+
if s.job.index != float64(i) {
357+
t.Fatalf("wrong index")
358+
}
359+
if iter == 1 {
360+
t.Fatalf("Expected only one scheduled job")
361+
}
362+
iter++
363+
case <-errChan:
364+
t.Fatalf("did not expect on errChan")
365+
case <-time.After(time.Second):
366+
if iter < 1 {
367+
t.Fatalf("next job not received")
368+
}
369+
}
370+
}
371+
372+
select {
373+
case s.wk.results <- &jobResult{
374+
job: s.job,
375+
err: ErrQueryTimeout,
376+
}:
377+
case <-errChan:
378+
t.Fatalf("did not expect on errChan")
379+
case <-time.After(time.Second):
380+
t.Fatalf("result not handled")
381+
}
382+
383+
// Finally, make sure the job is not retried as there are no available
384+
// peers to retry it.
385+
386+
select {
387+
case <-scheduledJobs[0]:
388+
t.Fatalf("did not expect job rescheduled")
389+
case <-errChan:
390+
t.Fatalf("did not expect on errChan")
391+
case <-time.After(time.Second):
392+
}
393+
394+
// There should be no errChan message as query is still incomplete.
395+
select {
396+
case err := <-errChan:
397+
if err != nil {
398+
t.Fatalf("got error: %v", err)
399+
}
400+
t.Fatalf("expected no errChan message")
401+
case <-time.After(time.Second):
402+
}
403+
}
404+
305405
// TestWorkManagerCancelBatch checks that we can cancel a batch query midway,
306406
// and that the jobs it contains are canceled.
307407
func TestWorkManagerCancelBatch(t *testing.T) {

0 commit comments

Comments
 (0)