Skip to content

Commit 6c8d91e

Browse files
authored
Dynamic blocks (#887)
* Separate events retrieval and their handling. * Adjust handling of retry queue flag. * Fix ExchangeCreated handling.
1 parent 23fb565 commit 6c8d91e

File tree

1 file changed

+131
-88
lines changed

1 file changed

+131
-88
lines changed

aquarius/events/events_monitor.py

Lines changed: 131 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import logging
77
import os
88
import time
9+
from distutils.util import strtobool
910
from threading import Thread
1011

1112
import elasticsearch
@@ -147,7 +148,9 @@ def do_run_monitor(self):
147148

148149
def process_current_blocks(self):
149150
"""Process all blocks from the last processed block to the current block."""
150-
if os.getenv("PROCESS_RETRY_QUEUE"):
151+
process_queue = strtobool(os.getenv("PROCESS_RETRY_QUEUE", "0"))
152+
153+
if process_queue:
151154
self.retry_mechanism.process_queue()
152155

153156
last_block = self.get_last_processed_block()
@@ -188,28 +191,73 @@ def process_block_range(self, from_block, to_block):
188191
self._chain_id,
189192
]
190193

191-
event_processors = {
192-
"EVENT_METADATA_CREATED": MetadataCreatedProcessor,
193-
"EVENT_METADATA_UPDATED": MetadataUpdatedProcessor,
194-
"EVENT_METADATA_STATE": MetadataStateProcessor,
194+
# event retrieval
195+
all_events = self.get_all_events(from_block, to_block)
196+
197+
regular_event_processors = {
198+
EventTypes.EVENT_METADATA_CREATED: MetadataCreatedProcessor,
199+
EventTypes.EVENT_METADATA_UPDATED: MetadataUpdatedProcessor,
200+
EventTypes.EVENT_METADATA_STATE: MetadataStateProcessor,
195201
}
196-
for event_name in event_processors:
197-
self.handle_regular_event_processor(
198-
event_name,
199-
event_processors[event_name],
200-
processor_args,
201-
from_block,
202-
to_block,
203-
)
204202

205-
self.handle_price_change(from_block, to_block)
206-
self.handle_token_uri_update(from_block, to_block)
207-
self.handle_transfer_ownership(from_block, to_block)
203+
# event handling
204+
for event_name, events_to_process in all_events.items():
205+
if event_name == EventTypes.EVENT_TRANSFER:
206+
self.handle_transfer_ownership(events_to_process)
207+
elif event_name in regular_event_processors.keys():
208+
self.handle_regular_event_processor(
209+
event_name,
210+
regular_event_processors[event_name],
211+
processor_args,
212+
events_to_process,
213+
)
214+
elif event_name in [
215+
EventTypes.EVENT_ORDER_STARTED,
216+
EventTypes.EVENT_EXCHANGE_CREATED,
217+
EventTypes.EVENT_EXCHANGE_RATE_CHANGED,
218+
EventTypes.EVENT_DISPENSER_CREATED,
219+
]:
220+
self.handle_price_change(event_name, events_to_process, to_block)
221+
elif event_name == EventTypes.EVENT_TOKEN_URI_UPDATE:
222+
self.handle_token_uri_update(events_to_process)
208223

209224
self.store_last_processed_block(to_block)
210225

226+
def get_all_events(self, from_block, to_block):
227+
all_events = {
228+
EventTypes.EVENT_TRANSFER: [],
229+
# "regular" events
230+
EventTypes.EVENT_METADATA_CREATED: [],
231+
EventTypes.EVENT_METADATA_UPDATED: [],
232+
EventTypes.EVENT_METADATA_STATE: [],
233+
# price changed events
234+
EventTypes.EVENT_ORDER_STARTED: [],
235+
EventTypes.EVENT_EXCHANGE_CREATED: [],
236+
EventTypes.EVENT_EXCHANGE_RATE_CHANGED: [],
237+
EventTypes.EVENT_DISPENSER_CREATED: [],
238+
#
239+
EventTypes.EVENT_TOKEN_URI_UPDATE: [],
240+
}
241+
242+
if from_block >= to_block:
243+
return all_events
244+
245+
try:
246+
for event_name in all_events.keys():
247+
all_events[event_name] = self.get_event_logs(
248+
event_name, from_block, to_block
249+
)
250+
251+
return all_events
252+
except Exception:
253+
middle = int((from_block + to_block) / 2)
254+
return merge_list_dictionary(
255+
self.get_all_events(from_block, middle),
256+
self.get_all_events(middle + 1, to_block),
257+
)
258+
211259
def handle_regular_event_processor(
212-
self, event_name, processor, processor_args, from_block, to_block
260+
self, event_name, processor, processor_args, events
213261
):
214262
"""Process emitted events between two given blocks for a given event name.
215263
@@ -220,19 +268,17 @@ def handle_regular_event_processor(
220268
from_block (int): inital block
221269
to_block (int): final block
222270
"""
223-
for event in self.get_event_logs(
224-
EventTypes.get_value(event_name), from_block, to_block
225-
):
271+
for event in events:
226272
dt_contract = self._web3.eth.contract(
227273
abi=ERC721Template.abi,
228274
address=self._web3.toChecksumAddress(event.address),
229275
)
230276
receipt = self._web3.eth.get_transaction_receipt(
231277
event.transactionHash.hex()
232278
)
233-
event_object = dt_contract.events[
234-
EventTypes.get_value(event_name)
235-
]().processReceipt(receipt, errors=DISCARD)[0]
279+
event_object = dt_contract.events[event_name]().processReceipt(
280+
receipt, errors=DISCARD
281+
)[0]
236282
try:
237283
metadata_proofs = dt_contract.events.MetadataValidated().processReceipt(
238284
receipt, errors=DISCARD
@@ -247,77 +293,67 @@ def handle_regular_event_processor(
247293
event.transactionHash.hex(), 0, processor_args[4]
248294
)
249295
logger.exception(
250-
f"Error processing {EventTypes.get_value(event_name)} event: {e}\n"
251-
f"event={event}"
296+
f"Error processing {event_name} event: {e}\n" f"event={event}"
252297
)
253298

254-
def handle_price_change(self, from_block, to_block):
299+
def handle_price_change(self, event_name, events, to_block):
255300
fre = get_fre(self._web3, self._chain_id)
256301
dispenser = get_dispenser(self._web3, self._chain_id)
257302

258-
for event_name in [
259-
EventTypes.EVENT_ORDER_STARTED,
260-
EventTypes.EVENT_EXCHANGE_CREATED,
261-
EventTypes.EVENT_EXCHANGE_RATE_CHANGED,
262-
EventTypes.EVENT_DISPENSER_CREATED,
263-
]:
264-
events = self.get_event_logs(event_name, from_block, to_block)
265-
266-
for event in events:
267-
if event_name == EventTypes.EVENT_EXCHANGE_CREATED:
268-
receipt = self._web3.eth.get_transaction_receipt(
269-
event.transactionHash.hex()
270-
)
271-
erc20_address = receipt.to
272-
elif event_name == EventTypes.EVENT_EXCHANGE_RATE_CHANGED:
273-
receipt = self._web3.eth.get_transaction_receipt(
274-
event.transactionHash.hex()
275-
)
276-
exchange_id = (
277-
fre.events.ExchangeRateChanged()
278-
.processReceipt(receipt)[0]
279-
.args.exchangeId
280-
)
281-
erc20_address = fre.caller.getExchange(exchange_id)[1]
282-
elif event_name == EventTypes.EVENT_DISPENSER_CREATED:
283-
receipt = self._web3.eth.get_transaction_receipt(
284-
event.transactionHash.hex()
285-
)
286-
erc20_address = (
287-
dispenser.events.DispenserCreated()
288-
.processReceipt(receipt)[0]
289-
.args.datatokenAddress
290-
)
291-
else:
292-
erc20_address = event.address
293-
294-
erc20_contract = self._web3.eth.contract(
295-
abi=ERC20Template.abi,
296-
address=self._web3.toChecksumAddress(erc20_address),
303+
for event in events:
304+
if event_name == EventTypes.EVENT_EXCHANGE_CREATED:
305+
receipt = self._web3.eth.get_transaction_receipt(
306+
event.transactionHash.hex()
297307
)
298-
299-
logger.debug(
300-
f"{event_name} detected on ERC20 contract {event.address}."
308+
exchange_id = (
309+
fre.events.ExchangeCreated()
310+
.processReceipt(receipt)[0]
311+
.args.exchangeId
312+
)
313+
erc20_address = fre.caller.getExchange(exchange_id)[1]
314+
elif event_name == EventTypes.EVENT_EXCHANGE_RATE_CHANGED:
315+
receipt = self._web3.eth.get_transaction_receipt(
316+
event.transactionHash.hex()
301317
)
318+
exchange_id = (
319+
fre.events.ExchangeRateChanged()
320+
.processReceipt(receipt)[0]
321+
.args.exchangeId
322+
)
323+
erc20_address = fre.caller.getExchange(exchange_id)[1]
324+
elif event_name == EventTypes.EVENT_DISPENSER_CREATED:
325+
receipt = self._web3.eth.get_transaction_receipt(
326+
event.transactionHash.hex()
327+
)
328+
erc20_address = (
329+
dispenser.events.DispenserCreated()
330+
.processReceipt(receipt)[0]
331+
.args.datatokenAddress
332+
)
333+
else:
334+
erc20_address = event.address
302335

303-
try:
304-
event_processor = OrderStartedProcessor(
305-
erc20_contract.caller.getERC721Address(),
306-
self._es_instance,
307-
to_block,
308-
self._chain_id,
309-
)
310-
event_processor.process()
311-
except Exception as e:
312-
logger.error(
313-
f"Error processing {event_name} event: {e}\n" f"event={event}"
314-
)
315-
316-
def handle_token_uri_update(self, from_block, to_block):
317-
events = self.get_event_logs(
318-
EventTypes.EVENT_TOKEN_URI_UPDATE, from_block, to_block
319-
)
336+
erc20_contract = self._web3.eth.contract(
337+
abi=ERC20Template.abi,
338+
address=self._web3.toChecksumAddress(erc20_address),
339+
)
340+
341+
logger.debug(f"{event_name} detected on ERC20 contract {event.address}.")
320342

343+
try:
344+
event_processor = OrderStartedProcessor(
345+
erc20_contract.caller.getERC721Address(),
346+
self._es_instance,
347+
to_block,
348+
self._chain_id,
349+
)
350+
event_processor.process()
351+
except Exception as e:
352+
logger.error(
353+
f"Error processing {event_name} event: {e}\n" f"event={event}"
354+
)
355+
356+
def handle_token_uri_update(self, events):
321357
for event in events:
322358
try:
323359
event_processor = TokenURIUpdatedProcessor(
@@ -329,9 +365,7 @@ def handle_token_uri_update(self, from_block, to_block):
329365
f"Error processing token update event: {e}\n" f"event={event}"
330366
)
331367

332-
def handle_transfer_ownership(self, from_block, to_block):
333-
events = self.get_event_logs(EventTypes.EVENT_TRANSFER, from_block, to_block)
334-
368+
def handle_transfer_ownership(self, events):
335369
for event in events:
336370
try:
337371
event_processor = TransferProcessor(
@@ -507,3 +541,12 @@ def get_event_logs(self, event_name, from_block, to_block, chunk_size=1000):
507541
)
508542

509543
return all_logs
544+
545+
546+
def merge_list_dictionary(dict_1, dict_2):
547+
dict_3 = {**dict_1, **dict_2}
548+
for key, value in dict_3.items():
549+
if key in dict_1 and key in dict_2:
550+
dict_3[key] = value + dict_1[key]
551+
552+
return dict_3

0 commit comments

Comments
 (0)