@@ -15,7 +15,7 @@
   pkg/eth/common,
   pkg/stew/[interval_set, sorted_set],
   ../worker_desc,
-  ./headers_staged/[headers_fetch, staged_collect, staged_headers],
+  ./headers_staged/[headers_fetch, staged_headers],
   ./headers_unproc
 
 # ------------------------------------------------------------------------------
@@ -27,7 +27,7 @@ func headersStagedCollectOk*(buddy: BeaconBuddyRef): bool =
   if buddy.ctrl.running:
     let ctx = buddy.ctx
     if 0 < ctx.headersUnprocAvail() and
-       not ctx.collectModeStopped():
+       not ctx.headersModeStopped():
       return true
   false
 
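For orientation: the guard above only admits header collection while the downloader is still in header mode. Below is a minimal sketch of what a `headersModeStopped()`-style predicate presumably checks; `HdrCacheState` and `Ctx` are hypothetical stand-ins, not the repo's types, modelled on the real loop guard `ctx.hdrCache.state == collecting` visible in a later hunk.

```nim
type
  HdrCacheState = enum
    collecting                # headers are being fetched and stashed
    finished                  # books were reset/prepared for the next stage

  Ctx = object
    hdrCacheState: HdrCacheState

func headersModeStopped(ctx: Ctx): bool =
  ## True once header bookkeeping must cease, e.g. because the job was
  ## cancelled or completed while headers were still being downloaded.
  ctx.hdrCacheState != collecting

when isMainModule:
  doAssert not Ctx(hdrCacheState: collecting).headersModeStopped()
  doAssert Ctx(hdrCacheState: finished).headersModeStopped()
```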
@@ -93,92 +93,56 @@ proc headersStagedCollect*(
       discard ctx.headersUnprocFetch(top - dangling).expect("iv")
 
       let
-        # Reserve the full range of block numbers so they can be appended in a
-        # row. This avoids some fragmentation when header chains are stashed by
-        # multiple peers, i.e. they interleave peer task-wise.
-        iv = ctx.headersUnprocFetch(nFetchHeadersBatchListLen).valueOr:
-          break fetchHeadersBody                  # done, exit this function
-
         # Get parent hash from the most senior stored header
         parent = ctx.hdrCache.antecedent.parentHash
 
-        # Fetch headers and store them on the header chain cache. The function
-        # returns the last unprocessed block number.
-        bottom = await buddy.collectAndStashOnDiskCache(iv, parent, info)
-
-      # Check whether there were some headers fetched at all
-      if bottom < iv.maxPt:
-        nStored += (iv.maxPt - bottom)            # statistics
-        ctx.pool.seenData = true                  # header data exist
+        # Fetch some headers
+        rev = (await buddy.headersFetch(
+            parent, nFetchHeadersRequest, info)).valueOr:
+          break fetchHeadersBody                  # error => exit block
 
-      # Job might have been cancelled or completed while downloading headers.
-      # If so, no more bookkeeping of headers must take place. The *books*
-      # might have been reset and prepared for the next stage.
-      if ctx.collectModeStopped():
-        trace info & ": stopped fetching/storing headers", peer, iv,
-          bottom=bottom.bnStr, nStored, syncState=($buddy.syncState)
-        break fetchHeadersBody                    # done, exit this function
+      ctx.pool.seenData = true                    # header data exist
 
-      # Commit partially processed block numbers
-      if iv.minPt <= bottom:
-        ctx.headersUnprocCommit(iv,iv.minPt,bottom) # partial success only
-        break fetchHeadersBody                    # done, exit this function
+      # Store it on the header chain cache
+      let dTop = ctx.hdrCache.antecedent.number   # current antecedent
+      if not buddy.headersStashOnDisk(rev, buddy.peerID, info):
+        break fetchHeadersBody                    # error => exit block
 
-      ctx.headersUnprocCommit(iv)                 # all headers processed
+      let dBottom = ctx.hdrCache.antecedent.number # update new antecedent
+      nStored += (dTop - dBottom)                 # statistics
 
-      debug info & ": fetched headers count", peer,
-        unprocTop=ctx.headersUnprocAvailTop.bnStr,
-        D=ctx.hdrCache.antecedent.bnStr, nStored, nStagedQ=ctx.hdr.staged.len,
-        syncState=($buddy.syncState)
+      if dBottom == dTop:
+        break fetchHeadersBody                    # nothing achieved
 
-      # Buddy might have been cancelled while downloading headers.
-      if buddy.ctrl.stopped:
-        break fetchHeadersBody
+      if buddy.ctrl.stopped:                      # peer was cancelled
+        break fetchHeadersBody                    # done, exit this block
 
     # End while: `collectAndStashOnDiskCache()`
 
     # Continue opportunistically fetching by block number rather than hash. The
     # fetched headers need to be staged and checked/serialised later.
     if ctx.hdr.staged.len + ctx.hdr.reserveStaged < headersStagedQueueLengthMax:
 
-      let
-        # Comment see deterministic case
-        iv = ctx.headersUnprocFetch(nFetchHeadersBatchListLen).valueOr:
-          break fetchHeadersBody                  # done, exit this function
-
-        # This record will accumulate the fetched headers. It must be on the
-        # heap so that `async` can capture that properly.
-        lhc = (ref LinkedHChain)(peerID: buddy.peerID)
-
-      # Fetch headers and fill up the headers list of `lhc`. The function
-      # returns the last unprocessed block number.
+      # Fetch headers
       ctx.hdr.reserveStaged.inc                   # Book a slot on `staged`
-      let bottom = await buddy.collectAndStageOnMemQueue(iv, lhc, info)
+      let rc = await buddy.headersFetch(
+        EMPTY_ROOT_HASH, nFetchHeadersRequest, info)
       ctx.hdr.reserveStaged.dec                   # Free that slot again
 
-      nQueued = lhc.revHdrs.len                   # statistics
+      if rc.isErr:
+        break fetchHeadersBody                    # done, exit this block
 
-      # Job might have been cancelled or completed while downloading headers.
-      # If so, no more bookkeeping of headers must take place. The *books*
-      # might have been reset and prepared for the next stage.
-      if ctx.collectModeStopped():
-        trace info & ": stopped fetching/staging headers", peer, iv,
-          bottom=bottom.bnStr, nStored, syncState=($buddy.syncState)
-        break fetchHeadersBody                    # done, exit this function
-
-      # Store `lhc` chain on the `staged` queue if there is any
-      if 0 < lhc.revHdrs.len:
-        let qItem = ctx.hdr.staged.insert(iv.maxPt).valueOr:
-          raiseAssert info & ": duplicate key on staged queue iv=" & $iv
-        qItem.data = lhc[]
-
-      # Commit processed block numbers
-      if iv.minPt <= bottom:
-        ctx.headersUnprocCommit(iv,iv.minPt,bottom) # partial success only
-        break fetchHeadersBody                    # done, exit this function
+      let
+        # Insert headers list on the `staged` queue
+        key = rc.value[0].number
+        qItem = ctx.hdr.staged.insert(key).valueOr:
+          raiseAssert info & ": duplicate key on staged queue" &
+            " iv=" & (rc.value[^1].number,key).bnStr
+      qItem.data.revHdrs = rc.value
+      qItem.data.peerID = buddy.peerID
 
-      ctx.headersUnprocCommit(iv)                 # all headers processed
-    # End inner block
+      nQueued = rc.value.len                      # statistics
+    # End if
 
   # End block: `fetchHeadersBody`
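The rewritten deterministic branch above drops the interval bookkeeping (`iv`, `bottom`, `headersUnprocCommit`) in favour of a simpler contract: fetch a reverse-ordered header list by parent hash, stash it, and read progress off how far the cache antecedent moved. The following is a self-contained sketch of that control flow only; `Header`, `HdrCache` and the two mocked procs are hypothetical stand-ins, and stdlib `Option` stands in for the `Result`/`valueOr` plumbing used in the repo.

```nim
import std/options

type
  Header = object
    number: uint64            # block number
    parentHash: string        # stand-in for a 32-byte hash

  HdrCache = ref object
    antecedent: Header        # most senior (lowest-numbered) stored header

# Mock of a `headersFetch`-style call: on success the headers come back in
# reverse order, highest block number first; `none` would model a fetch error.
proc headersFetch(cache: HdrCache; parent: string; n: int): Option[seq[Header]] =
  var revHdrs: seq[Header]
  for i in 1 .. n:
    revHdrs.add Header(number: cache.antecedent.number - uint64(i))
  some revHdrs

# Mock of a `headersStashOnDisk`-style call: links the list below the current
# antecedent and moves the antecedent down to the list's last (lowest) header.
proc headersStashOnDisk(cache: HdrCache; revHdrs: seq[Header]): bool =
  if revHdrs.len == 0:
    return false
  cache.antecedent = revHdrs[^1]
  true

when isMainModule:
  let cache = HdrCache(antecedent: Header(number: 1000))
  var nStored = 0u64
  block fetchHeadersBody:
    let revOpt = cache.headersFetch(cache.antecedent.parentHash, 200)
    if revOpt.isNone:
      break fetchHeadersBody                  # error => exit block
    let dTop = cache.antecedent.number        # antecedent before stashing
    if not cache.headersStashOnDisk(revOpt.get):
      break fetchHeadersBody                  # error => exit block
    let dBottom = cache.antecedent.number     # antecedent after stashing
    nStored += dTop - dBottom                 # progress = antecedent delta
  doAssert nStored == 200
```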
@@ -196,9 +160,10 @@ proc headersStagedCollect*(
     return
 
   info "Queued/staged or DB/stored headers",
-    unprocTop=(if ctx.collectModeStopped(): "n/a"
+    unprocTop=(if ctx.headersModeStopped(): "n/a"
                else: ctx.headersUnprocAvailTop.bnStr),
-    nQueued, nStored, nStagedQ=ctx.hdr.staged.len, nSyncPeers=ctx.pool.nBuddies
+    nQueued, nStored, nStagedQ=ctx.hdr.staged.len,
+    nSyncPeers=ctx.pool.nBuddies
 
 
 proc headersStagedProcess*(buddy: BeaconBuddyRef; info: static[string]): bool =
@@ -216,22 +181,22 @@ proc headersStagedProcess*(buddy: BeaconBuddyRef; info: static[string]): bool =
     return false                                  # switch peer
 
   var
-    nStored = 0                                   # statistics
-    switchPeer = false                            # for return code
+    nStored = 0u64                                # statistics
+    switchPeer = false                            # for return code
 
   while ctx.hdrCache.state == collecting:
 
     # Fetch list with largest block numbers
     let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr:
-      break                                       # all done
+      break                                        # all done
 
     let
       minNum = qItem.data.revHdrs[^1].number
       maxNum = qItem.data.revHdrs[0].number
       dangling = ctx.hdrCache.antecedent.number
     if maxNum + 1 < dangling:
-      debug info & ": gap, serialisation postponed", peer,
-        qItem=qItem.data.bnStr, D=dangling.bnStr, nStored,
+      trace info & ": gap, serialisation postponed", peer,
+        qItem=qItem.data.revHdrs.bnStr, D=dangling.bnStr, nStored,
         nStagedQ=ctx.hdr.staged.len, nSyncPeers=ctx.pool.nBuddies
       switchPeer = true                           # there is a gap -- come back later
       break
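The gap test above is the heart of the staging scheme: batches arrive out of order from several peers, keyed by their highest block number, and a batch can only be serialised once it reaches down to the cache antecedent. Here is a small sketch of that test, with a hypothetical `StagedBatch` type standing in for the queue items.

```nim
type
  StagedBatch = object
    minNum, maxNum: uint64    # revHdrs[^1].number .. revHdrs[0].number

func hasGap(batch: StagedBatch; dangling: uint64): bool =
  ## A gap exists when the batch's top header is more than one block below
  ## the cache antecedent; serialisation is then postponed and the peer
  ## switched, mirroring `maxNum + 1 < dangling` above.
  batch.maxNum + 1 < dangling

when isMainModule:
  let dangling = 5000u64      # cache antecedent block number
  # Adjoining batch: top header 4999 is the antecedent's parent => stash now.
  doAssert not hasGap(StagedBatch(minNum: 4500, maxNum: 4999), dangling)
  # An overlapping batch is also fine; the stash step trims the overlap,
  # which is why `nStored` is measured off the antecedent, not `revHdrs.len`.
  doAssert not hasGap(StagedBatch(minNum: 4800, maxNum: 5300), dangling)
  # Gap: postpone serialisation until lower-numbered batches arrive.
  doAssert hasGap(StagedBatch(minNum: 4000, maxNum: 4498), dangling)
```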
@@ -240,18 +205,14 @@ proc headersStagedProcess*(buddy: BeaconBuddyRef; info: static[string]): bool =
     discard ctx.hdr.staged.delete(qItem.key)
 
     # Store headers on database
-    if not buddy.headersStashOnDisk(qItem.data.revHdrs, info):
-      # Error mark buddy that produced that unusable headers list
-      ctx.incHdrProcErrors qItem.data.peerID
-
+    if not buddy.headersStashOnDisk(
+        qItem.data.revHdrs, qItem.data.peerID, info):
       ctx.headersUnprocAppend(minNum, maxNum)
       switchPeer = true
       break
 
-    # Antecedent `dangling` of the header cache might not be at `revHdrs[^1]`.
-    let revHdrsLen = maxNum - ctx.hdrCache.antecedent.number + 1
-
-    nStored += revHdrsLen.int                     # count headers
+    # Antecedent of the header cache might not be at `revHdrs[^1]`.
+    nStored += (maxNum - ctx.hdrCache.antecedent.number + 1) # count headers
   # End while loop
 
   if 0 < nStored:
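Note the signature change in the hunk above: `headersStashOnDisk` now receives the `peerID` that produced the header list, and the per-peer error bookkeeping previously done at the call site (`ctx.incHdrProcErrors qItem.data.peerID`) disappears from it, presumably moving inside the helper. A sketch of that presumed wrapper shape follows; the types and the mocked stash step are hypothetical, only the attribution pattern is taken from the diff.

```nim
import std/tables

type
  PeerID = string             # stand-in for the real peer identifier type
  Ctx = ref object
    hdrProcErrors: CountTable[PeerID]   # per-peer header error statistics

proc incHdrProcErrors(ctx: Ctx; peerID: PeerID) =
  ## Mark the peer that produced an unusable headers list.
  ctx.hdrProcErrors.inc peerID

# Hypothetical wrapper shape: stash the reverse-ordered list and attribute
# any failure to the producing peer, so call sites no longer do it themselves.
proc headersStashOnDisk(ctx: Ctx; revHdrs: seq[int]; peerID: PeerID): bool =
  let ok = revHdrs.len > 0    # mock "stash" that fails on an empty list
  if not ok:
    ctx.incHdrProcErrors peerID
  ok

when isMainModule:
  let ctx = Ctx()
  doAssert not ctx.headersStashOnDisk(@[], "peer-1")
  doAssert ctx.hdrProcErrors["peer-1"] == 1
```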