@@ -73,7 +73,7 @@ def __init__(self,
73
73
self .root_hash = root_hash
74
74
self .scheduler = StateSync (root_hash , account_db )
75
75
self ._handler = PeerRequestHandler (self .chaindb , self .logger , self .cancel_token )
76
- self ._requested_nodes : Dict [ETHPeer , Tuple [float , List [Hash32 ]]] = {}
76
+ self ._active_requests : Dict [ETHPeer , Tuple [float , List [Hash32 ]]] = {}
77
77
self ._peer_missing_nodes : Dict [ETHPeer , List [Hash32 ]] = collections .defaultdict (list )
78
78
self ._executor = get_asyncio_executor ()
79
79
@@ -94,7 +94,7 @@ async def get_peer_for_request(self, node_keys: Set[Hash32]) -> ETHPeer:
94
94
"""Return an idle peer that may have any of the trie nodes in node_keys."""
95
95
async for peer in self .peer_pool :
96
96
peer = cast (ETHPeer , peer )
97
- if peer in self ._requested_nodes :
97
+ if peer in self ._active_requests :
98
98
self .logger .trace ("%s is not idle, skipping it" , peer )
99
99
continue
100
100
if node_keys .difference (self ._peer_missing_nodes [peer ]):
@@ -141,7 +141,7 @@ async def _handle_msg(
141
141
pass
142
142
elif isinstance (cmd , eth .NodeData ):
143
143
msg = cast (List [bytes ], msg )
144
- if peer not in self ._requested_nodes :
144
+ if peer not in self ._active_requests :
145
145
# This is probably a batch that we retried after a timeout and ended up receiving
146
146
# more than once, so ignore but log as an INFO just in case.
147
147
self .logger .info (
@@ -150,7 +150,7 @@ async def _handle_msg(
150
150
return
151
151
152
152
self .logger .debug ("Got %d NodeData entries from %s" , len (msg ), peer )
153
- _ , requested_node_keys = self ._requested_nodes .pop (peer )
153
+ _ , requested_node_keys = self ._active_requests .pop (peer )
154
154
155
155
loop = asyncio .get_event_loop ()
156
156
node_keys = await loop .run_in_executor (self ._executor , list , map (keccak , msg ))
@@ -208,7 +208,7 @@ async def request_nodes(self, node_keys: Iterable[Hash32]) -> None:
208
208
candidates = list (not_yet_requested .difference (self ._peer_missing_nodes [peer ]))
209
209
batch = candidates [:eth .MAX_STATE_FETCH ]
210
210
not_yet_requested = not_yet_requested .difference (batch )
211
- self ._requested_nodes [peer ] = (time .time (), batch )
211
+ self ._active_requests [peer ] = (time .time (), batch )
212
212
self .logger .debug ("Requesting %d trie nodes to %s" , len (batch ), peer )
213
213
peer .sub_proto .send_get_node_data (batch )
214
214
@@ -218,12 +218,12 @@ async def _periodically_retry_timedout(self) -> None:
218
218
oldest_request_time = now
219
219
timed_out = []
220
220
# Iterate over a copy of our dict's items as we're going to mutate it.
221
- for peer , (req_time , node_keys ) in list (self ._requested_nodes .items ()):
221
+ for peer , (req_time , node_keys ) in list (self ._active_requests .items ()):
222
222
if now - req_time > self ._reply_timeout :
223
223
self .logger .debug (
224
224
"Timed out waiting for %d nodes from %s" , len (node_keys ), peer )
225
225
timed_out .extend (node_keys )
226
- self ._requested_nodes .pop (peer )
226
+ self ._active_requests .pop (peer )
227
227
elif req_time < oldest_request_time :
228
228
oldest_request_time = req_time
229
229
if timed_out :
@@ -276,7 +276,7 @@ async def _run(self) -> None:
276
276
async def _periodically_report_progress (self ) -> None :
277
277
while self .is_running :
278
278
requested_nodes = sum (
279
- len (node_keys ) for _ , node_keys in self ._requested_nodes .values ())
279
+ len (node_keys ) for _ , node_keys in self ._active_requests .values ())
280
280
self .logger .info ("====== State sync progress ========" )
281
281
self .logger .info ("Nodes processed: %d" , self ._total_processed_nodes )
282
282
self .logger .info ("Nodes processed per second (average): %d" ,
0 commit comments