Skip to content

Commit 9b9d664

Browse files
refactor entity ops watching
1 parent 1629e23 commit 9b9d664

File tree

5 files changed

+668
-114
lines changed

5 files changed

+668
-114
lines changed

src/arkiv/events.py

Lines changed: 66 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
from web3.contract.contract import ContractEvent
1212
from web3.types import EventData, LogReceipt
1313

14-
from .contract import CREATED_EVENT, UPDATED_EVENT
14+
from .contract import CREATED_EVENT, EXTENDED_EVENT, UPDATED_EVENT
1515
from .types import (
1616
CreateCallback,
1717
CreateEvent,
1818
EventType,
19+
ExtendCallback,
20+
ExtendEvent,
1921
TxHash,
2022
UpdateCallback,
2123
UpdateEvent,
@@ -32,7 +34,7 @@ def __init__(
3234
self,
3335
contract: Contract,
3436
event_type: EventType,
35-
callback: CreateCallback | UpdateCallback,
37+
callback: CreateCallback | UpdateCallback | ExtendCallback,
3638
from_block: str | int = "latest",
3739
auto_start: bool = True,
3840
) -> None:
@@ -48,7 +50,7 @@ def __init__(
4850
"""
4951
self.contract: Contract = contract
5052
self.event_type: EventType = event_type
51-
self.callback: CreateCallback | UpdateCallback = callback
53+
self.callback: CreateCallback | UpdateCallback | ExtendCallback = callback
5254
self.from_block: str | int = from_block
5355

5456
# Internal state
@@ -78,6 +80,9 @@ def start(self) -> None:
7880
elif self.event_type == "updated":
7981
contract_event = self.contract.events[UPDATED_EVENT]
8082
self._filter = contract_event.create_filter(from_block=self.from_block)
83+
elif self.event_type == "extended":
84+
contract_event = self.contract.events[EXTENDED_EVENT]
85+
self._filter = contract_event.create_filter(from_block=self.from_block)
8186
else:
8287
raise NotImplementedError(
8388
f"Event type {self.event_type} not yet implemented"
@@ -166,50 +171,72 @@ def _process_event(self, event_data: EventData) -> None:
166171
"""
167172
logger.info(f"Processing event: {event_data}")
168173

169-
# Extract data based on event type
170-
if self.event_type == "created":
171-
# Parse CreateEvent
172-
entity_key = to_entity_key(event_data["args"]["entityKey"])
173-
expiration_block = event_data["args"]["expirationBlock"]
174-
175-
# Ensure transaction hash has 0x prefix
176-
tx_hash_hex = event_data["transactionHash"].hex()
177-
if not tx_hash_hex.startswith("0x"):
178-
tx_hash_hex = f"0x{tx_hash_hex}"
179-
tx_hash = TxHash(HexStr(tx_hash_hex))
174+
# Extract common data
175+
entity_key = to_entity_key(event_data["args"]["entityKey"])
176+
tx_hash = self._extract_tx_hash(event_data)
180177

178+
# Create event object and trigger callback based on type
179+
if self.event_type == "created":
181180
create_event = CreateEvent(
182-
entity_key=entity_key, expiration_block=expiration_block
181+
entity_key=entity_key,
182+
expiration_block=event_data["args"]["expirationBlock"],
183+
)
184+
self._trigger_callback(
185+
cast(CreateCallback, self.callback), create_event, tx_hash
183186
)
184-
185-
# Trigger callback
186-
try:
187-
# Type narrowing: when event_type is "created", callback is CreateCallback
188-
cast(CreateCallback, self.callback)(create_event, tx_hash)
189-
except Exception as e:
190-
logger.error(f"Error in callback: {e}", exc_info=True)
191187

192188
elif self.event_type == "updated":
193-
# Parse UpdateEvent
194-
entity_key = to_entity_key(event_data["args"]["entityKey"])
195-
expiration_block = event_data["args"]["expirationBlock"]
196-
197-
# Ensure transaction hash has 0x prefix
198-
tx_hash_hex = event_data["transactionHash"].hex()
199-
if not tx_hash_hex.startswith("0x"):
200-
tx_hash_hex = f"0x{tx_hash_hex}"
201-
tx_hash = TxHash(HexStr(tx_hash_hex))
202-
203189
update_event = UpdateEvent(
204-
entity_key=entity_key, expiration_block=expiration_block
190+
entity_key=entity_key,
191+
expiration_block=event_data["args"]["expirationBlock"],
192+
)
193+
self._trigger_callback(
194+
cast(UpdateCallback, self.callback), update_event, tx_hash
205195
)
206196

207-
# Trigger callback
208-
try:
209-
# Type narrowing: when event_type is "updated", callback is UpdateCallback
210-
cast(UpdateCallback, self.callback)(update_event, tx_hash)
211-
except Exception as e:
212-
logger.error(f"Error in callback: {e}", exc_info=True)
197+
elif self.event_type == "extended":
198+
extend_event = ExtendEvent(
199+
entity_key=entity_key,
200+
old_expiration_block=event_data["args"]["oldExpirationBlock"],
201+
new_expiration_block=event_data["args"]["newExpirationBlock"],
202+
)
203+
self._trigger_callback(
204+
cast(ExtendCallback, self.callback), extend_event, tx_hash
205+
)
213206

214207
else:
215208
logger.warning(f"Unknown event type: {self.event_type}")
209+
210+
def _extract_tx_hash(self, event_data: EventData) -> TxHash:
211+
"""
212+
Extract and normalize transaction hash from event data.
213+
214+
Args:
215+
event_data: Event data from Web3 filter
216+
217+
Returns:
218+
Transaction hash with 0x prefix
219+
"""
220+
tx_hash_hex = event_data["transactionHash"].hex()
221+
if not tx_hash_hex.startswith("0x"):
222+
tx_hash_hex = f"0x{tx_hash_hex}"
223+
return TxHash(HexStr(tx_hash_hex))
224+
225+
def _trigger_callback(
226+
self,
227+
callback: CreateCallback | UpdateCallback | ExtendCallback,
228+
event: CreateEvent | UpdateEvent | ExtendEvent,
229+
tx_hash: TxHash,
230+
) -> None:
231+
"""
232+
Trigger callback with error handling.
233+
234+
Args:
235+
callback: Callback function to invoke
236+
event: Event object to pass to callback
237+
tx_hash: Transaction hash to pass to callback
238+
"""
239+
try:
240+
callback(event, tx_hash) # type: ignore[arg-type]
241+
except Exception as e:
242+
logger.error(f"Error in callback: {e}", exc_info=True)

src/arkiv/module.py

Lines changed: 100 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
DeleteOp,
2424
Entity,
2525
EntityKey,
26+
EventType,
27+
ExtendCallback,
2628
ExtendOp,
2729
Operations,
2830
TransactionReceipt,
@@ -397,35 +399,16 @@ def watch_entity_created(
397399
"""
398400
Watch for entity creation events.
399401
400-
Args:
401-
callback: Function to call when a creation event is detected.
402-
Receives (CreateEvent, TxHash) as arguments.
403-
from_block: Starting block for the filter ('latest' or block number)
404-
auto_start: If True, starts polling immediately
405-
406-
Returns:
407-
EventFilter instance for controlling the watch
402+
Creates an event filter that monitors entity creation events. The callback
403+
receives (CreateEvent, TxHash) for each created entity.
408404
409-
Example:
410-
def on_create(event: CreateEvent, tx_hash: TxHash) -> None:
411-
print(f"Entity created: {event.entity_key}")
412-
413-
filter = arkiv.watch_entity_created(on_create)
414-
# ... later ...
415-
filter.stop()
405+
See `_watch_entity_event` for detailed documentation on parameters, return
406+
value, error handling, and usage examples.
416407
"""
417-
event_filter = EventFilter(
418-
contract=self.contract,
419-
event_type="created",
420-
callback=callback,
421-
from_block=from_block,
422-
auto_start=auto_start,
408+
return self._watch_entity_event(
409+
"created", callback, from_block=from_block, auto_start=auto_start
423410
)
424411

425-
# Track the filter for cleanup
426-
self._active_filters.append(event_filter)
427-
return event_filter
428-
429412
def watch_entity_updated(
430413
self,
431414
callback: UpdateCallback,
@@ -436,15 +419,87 @@ def watch_entity_updated(
436419
"""
437420
Watch for entity update events.
438421
439-
This method creates an event filter that monitors for entity updates on the
440-
Arkiv storage contract. The callback is invoked each time an entity is updated,
441-
receiving details about the update event and the transaction hash.
422+
Creates an event filter that monitors entity update events. The callback
423+
receives (UpdateEvent, TxHash) for each updated entity.
424+
425+
See `_watch_entity_event` for detailed documentation on parameters, return
426+
value, error handling, and usage examples.
427+
"""
428+
return self._watch_entity_event(
429+
"updated", callback, from_block=from_block, auto_start=auto_start
430+
)
431+
432+
def watch_entity_extended(
433+
self,
434+
callback: ExtendCallback,
435+
*,
436+
from_block: str | int = "latest",
437+
auto_start: bool = True,
438+
) -> EventFilter:
439+
"""
440+
Watch for entity extension events.
441+
442+
Creates an event filter that monitors entity lifetime extension events. The
443+
callback receives (ExtendEvent, TxHash) for each extended entity.
444+
445+
See `_watch_entity_event` for detailed documentation on parameters, return
446+
value, error handling, and usage examples.
447+
"""
448+
return self._watch_entity_event(
449+
"extended", callback, from_block=from_block, auto_start=auto_start
450+
)
451+
452+
def cleanup_filters(self) -> None:
453+
"""
454+
Stop and uninstall all active event filters.
455+
456+
This is automatically called when the Arkiv client exits its context,
457+
but can be called manually if needed.
458+
"""
459+
if not self._active_filters:
460+
logger.debug("No active filters to cleanup")
461+
return
462+
463+
logger.info(
464+
f"Cleaning up {len(self._active_filters)} active event filter(s)..."
465+
)
466+
467+
for event_filter in self._active_filters:
468+
try:
469+
event_filter.uninstall()
470+
except Exception as e:
471+
logger.warning(f"Error cleaning up filter: {e}")
472+
473+
self._active_filters.clear()
474+
logger.info("All event filters cleaned up")
475+
476+
@property
477+
def active_filters(self) -> list[EventFilter]:
478+
"""Get a copy of currently active event filters."""
479+
return list(self._active_filters)
480+
481+
def _watch_entity_event(
482+
self,
483+
event_type: EventType,
484+
callback: CreateCallback | UpdateCallback | ExtendCallback,
485+
*,
486+
from_block: str | int = "latest",
487+
auto_start: bool = True,
488+
) -> EventFilter:
489+
"""
490+
Internal method to watch for entity events.
491+
492+
This method creates an event filter that monitors for entity events on the
493+
Arkiv storage contract. The callback is invoked each time the specified event
494+
occurs, receiving details about the event and the transaction hash.
442495
443496
Args:
444-
callback: Function to call when an update event is detected.
445-
Receives (UpdateEvent, TxHash) as arguments.
497+
event_type: Type of event to watch for ("created", "updated", "extended")
498+
callback: Function to call when an event is detected.
499+
Receives (Event, TxHash) as arguments where Event is one of:
500+
CreateEvent, UpdateEvent, or ExtendEvent depending on event_type.
446501
from_block: Starting block for the filter. Can be:
447-
- "latest": Only watch for new updates (default)
502+
- "latest": Only watch for new events (default)
448503
- Block number (int): Watch from a specific historical block
449504
auto_start: If True, starts polling immediately (default: True).
450505
If False, you must manually call filter.start()
@@ -462,44 +517,44 @@ def watch_entity_updated(
462517
463518
Example:
464519
Basic usage with automatic start:
465-
>>> def on_update(event: UpdateEvent, tx_hash: TxHash) -> None:
466-
... print(f"Entity updated: {event.entity_key}")
467-
... print(f"New expiration: {event.expiration_block}")
520+
>>> def on_event(event: CreateEvent, tx_hash: TxHash) -> None:
521+
... print(f"Event occurred: {event.entity_key}")
468522
...
469-
>>> filter = arkiv.watch_entity_updated(on_update)
470-
>>> # Filter is now running and will call on_update for each update
523+
>>> filter = arkiv._watch_entity_event("created", on_event)
524+
>>> # Filter is now running and will call on_event for each event
471525
>>> # ... later ...
472526
>>> filter.stop() # Pause watching
473527
>>> filter.uninstall() # Cleanup resources
474528
475529
Manual start/stop control:
476-
>>> def on_update(event: UpdateEvent, tx_hash: TxHash) -> None:
477-
... print(f"Updated: {event.entity_key}")
530+
>>> def on_event(event: UpdateEvent, tx_hash: TxHash) -> None:
531+
... print(f"Event occurred: {event.entity_key}")
478532
...
479-
>>> filter = arkiv.watch_entity_updated(on_update, auto_start=False)
533+
>>> filter = arkiv._watch_entity_event("updated", on_event, auto_start=False)
480534
>>> # Do some setup work...
481535
>>> filter.start() # Begin watching
482536
>>> # ... later ...
483537
>>> filter.stop() # Stop watching
484538
>>> filter.uninstall() # Cleanup
485539
486-
Historical updates from specific block:
487-
>>> filter = arkiv.watch_entity_updated(
488-
... on_update,
540+
Historical events from specific block:
541+
>>> filter = arkiv._watch_entity_event(
542+
... "extended",
543+
... on_event,
489544
... from_block=1000 # Start from block 1000
490545
... )
491546
492547
Note:
493-
- Only captures UPDATE events (not creates, deletes, or extends)
494-
- With from_block="latest", misses updates before filter creation
548+
- Only captures the specified event type (not other lifecycle events)
549+
- With from_block="latest", misses events before filter creation
495550
- Filter must be uninstalled via filter.uninstall() to free resources
496551
- All active filters are automatically cleaned up when Arkiv client
497552
context exits
498553
- Callback exceptions are caught and logged but don't stop the filter
499554
"""
500555
event_filter = EventFilter(
501556
contract=self.contract,
502-
event_type="updated",
557+
event_type=event_type,
503558
callback=callback,
504559
from_block=from_block,
505560
auto_start=auto_start,
@@ -509,35 +564,6 @@ def watch_entity_updated(
509564
self._active_filters.append(event_filter)
510565
return event_filter
511566

512-
@property
513-
def active_filters(self) -> list[EventFilter]:
514-
"""Get a copy of currently active event filters."""
515-
return list(self._active_filters)
516-
517-
def cleanup_filters(self) -> None:
518-
"""
519-
Stop and uninstall all active event filters.
520-
521-
This is automatically called when the Arkiv client exits its context,
522-
but can be called manually if needed.
523-
"""
524-
if not self._active_filters:
525-
logger.debug("No active filters to cleanup")
526-
return
527-
528-
logger.info(
529-
f"Cleaning up {len(self._active_filters)} active event filter(s)..."
530-
)
531-
532-
for event_filter in self._active_filters:
533-
try:
534-
event_filter.uninstall()
535-
except Exception as e:
536-
logger.warning(f"Error cleaning up filter: {e}")
537-
538-
self._active_filters.clear()
539-
logger.info("All event filters cleaned up")
540-
541567
def _get_owner(self, metadata: dict[str, Any]) -> ChecksumAddress:
542568
"""Get the owner address of the given entity."""
543569
owner_metadata = metadata.get("owner")

0 commit comments

Comments
 (0)