Skip to content

Commit ff7ea80

Browse files
committed
query + neutrino: Clone job before sending to workmanager
Adds CloneReq function field to query.Request struct. Jobs are cloned in the worker before sending to the workmanager. This would be useful in coming commits where a job's request is modified according to the response it gets. Such as in the case of block header fetching. A CloneReq function is defined in the instance of GetCFilter, GetCFHeader and GetData requests in this commit as well. Signed-off-by: Maureen Ononiwu <[email protected]>
1 parent d9ec023 commit ff7ea80

File tree

5 files changed

+141
-19
lines changed

5 files changed

+141
-19
lines changed

blockmanager.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,7 @@ func (c *checkpointedCFHeadersQuery) requests() []*query.Request {
842842
Req: m,
843843
HandleResp: c.handleResponse,
844844
SendQuery: sendQueryMessageWithEncoding,
845+
CloneReq: cloneMsgCFHeaders,
845846
}
846847
}
847848
return reqs
@@ -957,12 +958,31 @@ func sendQueryMessageWithEncoding(peer query.Peer, req query.ReqMessage) error {
957958
if !ok {
958959
return errors.New("invalid request type")
959960
}
960-
961961
sp.QueueMessageWithEncoding(request.message, nil, request.encoding)
962962

963963
return nil
964964
}
965965

966+
// cloneMsgCFHeaders clones query.ReqMessage that contains the MsgGetCFHeaders message.
967+
func cloneMsgCFHeaders(req query.ReqMessage) query.ReqMessage {
968+
oldReq, ok := req.(*encodedQuery)
969+
if !ok {
970+
log.Errorf("request not of type *encodedQuery")
971+
}
972+
oldReqMessage, ok := oldReq.message.(*wire.MsgGetCFHeaders)
973+
if !ok {
974+
log.Errorf("request not of type *wire.MsgGetCFHeaders")
975+
}
976+
newReq := &encodedQuery{
977+
message: wire.NewMsgGetCFHeaders(
978+
oldReqMessage.FilterType, oldReqMessage.StartHeight, &oldReqMessage.StopHash,
979+
),
980+
encoding: oldReq.encoding,
981+
priorityIndex: oldReq.priorityIndex,
982+
}
983+
return newReq
984+
}
985+
966986
// getCheckpointedCFHeaders catches a filter header store up with the
967987
// checkpoints we got from the network. It assumes that the filter header store
968988
// matches the checkpoints up to the tip of the store.

query.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,24 @@ func (q *cfiltersQuery) request() *query.Request {
446446
Req: msg,
447447
HandleResp: q.handleResponse,
448448
SendQuery: sendQueryMessageWithEncoding,
449+
CloneReq: func(req query.ReqMessage) query.ReqMessage {
450+
oldReq, ok := req.(*encodedQuery)
451+
if !ok {
452+
log.Errorf("request not of type *encodedQuery")
453+
}
454+
oldReqMessage, ok := oldReq.message.(*wire.MsgGetCFilters)
455+
if !ok {
456+
log.Errorf("request not of type *wire.MsgGetCFilters")
457+
}
458+
newReq := &encodedQuery{
459+
message: wire.NewMsgGetCFilters(
460+
oldReqMessage.FilterType, oldReqMessage.StartHeight, &oldReqMessage.StopHash,
461+
),
462+
encoding: oldReq.encoding,
463+
priorityIndex: oldReq.priorityIndex,
464+
}
465+
return newReq
466+
},
449467
}
450468
}
451469

@@ -923,6 +941,23 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
923941
Req: msg,
924942
HandleResp: handleResp,
925943
SendQuery: sendQueryMessageWithEncoding,
944+
CloneReq: func(req query.ReqMessage) query.ReqMessage {
945+
newMsg := wire.NewMsgGetData()
946+
_ = newMsg.AddInvVect(inv)
947+
948+
oldReq, ok := req.(*encodedQuery)
949+
if !ok {
950+
log.Errorf("request not of type *encodedQuery")
951+
}
952+
953+
newReq := &encodedQuery{
954+
message: newMsg,
955+
encoding: oldReq.encoding,
956+
priorityIndex: oldReq.priorityIndex,
957+
}
958+
959+
return newReq
960+
},
926961
}
927962

928963
// Prepare the query options.

query/interface.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ type Request struct {
143143
// SendQuery handles sending request to the worker's peer. It returns an error,
144144
// if one is encountered while sending the request.
145145
SendQuery func(peer Peer, request ReqMessage) error
146+
147+
// CloneReq clones the message.
148+
CloneReq func(message ReqMessage) ReqMessage
146149
}
147150

148151
type ReqMessage interface {

query/worker.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,32 @@ nexJobLoop:
235235
// Stop to allow garbage collection.
236236
timeout.Stop()
237237

238+
// We might have an unfinished job which we have modified its request attributes. This step is necessary to
239+
// prevent a situation in which future modification to the job's request affects this job. For example:
240+
// in the case of fetching headers between checkpoints say checkpoint 0 and 20,000. The maximum number of headers
241+
// that a peer can send in one message is say 2000. In such a case upon receiving 2000 headers for one request,
242+
// we modify the request of the job, changing its startheight and blocklocator to reflect the next set of headers
243+
// that we would like to fetch. As even scheduling this new request for the job, we would still not be done with
244+
// fetching our desired 20,000 headers, there is bound to be future modifications of the job and this could
245+
// change previous requests, leading to an unwanted changes.
246+
resultJob := &queryJob{
247+
index: job.Index(),
248+
Request: &Request{
249+
Req: job.CloneReq(job.Req),
250+
HandleResp: job.Request.HandleResp,
251+
CloneReq: job.Request.CloneReq,
252+
SendQuery: job.Request.SendQuery,
253+
},
254+
cancelChan: job.cancelChan,
255+
tries: job.tries,
256+
timeout: job.timeout,
257+
}
258+
238259
// We have a result ready for the query, hand it off before
239260
// getting a new job.
240261
select {
241262
case results <- &jobResult{
242-
job: job,
263+
job: resultJob,
243264
peer: peer,
244265
err: jobErr,
245266
}:

query/worker_test.go

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,19 @@ func makeJob() *queryJob {
9999
m.requests <- req.Message()
100100
return nil
101101
},
102+
CloneReq: func(req ReqMessage) ReqMessage {
103+
oldReq := req.(*mockQueryEncoded)
104+
105+
newMsg := &wire.MsgGetData{
106+
InvList: oldReq.message.InvList,
107+
}
108+
109+
clone := &mockQueryEncoded{
110+
message: newMsg,
111+
}
112+
113+
return clone
114+
},
102115
}
103116

104117
return &queryJob{
@@ -209,9 +222,15 @@ func TestWorkerIgnoreMsgs(t *testing.T) {
209222
t.Fatalf("response error: %v", result.err)
210223
}
211224

212-
// Make sure the result was given for the intended job.
213-
if result.job != task {
214-
t.Fatalf("got result for unexpected job")
225+
// Make sure the QueryJob instance in the result is different from the initial one
226+
// supplied to the worker
227+
if result.job == task {
228+
t.Fatalf("result's job should be different from the task's")
229+
}
230+
231+
// Make sure we are receiving the corresponding result for the given task.
232+
if result.job.Index() != task.Index() {
233+
t.Fatalf("result's job index should not be different from task's")
215234
}
216235

217236
// And the correct peer.
@@ -264,9 +283,15 @@ func TestWorkerTimeout(t *testing.T) {
264283
t.Fatalf("expected timeout, got: %v", result.err)
265284
}
266285

267-
// Make sure the result was given for the intended job.
268-
if result.job != task {
269-
t.Fatalf("got result for unexpected job")
286+
// Make sure the QueryJob instance in the result is different from the initial one
287+
// supplied to the worker
288+
if result.job == task {
289+
t.Fatalf("result's job should be different from the task's")
290+
}
291+
292+
// Make sure we are receiving the corresponding result for the given task.
293+
if result.job.Index() != task.Index() {
294+
t.Fatalf("result's job index should not be different from task's")
270295
}
271296

272297
// And the correct peer.
@@ -323,9 +348,15 @@ func TestWorkerDisconnect(t *testing.T) {
323348
t.Fatalf("expected peer disconnect, got: %v", result.err)
324349
}
325350

326-
// Make sure the result was given for the intended job.
327-
if result.job != task {
328-
t.Fatalf("got result for unexpected job")
351+
// Make sure the QueryJob instance in the result is different from the initial one
352+
// supplied to the worker
353+
if result.job == task {
354+
t.Fatalf("result's job should be different from the task's")
355+
}
356+
357+
// Make sure we are receiving the corresponding result for the given task.
358+
if result.job.Index() != task.Index() {
359+
t.Fatalf("result's job index should not be different from task's")
329360
}
330361

331362
// And the correct peer.
@@ -411,9 +442,15 @@ func TestWorkerProgress(t *testing.T) {
411442
t.Fatalf("expected no error, got: %v", result.err)
412443
}
413444

414-
// Make sure the result was given for the intended task.
415-
if result.job != task {
416-
t.Fatalf("got result for unexpected job")
445+
// Make sure the QueryJob instance in the result is different from the initial one
446+
// supplied to the worker
447+
if result.job == task {
448+
t.Fatalf("result's job should be different from the task's")
449+
}
450+
451+
// Make sure we are receiving the corresponding result for the given task.
452+
if result.job.Index() != task.Index() {
453+
t.Fatalf("result's job index should not be different from task's")
417454
}
418455

419456
// And the correct peer.
@@ -484,9 +521,15 @@ func TestWorkerJobCanceled(t *testing.T) {
484521
t.Fatalf("expected job canceled, got: %v", result.err)
485522
}
486523

487-
// Make sure the result was given for the intended task.
488-
if result.job != task {
489-
t.Fatalf("got result for unexpected job")
524+
// Make sure the QueryJob instance in the result is different from the initial one
525+
// supplied to the worker
526+
if result.job == task {
527+
t.Fatalf("result's job should be different from the task's")
528+
}
529+
530+
// Make sure we are receiving the corresponding result for the given task.
531+
if result.job.Index() != task.Index() {
532+
t.Fatalf("result's job index should not be different from task's")
490533
}
491534

492535
// And the correct peer.
@@ -540,9 +583,9 @@ func TestWorkerSendQueryErr(t *testing.T) {
540583
ctx.peer.err, result.err)
541584
}
542585

543-
// Make sure the result was given for the intended task.
586+
// Make sure the QueryJob instance in the result is same as the initial one.
544587
if result.job != taskJob {
545-
t.Fatalf("got result for unexpected job")
588+
t.Fatalf("result's job should be same as the task's")
546589
}
547590

548591
// And the correct peer.

0 commit comments

Comments
 (0)