Skip to content

Commit 65debe9

Browse files
committed
fix: add tests and fix errors
1 parent cf48bd5 commit 65debe9

File tree

3 files changed

+175
-18
lines changed

3 files changed

+175
-18
lines changed

eth/protocols/eth/handlers.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -527,13 +527,16 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
527527
return err
528528
}
529529

530-
from, err := peer.ValidateReceipt(res)
530+
from, err := peer.ReconstructReceiptsPacket(res)
531531
if err != nil {
532532
return err
533533
}
534534

535535
if res.LastBlockIncomplete {
536-
peer.HandlePartialReceipts(res.RequestId)
536+
err := peer.RequestPartialReceipts(res.RequestId)
537+
if err != nil {
538+
return err
539+
}
537540
}
538541

539542
// Assign buffers shared between list elements

eth/protocols/eth/peer.go

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ type Peer struct {
6767
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
6868
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them
6969

70-
requestedReceipts map[uint64][]common.Hash // requestId -> requested receipts map (can be removed if one peer cannot have more than one request in flight)
71-
receiptBuffer map[uint64]*partialReceipt // requestId -> receiptlist map
70+
requestedReceipts map[uint64][]common.Hash // requested receipts list
71+
receiptBuffer map[uint64]*partialReceipt // requestId to receiptlist map
7272

7373
term chan struct{} // Termination channel to stop the broadcasters
7474
}
@@ -345,15 +345,30 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
345345
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
346346
id := rand.Uint64()
347347

348-
req := &Request{
349-
id: id,
350-
sink: sink,
351-
code: GetReceiptsMsg,
352-
want: ReceiptsMsg,
353-
data: &GetReceiptsPacket69{
354-
RequestId: id,
355-
GetReceiptsRequest: hashes,
356-
},
348+
var req *Request
349+
if p.version > ETH69 {
350+
req = &Request{
351+
id: id,
352+
sink: sink,
353+
code: GetReceiptsMsg,
354+
want: ReceiptsMsg,
355+
data: &GetReceiptsPacket70{
356+
RequestId: id,
357+
GetReceiptsRequest: hashes,
358+
FirstBlockReceiptIndex: 0,
359+
},
360+
}
361+
} else {
362+
req = &Request{
363+
id: id,
364+
sink: sink,
365+
code: GetReceiptsMsg,
366+
want: ReceiptsMsg,
367+
data: &GetReceiptsPacket69{
368+
RequestId: id,
369+
GetReceiptsRequest: hashes,
370+
},
371+
}
357372
}
358373
if err := p.dispatchRequest(req); err != nil {
359374
return nil, err
@@ -364,7 +379,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
364379
}
365380

366381
// HandlePartialReceipts re-request partial receipts
367-
func (p *Peer) HandlePartialReceipts(previousId uint64) error {
382+
func (p *Peer) RequestPartialReceipts(previousId uint64) error {
368383
split := p.receiptBuffer[previousId].idx
369384
id := rand.Uint64()
370385

@@ -382,16 +397,22 @@ func (p *Peer) HandlePartialReceipts(previousId uint64) error {
382397
previous: previousId,
383398
}
384399

400+
p.receiptBuffer[id] = p.receiptBuffer[previousId]
401+
p.requestedReceipts[id] = p.requestedReceipts[previousId]
402+
403+
delete(p.receiptBuffer, previousId)
404+
delete(p.requestedReceipts, previousId)
405+
385406
return p.dispatchRequest(req)
386407
}
387408

388-
// ValidateReceipt validates a receipt packet and checks whether a partial request is complete.
409+
// ReconstructReceiptsPacket validates a receipt packet and checks whether a partial request is complete.
389410
// It also mutates the packet in place, trimming the partial response or appending previously collected receipts.
390-
func (p *Peer) ValidateReceipt(packet *ReceiptsPacket70) (int, error) {
411+
func (p *Peer) ReconstructReceiptsPacket(packet *ReceiptsPacket70) (int, error) {
391412
from := 0
392413
requestId := packet.RequestId
393414
if len(packet.List) == 0 {
394-
return 0, fmt.Errorf("receipt list size 0")
415+
return 0, nil
395416
}
396417

397418
// Process the first block
@@ -400,11 +421,14 @@ func (p *Peer) ValidateReceipt(packet *ReceiptsPacket70) (int, error) {
400421
if firstReceipt == nil {
401422
return 0, fmt.Errorf("nil first receipt")
402423
}
403-
if _, ok := p.receiptBuffer[requestId]; !ok {
424+
if _, ok := p.receiptBuffer[requestId]; ok {
404425
// complete packet (hash validation will be performed later)
405426
firstReceipt.items = append(p.receiptBuffer[requestId].list.items, firstReceipt.items...)
406427
from = p.receiptBuffer[requestId].idx
407428
delete(p.receiptBuffer, requestId)
429+
if !packet.LastBlockIncomplete {
430+
delete(p.requestedReceipts, requestId)
431+
}
408432
}
409433

410434
// Trim and buffer the last block when the response is incomplete.

eth/protocols/eth/peer_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ package eth
2222
import (
2323
"crypto/rand"
2424
"testing"
25+
"time"
2526

2627
"github.com/ethereum/go-ethereum/common"
2728
"github.com/ethereum/go-ethereum/p2p"
2829
"github.com/ethereum/go-ethereum/p2p/enode"
30+
"github.com/ethereum/go-ethereum/rlp"
2931
)
3032

3133
// testPeer is a simulated peer to allow testing direct network calls.
@@ -88,3 +90,131 @@ func TestPeerSet(t *testing.T) {
8890
t.Fatalf("bad size")
8991
}
9092
}
93+
94+
func TestPartialReceipt(t *testing.T) {
95+
app, net := p2p.MsgPipe()
96+
var id enode.ID
97+
if _, err := rand.Read(id[:]); err != nil {
98+
t.Fatalf("failed to create random peer: %v", err)
99+
}
100+
101+
peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil)
102+
103+
packetCh := make(chan *GetReceiptsPacket70, 1)
104+
go func() {
105+
for {
106+
msg, err := app.ReadMsg()
107+
if err != nil {
108+
return
109+
}
110+
if msg.Code == GetReceiptsMsg {
111+
var pkt GetReceiptsPacket70
112+
if err := msg.Decode(&pkt); err == nil {
113+
select {
114+
case packetCh <- &pkt:
115+
default:
116+
}
117+
}
118+
}
119+
msg.Discard()
120+
}
121+
}()
122+
123+
hashes := []common.Hash{
124+
common.HexToHash("0xaa"),
125+
common.HexToHash("0xbb"),
126+
common.HexToHash("0xcc"),
127+
common.HexToHash("0xdd"),
128+
}
129+
130+
sink := make(chan *Response, 1)
131+
req, err := peer.RequestReceipts(hashes, sink)
132+
if err != nil {
133+
t.Fatalf("RequestReceipts failed: %v", err)
134+
}
135+
select {
136+
case _ = <-packetCh:
137+
case <-time.After(2 * time.Second):
138+
t.Fatalf("timeout waiting for request packet")
139+
}
140+
141+
delivery := &ReceiptsPacket70{
142+
RequestId: req.id,
143+
LastBlockIncomplete: true,
144+
List: []*ReceiptList69{
145+
{
146+
items: []Receipt{
147+
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
148+
},
149+
},
150+
{
151+
items: []Receipt{
152+
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 2))},
153+
},
154+
},
155+
},
156+
}
157+
if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil {
158+
t.Fatalf("first ReconstructReceiptsPacket failed: %v", err)
159+
}
160+
161+
if err := peer.RequestPartialReceipts(req.id); err != nil {
162+
t.Fatalf("RequestPartialReceipts failed: %v", err)
163+
}
164+
165+
var rereq *GetReceiptsPacket70
166+
select {
167+
case rereq = <-packetCh:
168+
case <-time.After(2 * time.Second):
169+
t.Fatalf("timeout waiting for re-request packet")
170+
}
171+
172+
if _, ok := peer.receiptBuffer[req.id]; ok {
173+
t.Fatalf("receiptBuffer has stale request id")
174+
}
175+
if _, ok := peer.requestedReceipts[req.id]; ok {
176+
t.Fatalf("requestedReceipts has stale request id")
177+
}
178+
179+
buffer, ok := peer.receiptBuffer[rereq.RequestId]
180+
if !ok {
181+
t.Fatalf("receiptBuffer should buffer incomplete receipts")
182+
}
183+
if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list.items)) {
184+
t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list.items))
185+
}
186+
if _, ok := peer.requestedReceipts[rereq.RequestId]; !ok {
187+
t.Fatalf("requestedReceipts should buffer receipt hashes")
188+
}
189+
190+
delivery = &ReceiptsPacket70{
191+
RequestId: rereq.RequestId,
192+
LastBlockIncomplete: false,
193+
List: []*ReceiptList69{
194+
{
195+
items: []Receipt{
196+
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
197+
},
198+
},
199+
{
200+
items: []Receipt{
201+
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
202+
},
203+
},
204+
{
205+
items: []Receipt{
206+
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
207+
},
208+
},
209+
},
210+
}
211+
if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil {
212+
t.Fatalf("second ReconstructReceiptsPacket failed: %v", err)
213+
}
214+
if _, ok := peer.receiptBuffer[rereq.RequestId]; ok {
215+
t.Fatalf("receiptBuffer should be cleared after delivery")
216+
}
217+
if _, ok := peer.requestedReceipts[rereq.RequestId]; ok {
218+
t.Fatalf("requestedReceipts should be cleared after delivery")
219+
}
220+
}

0 commit comments

Comments
 (0)