@@ -31,10 +31,16 @@ import (
31
31
"gopkg.in/fatih/set.v0"
32
32
)
33
33
34
+ // Hash and block fetchers belonging to eth/61 and below
34
35
type relativeHashFetcherFn func (common.Hash ) error
35
36
type absoluteHashFetcherFn func (uint64 , int ) error
36
37
type blockFetcherFn func ([]common.Hash ) error
37
38
39
+ // Block header and body fethers belonging to eth/62 and above
40
+ type relativeHeaderFetcherFn func (common.Hash , int , int , bool ) error
41
+ type absoluteHeaderFetcherFn func (uint64 , int , int , bool ) error
42
+ type blockBodyFetcherFn func ([]common.Hash ) error
43
+
38
44
var (
39
45
errAlreadyFetching = errors .New ("already fetching blocks from peer" )
40
46
errAlreadyRegistered = errors .New ("peer is already registered" )
@@ -54,25 +60,37 @@ type peer struct {
54
60
55
61
ignored * set.Set // Set of hashes not to request (didn't have previously)
56
62
57
- getRelHashes relativeHashFetcherFn // Method to retrieve a batch of hashes from an origin hash
58
- getAbsHashes absoluteHashFetcherFn // Method to retrieve a batch of hashes from an absolute position
59
- getBlocks blockFetcherFn // Method to retrieve a batch of blocks
63
+ getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
64
+ getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
65
+ getBlocks blockFetcherFn // [eth/61] Method to retrieve a batch of blocks
66
+
67
+ getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
68
+ getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
69
+ getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
60
70
61
71
version int // Eth protocol version number to switch strategies
62
72
}
63
73
64
74
// newPeer create a new downloader peer, with specific hash and block retrieval
65
75
// mechanisms.
66
- func newPeer (id string , version int , head common.Hash , getRelHashes relativeHashFetcherFn , getAbsHashes absoluteHashFetcherFn , getBlocks blockFetcherFn ) * peer {
76
+ func newPeer (id string , version int , head common.Hash ,
77
+ getRelHashes relativeHashFetcherFn , getAbsHashes absoluteHashFetcherFn , getBlocks blockFetcherFn , // eth/61 callbacks, remove when upgrading
78
+ getRelHeaders relativeHeaderFetcherFn , getAbsHeaders absoluteHeaderFetcherFn , getBlockBodies blockBodyFetcherFn ) * peer {
67
79
return & peer {
68
- id : id ,
69
- head : head ,
70
- capacity : 1 ,
80
+ id : id ,
81
+ head : head ,
82
+ capacity : 1 ,
83
+ ignored : set .New (),
84
+
71
85
getRelHashes : getRelHashes ,
72
86
getAbsHashes : getAbsHashes ,
73
87
getBlocks : getBlocks ,
74
- ignored : set .New (),
75
- version : version ,
88
+
89
+ getRelHeaders : getRelHeaders ,
90
+ getAbsHeaders : getAbsHeaders ,
91
+ getBlockBodies : getBlockBodies ,
92
+
93
+ version : version ,
76
94
}
77
95
}
78
96
@@ -83,8 +101,8 @@ func (p *peer) Reset() {
83
101
p .ignored .Clear ()
84
102
}
85
103
86
- // Fetch sends a block retrieval request to the remote peer.
87
- func (p * peer ) Fetch (request * fetchRequest ) error {
104
+ // Fetch61 sends a block retrieval request to the remote peer.
105
+ func (p * peer ) Fetch61 (request * fetchRequest ) error {
88
106
// Short circuit if the peer is already fetching
89
107
if ! atomic .CompareAndSwapInt32 (& p .idle , 0 , 1 ) {
90
108
return errAlreadyFetching
@@ -101,10 +119,28 @@ func (p *peer) Fetch(request *fetchRequest) error {
101
119
return nil
102
120
}
103
121
104
- // SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
122
+ // Fetch sends a block body retrieval request to the remote peer.
123
+ func (p * peer ) Fetch (request * fetchRequest ) error {
124
+ // Short circuit if the peer is already fetching
125
+ if ! atomic .CompareAndSwapInt32 (& p .idle , 0 , 1 ) {
126
+ return errAlreadyFetching
127
+ }
128
+ p .started = time .Now ()
129
+
130
+ // Convert the header set to a retrievable slice
131
+ hashes := make ([]common.Hash , 0 , len (request .Headers ))
132
+ for _ , header := range request .Headers {
133
+ hashes = append (hashes , header .Hash ())
134
+ }
135
+ go p .getBlockBodies (hashes )
136
+
137
+ return nil
138
+ }
139
+
140
+ // SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests.
105
141
// Its block retrieval allowance will also be updated either up- or downwards,
106
142
// depending on whether the previous fetch completed in time or not.
107
- func (p * peer ) SetIdle () {
143
+ func (p * peer ) SetIdle61 () {
108
144
// Update the peer's download allowance based on previous performance
109
145
scale := 2.0
110
146
if time .Since (p .started ) > blockSoftTTL {
@@ -131,6 +167,36 @@ func (p *peer) SetIdle() {
131
167
atomic .StoreInt32 (& p .idle , 0 )
132
168
}
133
169
170
+ // SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
171
+ // Its block body retrieval allowance will also be updated either up- or downwards,
172
+ // depending on whether the previous fetch completed in time or not.
173
+ func (p * peer ) SetIdle () {
174
+ // Update the peer's download allowance based on previous performance
175
+ scale := 2.0
176
+ if time .Since (p .started ) > bodySoftTTL {
177
+ scale = 0.5
178
+ if time .Since (p .started ) > bodyHardTTL {
179
+ scale = 1 / float64 (MaxBodyFetch ) // reduces capacity to 1
180
+ }
181
+ }
182
+ for {
183
+ // Calculate the new download bandwidth allowance
184
+ prev := atomic .LoadInt32 (& p .capacity )
185
+ next := int32 (math .Max (1 , math .Min (float64 (MaxBodyFetch ), float64 (prev )* scale )))
186
+
187
+ // Try to update the old value
188
+ if atomic .CompareAndSwapInt32 (& p .capacity , prev , next ) {
189
+ // If we're having problems at 1 capacity, try to find better peers
190
+ if next == 1 {
191
+ p .Demote ()
192
+ }
193
+ break
194
+ }
195
+ }
196
+ // Set the peer to idle to allow further block requests
197
+ atomic .StoreInt32 (& p .idle , 0 )
198
+ }
199
+
134
200
// Capacity retrieves the peers block download allowance based on its previously
135
201
// discovered bandwidth capacity.
136
202
func (p * peer ) Capacity () int {
0 commit comments