Skip to content

Commit 6b3e803

Browse files
add watching of entity creation events (http polling)
1 parent c9a96db commit 6b3e803

File tree

12 files changed

+692
-20
lines changed

12 files changed

+692
-20
lines changed

src/arkiv/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from .account import NamedAccount
44
from .client import Arkiv
5+
from .events import EventFilter
56
from .node import ArkivNode
67
from .types import (
78
CreateEvent,
@@ -17,6 +18,7 @@
1718
"ArkivNode",
1819
"CreateEvent",
1920
"DeleteEvent",
21+
"EventFilter",
2022
"ExtendEvent",
2123
"NamedAccount",
2224
"TransactionReceipt",

src/arkiv/client.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,17 @@ def __exit__(
128128
exc_val: BaseException | None,
129129
exc_tb: Any,
130130
) -> None:
131+
# Cleanup event filters first
132+
logger.debug("Cleaning up event filters...")
133+
self.arkiv.cleanup_filters()
134+
135+
# Then stop the node if managed
131136
if self.node:
132137
logger.debug("Stopping managed ArkivNode...")
133138
self.node.stop()
134139

135140
def __del__(self) -> None:
136-
if self.node and self.node.is_running():
141+
if self.node and self.node.is_running:
137142
logger.warning(
138143
"Arkiv client with managed node is being destroyed but node is still running. "
139144
"Call arkiv.node.stop() or use context manager: 'with Arkiv() as arkiv:'"

src/arkiv/events.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
"""Event filtering for Arkiv entity events."""
2+
3+
import logging
4+
import threading
5+
import time
6+
from typing import Any, cast
7+
8+
from eth_typing import HexStr
9+
from web3._utils.filters import LogFilter
10+
from web3.contract import Contract
11+
from web3.contract.contract import ContractEvent
12+
from web3.types import EventData, LogReceipt
13+
14+
from .contract import CREATED_EVENT
15+
from .types import CreateCallback, CreateEvent, EventType, TxHash
16+
from .utils import to_entity_key
17+
18+
logger = logging.getLogger(__name__)
19+
20+
21+
class EventFilter:
22+
"""Handle for watching entity events."""
23+
24+
def __init__(
25+
self,
26+
contract: Contract,
27+
event_type: EventType,
28+
callback: CreateCallback,
29+
from_block: str | int = "latest",
30+
auto_start: bool = True,
31+
) -> None:
32+
"""
33+
Initialize event filter.
34+
35+
Args:
36+
contract: Web3 contract instance
37+
event_type: Type of event to watch
38+
callback: Callback function for the event
39+
from_block: Starting block for the filter
40+
auto_start: If True, starts polling immediately
41+
"""
42+
self.contract: Contract = contract
43+
self.event_type: EventType = event_type
44+
self.callback: CreateCallback = callback
45+
self.from_block: str | int = from_block
46+
47+
# Internal state
48+
self._filter: LogFilter | None = None
49+
self._running: bool = False
50+
self._thread: threading.Thread | None = None
51+
self._poll_interval: float = 2.0 # seconds
52+
53+
if auto_start:
54+
self.start()
55+
56+
def start(self) -> None:
57+
"""
58+
Start polling for events.
59+
"""
60+
if self._running:
61+
logger.warning(f"Filter for {self.event_type} is already running")
62+
return
63+
64+
logger.info(f"Starting event filter for {self.event_type}")
65+
66+
# Create the Web3 filter
67+
if self.event_type == "created":
68+
event: ContractEvent = self.contract.events[CREATED_EVENT]
69+
self._filter = event.create_filter(from_block=self.from_block)
70+
else:
71+
raise NotImplementedError(
72+
f"Event type {self.event_type} not yet implemented"
73+
)
74+
75+
# Start polling thread
76+
self._running = True
77+
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
78+
self._thread.start()
79+
80+
logger.info(f"Event filter for {self.event_type} started")
81+
82+
def stop(self) -> None:
83+
"""
84+
Stop polling for events.
85+
"""
86+
if not self._running:
87+
logger.warning(f"Filter for {self.event_type} is not running")
88+
return
89+
90+
logger.info(f"Stopping event filter for {self.event_type}")
91+
self._running = False
92+
93+
if self._thread:
94+
self._thread.join(timeout=5.0)
95+
self._thread = None
96+
97+
logger.info(f"Event filter for {self.event_type} stopped")
98+
99+
@property
100+
def is_running(self) -> bool:
101+
"""
102+
Check if the filter is currently running.
103+
104+
Returns:
105+
True if the filter's polling loop is active, False otherwise
106+
"""
107+
return self._running
108+
109+
def uninstall(self) -> None:
110+
"""Uninstall the filter and cleanup resources."""
111+
logger.info(f"Uninstalling event filter for {self.event_type}")
112+
113+
# Stop polling if running
114+
if self._running:
115+
self.stop()
116+
117+
# Clear filter reference (Web3 filters don't have uninstall method)
118+
self._filter = None
119+
120+
logger.info(f"Event filter for {self.event_type} uninstalled")
121+
122+
def _poll_loop(self) -> None:
123+
"""Background polling loop for events."""
124+
logger.debug(f"Poll loop started for {self.event_type}")
125+
126+
while self._running:
127+
try:
128+
# Get new entries from filter
129+
if self._filter:
130+
new_entries: list[LogReceipt] = self._filter.get_new_entries()
131+
132+
for entry in new_entries:
133+
try:
134+
# LogFilter from contract event has log_entry_formatter that
135+
# converts LogReceipt to EventData, but type system shows LogReceipt
136+
self._process_event(cast(EventData, entry))
137+
except Exception as e:
138+
logger.error(
139+
f"Error processing event: {e}", exc_info=True
140+
)
141+
142+
# Sleep before next poll
143+
time.sleep(self._poll_interval)
144+
145+
except Exception as e:
146+
logger.error(f"Error in poll loop: {e}", exc_info=True)
147+
time.sleep(self._poll_interval)
148+
149+
logger.debug(f"Poll loop ended for {self.event_type}")
150+
151+
def _process_event(self, event_data: EventData) -> None:
152+
"""
153+
Process a single event and trigger callback.
154+
155+
Args:
156+
event_data: Event data from Web3 filter
157+
"""
158+
logger.info(f"Processing event: {event_data}")
159+
160+
# Extract data based on event type
161+
if self.event_type == "created":
162+
# Parse CreateEvent
163+
entity_key = to_entity_key(event_data["args"]["entityKey"])
164+
expiration_block = event_data["args"]["expirationBlock"]
165+
166+
# Ensure transaction hash has 0x prefix
167+
tx_hash_hex = event_data["transactionHash"].hex()
168+
if not tx_hash_hex.startswith("0x"):
169+
tx_hash_hex = f"0x{tx_hash_hex}"
170+
tx_hash = TxHash(HexStr(tx_hash_hex))
171+
172+
event = CreateEvent(
173+
entity_key=entity_key, expiration_block=expiration_block
174+
)
175+
176+
# Trigger callback
177+
try:
178+
self.callback(event, tx_hash)
179+
except Exception as e:
180+
logger.error(f"Error in callback: {e}", exc_info=True)
181+
else:
182+
logger.warning(f"Unknown event type: {self.event_type}")

src/arkiv/module.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
from arkiv.account import NamedAccount
1212

1313
from .contract import EVENTS_ABI, FUNCTIONS_ABI, STORAGE_ADDRESS
14+
from .events import EventFilter
1415
from .types import (
1516
ALL,
1617
ANNOTATIONS,
1718
METADATA,
1819
PAYLOAD,
1920
Annotations,
21+
CreateCallback,
2022
CreateOp,
2123
DeleteOp,
2224
Entity,
@@ -55,6 +57,9 @@ def __init__(self, client: "Arkiv") -> None:
5557
for event in self.contract.all_events():
5658
logger.debug(f"Entity event {event.topic}: {event.signature}")
5759

60+
# Track active event filters for cleanup
61+
self._active_filters: list[EventFilter] = []
62+
5863
def is_available(self) -> bool:
5964
"""Check if Arkiv functionality is available. Should always be true for Arkiv clients."""
6065
return True
@@ -408,3 +413,72 @@ def _get_entity_metadata(self, entity_key: EntityKey) -> dict[str, Any]:
408413
metadata: dict[str, Any] = self.client.eth.get_entity_metadata(entity_key) # type: ignore[attr-defined]
409414
logger.debug(f"Raw metadata: {metadata}")
410415
return metadata
416+
417+
def watch_entity_created(
418+
self,
419+
callback: CreateCallback,
420+
*,
421+
from_block: str | int = "latest",
422+
auto_start: bool = True,
423+
) -> EventFilter:
424+
"""
425+
Watch for entity creation events.
426+
427+
Args:
428+
callback: Function to call when a creation event is detected.
429+
Receives (CreateEvent, TxHash) as arguments.
430+
from_block: Starting block for the filter ('latest' or block number)
431+
auto_start: If True, starts polling immediately
432+
433+
Returns:
434+
EventFilter instance for controlling the watch
435+
436+
Example:
437+
def on_create(event: CreateEvent, tx_hash: TxHash) -> None:
438+
print(f"Entity created: {event.entity_key}")
439+
440+
filter = arkiv.watch_entity_created(on_create)
441+
# ... later ...
442+
filter.stop()
443+
"""
444+
event_filter = EventFilter(
445+
contract=self.contract,
446+
event_type="created",
447+
callback=callback,
448+
from_block=from_block,
449+
auto_start=auto_start,
450+
)
451+
452+
# Track the filter for cleanup
453+
self._active_filters.append(event_filter)
454+
455+
return event_filter
456+
457+
@property
458+
def active_filters(self) -> list[EventFilter]:
459+
"""Get a copy of currently active event filters."""
460+
return list(self._active_filters)
461+
462+
def cleanup_filters(self) -> None:
463+
"""
464+
Stop and uninstall all active event filters.
465+
466+
This is automatically called when the Arkiv client exits its context,
467+
but can be called manually if needed.
468+
"""
469+
if not self._active_filters:
470+
logger.debug("No active filters to cleanup")
471+
return
472+
473+
logger.info(
474+
f"Cleaning up {len(self._active_filters)} active event filter(s)..."
475+
)
476+
477+
for event_filter in self._active_filters:
478+
try:
479+
event_filter.uninstall()
480+
except Exception as e:
481+
logger.warning(f"Error cleaning up filter: {e}")
482+
483+
self._active_filters.clear()
484+
logger.info("All event filters cleaned up")

src/arkiv/node.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ def container(self) -> DockerContainer:
219219
)
220220
return self._container
221221

222+
@property
222223
def is_external(self) -> bool:
223224
"""
224225
Check if this node is configured as an external node.
@@ -228,6 +229,7 @@ def is_external(self) -> bool:
228229
"""
229230
return self._is_external
230231

232+
@property
231233
def is_running(self) -> bool:
232234
"""
233235
Check if the node is currently running.
@@ -351,7 +353,7 @@ def fund_account(self, account: NamedAccount) -> None:
351353
f"Cannot fund account on external node - account {account.name} must be pre-funded via external means"
352354
)
353355

354-
if not self.is_running():
356+
if not self.is_running:
355357
msg = "Node is not running. Call start() first."
356358
raise RuntimeError(msg)
357359

src/arkiv/provider.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def node(self, arkiv_node: ArkivNode | None = None) -> ProviderBuilder:
137137
arkiv_node = ArkivNode()
138138

139139
# Auto-start the node if not running
140-
if not arkiv_node.is_running():
140+
if not arkiv_node.is_running:
141141
logger.debug("Auto-starting managed ArkivNode...")
142142
arkiv_node.start()
143143

src/arkiv/types.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
"""Arkiv SDK Types."""
1+
"""Type definitions for the Arkiv SDK."""
22

3-
from collections.abc import Sequence
3+
from collections.abc import Callable, Sequence
44
from dataclasses import dataclass
5-
from typing import NewType
5+
from typing import Literal, NewType
66

77
from eth_typing import ChecksumAddress, HexStr
88
from web3.datastructures import AttributeDict
@@ -181,6 +181,15 @@ class TransactionReceipt:
181181
deletes: Sequence[DeleteEvent]
182182

183183

184+
# Event callback types
185+
CreateCallback = Callable[[CreateEvent, TxHash], None]
186+
UpdateCallback = Callable[[UpdateEvent, TxHash], None]
187+
DeleteCallback = Callable[[DeleteEvent, TxHash], None]
188+
ExtendCallback = Callable[[ExtendEvent, TxHash], None]
189+
190+
# Event type literal
191+
EventType = Literal["created", "updated", "deleted", "extended"]
192+
184193
# Low level annotations for RLP encoding
185194
StringAnnotationsRlp = NewType("StringAnnotationsRlp", list[tuple[str, str]])
186195
NumericAnnotationsRlp = NewType("NumericAnnotationsRlp", list[tuple[str, int]])

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def arkiv_node() -> Generator[ArkivNode, None, None]:
114114
def arkiv_client_http(arkiv_node: ArkivNode, account_1: NamedAccount) -> Arkiv:
115115
"""Return Arkiv client with funded account connected via HTTP."""
116116
# Fund the account using the node (only for local containerized nodes)
117-
if not arkiv_node.is_external():
117+
if not arkiv_node.is_external:
118118
arkiv_node.fund_account(account_1)
119119

120120
# Create provider and client
@@ -151,7 +151,7 @@ def account_2(arkiv_node: ArkivNode) -> NamedAccount:
151151
account = create_account(2, BOB)
152152

153153
# Fund the account using the node (only for local containerized nodes)
154-
if not arkiv_node.is_external():
154+
if not arkiv_node.is_external:
155155
arkiv_node.fund_account(account)
156156

157157
return account

0 commit comments

Comments
 (0)