41
41
BaseService ,
42
42
)
43
43
44
+ from trinity ._utils .headers import (
45
+ skip_complete_headers ,
46
+ )
44
47
from trinity .chains .base import BaseAsyncChain
45
48
from trinity .db .eth1 .header import BaseAsyncHeaderDB
46
49
from trinity .protocol .common .peer import (
@@ -109,7 +112,7 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
109
112
# When we start the sync with a peer, we always request up to MAX_REORG_DEPTH extra
110
113
# headers before our current head's number, in case there were chain reorgs since the last
111
114
# time _sync() was called. All of the extra headers that are already present in our DB
112
- # will be discarded by _fetch_missing_headers () so we don't unnecessarily process them
115
+ # will be discarded by skip_complete_headers () so we don't unnecessarily process them
113
116
# again.
114
117
start_at = max (GENESIS_BLOCK_NUMBER + 1 , head .block_number - MAX_REORG_DEPTH )
115
118
while self .is_operational :
@@ -121,12 +124,10 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
121
124
all_headers = await self .wait (self ._request_headers (peer , start_at ))
122
125
if last_received_header is None :
123
126
# Skip over existing headers on the first run-through
124
- headers = tuple (
125
- # The inner list comprehension is needed because async_generators
126
- # cannot be cast to a tuple.
127
- [header async for header in self ._get_missing_tail (all_headers )]
127
+ new_headers = await self .wait (
128
+ skip_complete_headers (all_headers , self .logger , self .db .coro_header_exists )
128
129
)
129
- if len (headers ) == 0 and len (all_headers ) > 0 :
130
+ if len (new_headers ) == 0 and len (all_headers ) > 0 :
130
131
head = await self .wait (self .db .coro_get_canonical_head ())
131
132
start_at = max (
132
133
all_headers [- 1 ].block_number + 1 ,
@@ -140,8 +141,8 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
140
141
)
141
142
continue
142
143
else :
143
- headers = all_headers
144
- self .logger .debug2 ('sync received new headers: %s' , headers )
144
+ new_headers = all_headers
145
+ self .logger .debug2 ('sync received new headers: %s' , new_headers )
145
146
except OperationCancelled :
146
147
self .logger .info ("Sync with %s completed" , peer )
147
148
break
@@ -157,7 +158,7 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
157
158
await peer .disconnect (DisconnectReason .useless_peer )
158
159
break
159
160
160
- if not headers :
161
+ if not new_headers :
161
162
if last_received_header is None :
162
163
request_parent = head
163
164
else :
@@ -177,7 +178,7 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
177
178
self .logger .info ("Got no new headers from %s, aborting sync" , peer )
178
179
break
179
180
180
- first = headers [0 ]
181
+ first = new_headers [0 ]
181
182
first_parent = None
182
183
if last_received_header is None :
183
184
# on the first request, make sure that the earliest ancestor has a parent in our db
@@ -205,27 +206,27 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
205
206
"Got new header chain from %s: %s..%s" ,
206
207
peer ,
207
208
first ,
208
- headers [- 1 ],
209
+ new_headers [- 1 ],
209
210
)
210
211
try :
211
212
await self .chain .coro_validate_chain (
212
213
last_received_header or first_parent ,
213
- headers ,
214
+ new_headers ,
214
215
self ._seal_check_random_sample_rate ,
215
216
)
216
217
except ValidationError as e :
217
218
self .logger .warning ("Received invalid headers from %s, disconnecting: %s" , peer , e )
218
219
await peer .disconnect (DisconnectReason .subprotocol_error )
219
220
break
220
221
221
- for header in headers :
222
+ for header in new_headers :
222
223
head_td += header .difficulty
223
224
224
225
# Setting the latest header hash for the peer, before queuing header processing tasks
225
226
self ._target_header_hash = peer .head_hash
226
227
227
- yield headers
228
- last_received_header = headers [- 1 ]
228
+ yield new_headers
229
+ last_received_header = new_headers [- 1 ]
229
230
self .sync_progress = self .sync_progress .update_current_block (
230
231
last_received_header .block_number ,
231
232
)
@@ -243,26 +244,6 @@ async def _request_headers(
243
244
reverse = False ,
244
245
)
245
246
246
- async def _get_missing_tail (
247
- self ,
248
- headers : Tuple [BlockHeader , ...]) -> AsyncIterator [BlockHeader ]:
249
- """
250
- We only want headers that are missing, so we iterate over the list
251
- until we find the first missing header, after which we return all of
252
- the remaining headers.
253
- """
254
- iter_headers = iter (headers )
255
- for header in iter_headers :
256
- is_present = await self .wait (self .db .coro_header_exists (header .hash ))
257
- if is_present :
258
- self .logger .debug ("Discarding header that we already have: %s" , header )
259
- else :
260
- yield header
261
- break
262
-
263
- for header in iter_headers :
264
- yield header
265
-
266
247
267
248
class BaseBlockImporter (ABC ):
268
249
@abstractmethod
0 commit comments