@@ -139,6 +139,7 @@ def get_logs(
139
139
p_bar = None ,
140
140
no_retry : bool = False ,
141
141
use_subsquid : bool = True ,
142
+ get_logs_by_block_hash : bool = False
142
143
) -> list [LogReceipt ]:
143
144
filter_block_range = self .w3 .filter_block_range
144
145
if filter_block_range == 0 :
@@ -149,33 +150,9 @@ def get_logs(
149
150
assert "fromBlock" not in filter_params and "toBlock" not in filter_params
150
151
return self .get_logs_inner (filter_params , no_retry = no_retry )
151
152
152
- from_block_original : BlockIdentifier = filter_params .get ("fromBlock" , "earliest" )
153
- to_block_original : BlockIdentifier = filter_params .get ("toBlock" , "latest" )
154
-
155
- # sanitizing block numbers, could be strings like "latest"
156
- if isinstance (from_block_original , int ):
157
- from_block_body = None
158
- from_block = from_block_original
159
- elif isinstance (from_block_original , AttributeDict ) or isinstance (from_block_original , dict ):
160
- from_block_body = from_block_original
161
- from_block = from_block_original ["number" ]
162
- filter_params = {** filter_params , "fromBlock" : from_block }
163
- else :
164
- from_block_body = self .get_block (from_block_original )
165
- from_block = from_block_body ["number" ]
166
- filter_params = {** filter_params , "fromBlock" : from_block }
167
-
168
- if isinstance (to_block_original , int ):
169
- to_block_body = None
170
- to_block = to_block_original
171
- elif isinstance (to_block_original , AttributeDict ) or isinstance (to_block_original , dict ):
172
- to_block_body = to_block_original
173
- to_block = to_block_original ["number" ]
174
- filter_params = {** filter_params , "toBlock" : to_block }
175
- else :
176
- to_block_body = self .get_block (to_block_original )
177
- to_block = to_block_body ["number" ]
178
- filter_params = {** filter_params , "toBlock" : to_block }
153
+ from_block , from_block_body = self .sanitize_block (filter_params .get ("fromBlock" , "latest" ))
154
+ to_block , to_block_body = self .sanitize_block (filter_params .get ("toBlock" , "latest" ))
155
+ filter_params = {** filter_params , "fromBlock" : from_block , "toBlock" : to_block }
179
156
180
157
assert to_block >= from_block , f"{ from_block = } , { to_block = } "
181
158
@@ -203,60 +180,63 @@ def get_logs(
203
180
use_subsquid = use_subsquid ,
204
181
)
205
182
206
- # the latest blocks might be available on some nodes but not at others.
207
- # setting toBlock to a block bigger than the latest known block of the node
208
- # simply ignores logs from the missing block
209
- # to prevent this, we get the latest blocks individually by their hashes
210
- unstable_blocks = self .w3 .unstable_blocks
211
- if to_block > self .w3 .latest_seen_block - unstable_blocks and to_block > (last_stable_block := (self .get_block_number () - unstable_blocks )):
212
- results = []
213
- if from_block <= last_stable_block :
214
- results += self .get_logs ({** filter_params , "toBlock" : last_stable_block }, ** kwargs )
183
+ if use_subsquid and from_block < self .w3 .latest_seen_block - self .w3 .unstable_blocks :
184
+ kwargs ["use_subsquid" ] = False # make sure we only try once with Subsquid
185
+ try :
186
+ # trying to get logs from SubSquid
187
+ till_block , results = get_filter (
188
+ chain_id = self .chain_id ,
189
+ filter_params = {** filter_params , "toBlock" : min (to_block , self .w3 .latest_seen_block - self .w3 .unstable_blocks )},
190
+ partial_allowed = True ,
191
+ p_bar = p_bar ,
192
+ )
193
+ if till_block >= to_block :
194
+ return results
195
+ return results + self .get_logs ({** filter_params , "fromBlock" : till_block + 1 }, ** kwargs )
196
+ except Exception as e :
197
+ print (f"Getting logs from SubSquid threw exception { repr (e )} , falling back to RPC" )
215
198
199
+ if get_logs_by_block_hash or to_block > self .w3 .latest_seen_block - self .w3 .unstable_blocks :
200
+ # getting logs via from and to block range sometimes drops logs.
201
+ # This does not happen when getting them individually for each block by their block hash
216
202
# get all block hashes and ensure they build upon each other
217
- block_hashes = []
218
- for block_number in range (max (last_stable_block + 1 , from_block ), to_block + 1 ):
219
- block = self .get_block (block_number , no_retry = no_retry )
220
- if block_hashes :
221
- # make sure chain of blocks is consistent with each block building on the previous one
222
- assert block ["parentHash" ] == block_hashes [- 1 ], f"{ block_hashes [- 1 ].hex ()= } , { block ['parentHash' ].hex ()= } "
203
+ with self .w3 .batch_requests () as batch :
204
+ batch .add_mapping ({
205
+ self .w3 .eth ._get_block : list (range (from_block , to_block + 1 ))
206
+ })
207
+ blocks : list [BlockData ] = batch .execute ()
208
+ assert len (blocks ) == num_blocks
209
+
210
+ # make sure chain of blocks is consistent with each block building on the previous one
211
+ for i , block_number in enumerate (range (from_block , to_block + 1 )):
212
+ block = blocks [i ]
213
+ if i != 0 :
214
+ assert block ["parentHash" ] == blocks [i - 1 ]["hash" ], f"{ blocks [i - 1 ]['hash' ].hex ()= } , { block ['parentHash' ].hex ()= } "
223
215
if block_number == from_block and from_block_body is not None :
224
216
if block ["hash" ] != from_block_body ["hash" ]:
225
217
raise ForkedBlock (f"expected={ from_block_body ['hash' ].hex ()} , actual={ block ['hash' ].hex ()} " )
226
218
if block_number == to_block and to_block_body is not None :
227
219
if block ["hash" ] != to_block_body ["hash" ]:
228
220
raise ForkedBlock (f"expected={ to_block_body ['hash' ].hex ()} , actual={ block ['hash' ].hex ()} " )
229
- block_hashes .append (block ["hash" ])
230
221
231
222
single_hash_filter = filter_params .copy ()
232
223
del single_hash_filter ["fromBlock" ]
233
224
del single_hash_filter ["toBlock" ]
234
- for block_hash in block_hashes :
235
- results += self .get_logs_inner ({** single_hash_filter , "blockHash" : block_hash }, no_retry = no_retry )
236
- if p_bar is not None :
237
- p_bar .update (1 )
225
+ with self .w3 .batch_requests () as batch :
226
+ batch .add_mapping ({
227
+ self .w3 .eth ._get_logs : [{** single_hash_filter , "blockHash" : block ["hash" ]} for block in blocks ]
228
+ })
229
+ results_per_block : list [list [LogReceipt ]] = batch .execute ()
230
+ assert len (results_per_block ) == num_blocks
231
+ if p_bar is not None :
232
+ p_bar .update (len (blocks ))
233
+ results = sum (results_per_block , [])
238
234
return results
239
235
240
236
# getting logs for a single block, which is not at the chain head. No drama
241
237
if num_blocks == 1 :
242
238
return self .get_logs_inner (filter_params , no_retry = no_retry )
243
239
244
- if use_subsquid and from_block < self .w3 .latest_seen_block - 100 :
245
- kwargs ["use_subsquid" ] = False # make sure we only try once with Subsquid
246
- try :
247
- # trying to get logs from SubSquid
248
- till_block , results = get_filter (
249
- chain_id = self .chain_id ,
250
- filter_params = filter_params ,
251
- partial_allowed = True ,
252
- p_bar = p_bar ,
253
- )
254
- if till_block >= to_block :
255
- return results
256
- return results + self .get_logs ({** filter_params , "fromBlock" : till_block + 1 }, ** kwargs )
257
- except Exception as e :
258
- print (f"Getting logs from SubSquid threw exception { repr (e )} , falling back to RPC" )
259
-
260
240
# if we already know that the filter range is too large, split it
261
241
if num_blocks > filter_block_range :
262
242
results = []
@@ -268,7 +248,7 @@ def get_logs(
268
248
}, ** kwargs )
269
249
return results
270
250
271
- # get logs
251
+ # get logs and split on exception
272
252
try :
273
253
events = self ._get_logs (filter_params )
274
254
if p_bar is not None :
@@ -281,6 +261,19 @@ def get_logs(
281
261
right_filter = {** filter_params , "fromBlock" : mid_block + 1 }
282
262
return self .get_logs (left_filter , ** kwargs ) + self .get_logs (right_filter , ** kwargs )
283
263
264
+
265
def sanitize_block(self, block: "BlockIdentifier | BlockData") -> "tuple[int, BlockData | None]":
    """Normalize a block identifier to a ``(block_number, block_body)`` pair.

    Accepts one of:
      * an ``int`` block number,
      * a block body (``dict``/``AttributeDict`` carrying a ``"number"`` key),
      * a symbolic identifier such as ``"latest"``/``"earliest"`` (or a hash),
        which is resolved via an RPC call to ``self.get_block``.

    Returns:
        ``(block_number, block_body)`` where ``block_body`` is ``None`` when
        only a number was supplied — fetching the body would cost an extra
        RPC call, so the caller decides whether it needs one.
    """
    if isinstance(block, int):
        # Already a concrete block number; no body available without an RPC call.
        return block, None
    if isinstance(block, (AttributeDict, dict)):
        # A full block body was supplied by the caller; reuse it as-is.
        return block["number"], block
    # Symbolic identifier ("latest", "earliest", hash, ...): resolve via RPC.
    block_body = self.get_block(block)
    return block_body["number"], block_body
276
+
284
277
def get_logs_inner (self , filter_params : FilterParams , no_retry : bool = False ):
285
278
if not self .w3 .should_retry :
286
279
no_retry = True
@@ -289,3 +282,25 @@ def get_logs_inner(self, filter_params: FilterParams, no_retry: bool = False):
289
282
def _chain_id (self ):
290
283
# usually this causes an RPC call and is used in every eth_call. Getting it once in the init and then not again.
291
284
return self .chain_id_cached
285
+
286
+
287
def main(
    node_url="https://rpc-core.icecreamswap.com",
    usdt_address="0x900101d06A7426441Ae63e9AB3B9b0F63Be145F1",
):
    """Smoke test: fetch USDT logs over the last 10k blocks via the
    block-hash code path (Subsquid disabled) and print a sample."""
    from eth_utils import to_checksum_address

    token = to_checksum_address(usdt_address)
    w3 = Web3Advanced(node_url=node_url)

    head = w3.eth.block_number
    log_filter = {
        "address": token,
        "fromBlock": head - 10_000,
        "toBlock": head,
    }
    logs = w3.eth.get_logs(log_filter, use_subsquid=False, get_logs_by_block_hash=True)
    print(len(logs), logs[0])


if __name__ == "__main__":
    main()
0 commit comments