Skip to content

Commit 1671fd0

Browse files
author
Matthias Zimmermann
committed
feat: add owner changed watching for (Async)Arkiv, cleanup event watching
1 parent 468fd84 commit 1671fd0

13 files changed

+728
-199
lines changed

README.md

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,6 @@ Arkiv is a permissioned storage system for decentralized apps, supporting flexib
44

55
The Arkiv SDK is the official Python library for interacting with Arkiv networks. It offers a type-safe, developer-friendly API for managing entities, querying data, subscribing to events, and offchain verification—ideal for both rapid prototyping and production use.
66

7-
## TODO AsyncArkiv
8-
9-
- check for duplicate code with Arkiv
10-
- refactor
11-
- re-check testing
12-
- commit
13-
- move to ArkivModule for async usage
14-
157
## Architecture
168

179
Principles:

src/arkiv/events.py

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@
55
import logging
66
import threading
77
import time
8-
from typing import TYPE_CHECKING, cast
8+
from typing import TYPE_CHECKING
99

1010
from web3._utils.filters import LogFilter
1111
from web3.contract import Contract
12-
from web3.types import EventData, LogReceipt
12+
from web3.types import LogReceipt
13+
14+
from arkiv.utils import get_tx_hash, to_event
1315

1416
from .events_base import EventFilterBase
1517
from .types import (
18+
ChangeOwnerCallback,
1619
CreateCallback,
1720
DeleteCallback,
1821
EventType,
@@ -26,7 +29,13 @@
2629
logger = logging.getLogger(__name__)
2730

2831
# Union of all sync callback types
29-
SyncCallback = CreateCallback | UpdateCallback | ExtendCallback | DeleteCallback
32+
SyncCallback = (
33+
CreateCallback
34+
| UpdateCallback
35+
| ExtendCallback
36+
| DeleteCallback
37+
| ChangeOwnerCallback
38+
)
3039

3140

3241
class EventFilter(EventFilterBase[SyncCallback]):
@@ -78,36 +87,41 @@ def start(self) -> None:
7887
logger.info(f"Starting event filter for {self.event_type}")
7988

8089
# Create the Web3 filter using base class helper
81-
self._filter = self._create_filter()
90+
# Note: base class returns LogFilter | AsyncLogFilter, but for sync we only use LogFilter
91+
filter_result = self._create_filter()
92+
assert isinstance(filter_result, LogFilter), (
93+
"Expected LogFilter for sync client"
94+
)
95+
self._filter = filter_result
8296

8397
# Start polling thread
8498
self._running = True
8599
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
86100
self._thread.start()
87101

88-
logger.info(f"Event filter for {self.event_type} started")
102+
logger.info(f"Event filter for '{self.event_type}' started")
89103

90104
def stop(self) -> None:
91105
"""
92106
Stop polling for events.
93107
"""
94108
if not self._running:
95-
logger.warning(f"Filter for {self.event_type} is not running")
109+
logger.warning(f"Filter for '{self.event_type}' is not running")
96110
return
97111

98-
logger.info(f"Stopping event filter for {self.event_type}")
112+
logger.info(f"Stopping event filter for '{self.event_type}'")
99113
self._running = False
100114

101115
# Wait for thread to finish
102116
if self._thread:
103117
self._thread.join(timeout=5.0)
104118
self._thread = None
105119

106-
logger.info(f"Event filter for {self.event_type} stopped")
120+
logger.info(f"Event filter for '{self.event_type}' stopped")
107121

108122
def uninstall(self) -> None:
109123
"""Uninstall the filter and cleanup resources."""
110-
logger.info(f"Uninstalling event filter for {self.event_type}")
124+
logger.info(f"Uninstalling event filter for '{self.event_type}'")
111125

112126
# Stop polling if running
113127
if self._running:
@@ -126,13 +140,10 @@ def _poll_loop(self) -> None:
126140
try:
127141
# Get new entries from filter
128142
if self._filter:
129-
new_entries: list[LogReceipt] = self._filter.get_new_entries()
130-
131-
for entry in new_entries:
143+
logs: list[LogReceipt] = self._filter.get_new_entries()
144+
for log in logs:
132145
try:
133-
# LogFilter from contract event has log_entry_formatter that
134-
# converts LogReceipt to EventData, but type system shows LogReceipt
135-
self._process_event(cast(EventData, entry))
146+
self._process_log(log)
136147
except Exception as e:
137148
logger.error(f"Error processing event: {e}", exc_info=True)
138149

@@ -145,18 +156,20 @@ def _poll_loop(self) -> None:
145156

146157
logger.debug(f"Poll loop ended for {self.event_type}")
147158

148-
def _process_event(self, event_data: EventData) -> None:
159+
def _process_log(self, log: LogReceipt) -> None:
149160
"""
150-
Process a single event and trigger sync callback.
161+
Process a single log receipt and trigger sync callback.
151162
152-
Args:
153-
event_data: Event data from Web3 filter
163+
Only processes logs from the contract address we're monitoring.
164+
Logs from other contracts are silently skipped.
154165
"""
155-
# Use base class to parse event data
156-
event, tx_hash = self._parse_event_data(event_data)
157-
158-
# Trigger sync callback with error handling
159166
try:
167+
# to_event handles both raw logs and already-processed EventData
168+
event = to_event(self.contract, log)
169+
tx_hash = get_tx_hash(log)
170+
171+
logger.info(f"Starting callback for hash: {tx_hash} and event: {event}")
160172
self.callback(event, tx_hash) # type: ignore[arg-type]
173+
161174
except Exception as e:
162175
logger.error(f"Error in callback: {e}", exc_info=True)

src/arkiv/events_async.py

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@
44

55
import asyncio
66
import logging
7-
from typing import TYPE_CHECKING, Any, cast
7+
from typing import TYPE_CHECKING, Any
88

99
from web3._utils.filters import LogFilter
1010
from web3.contract import Contract
11-
from web3.types import EventData, LogReceipt
11+
from web3.types import LogReceipt
12+
13+
from arkiv.utils import get_tx_hash, to_event
1214

1315
from .events_base import EventFilterBase
1416
from .types import (
17+
AsyncChangeOwnerCallback,
1518
AsyncCreateCallback,
1619
AsyncDeleteCallback,
1720
AsyncExtendCallback,
@@ -30,6 +33,7 @@
3033
| AsyncUpdateCallback
3134
| AsyncExtendCallback
3235
| AsyncDeleteCallback
36+
| AsyncChangeOwnerCallback
3337
)
3438

3539

@@ -70,7 +74,7 @@ def __init__(
7074
self._task: asyncio.Task[None] | None = None
7175
self._filter: LogFilter | None = None
7276

73-
async def _create_filter(self) -> Any:
77+
async def _create_filter(self) -> Any: # type: ignore[override]
7478
"""
7579
Create a Web3 contract event filter for async HTTP polling.
7680
@@ -83,7 +87,10 @@ async def _create_filter(self) -> Any:
8387
"""
8488
event_name = self._get_contract_event_name()
8589
contract_event = self.contract.events[event_name]
86-
return await contract_event.create_filter(from_block=self.from_block)
90+
return await contract_event.create_filter(
91+
from_block=self.from_block,
92+
address=self.contract.address,
93+
)
8794

8895
async def start(self) -> None:
8996
"""
@@ -147,15 +154,10 @@ async def _poll_loop(self) -> None:
147154
try:
148155
# Get new entries from filter
149156
if self._filter:
150-
# For async providers, get_new_entries() returns a coroutine
151-
# Type system doesn't reflect this, so we need to ignore the type error
152-
new_entries: list[LogReceipt] = await self._filter.get_new_entries() # type: ignore[misc]
153-
154-
for entry in new_entries:
157+
logs: list[LogReceipt] = await self._filter.get_new_entries() # type: ignore[misc]
158+
for log in logs:
155159
try:
156-
# LogFilter from contract event has log_entry_formatter that
157-
# converts LogReceipt to EventData, but type system shows LogReceipt
158-
await self._process_event(cast(EventData, entry))
160+
await self._process_log(log)
159161
except Exception as e:
160162
logger.error(f"Error processing event: {e}", exc_info=True)
161163

@@ -171,18 +173,30 @@ async def _poll_loop(self) -> None:
171173

172174
logger.debug(f"Async poll loop ended for {self.event_type}")
173175

174-
async def _process_event(self, event_data: EventData) -> None:
176+
async def _process_log(self, log: LogReceipt) -> None:
175177
"""
176178
Process a single event and trigger async callback.
177179
178-
Args:
179-
event_data: Event data from Web3 filter
180+
Only processes logs from the contract address we're monitoring.
181+
Logs from other contracts are silently skipped.
180182
"""
181-
# Use base class to parse event data
182-
event, tx_hash = self._parse_event_data(event_data)
183-
184-
# Trigger async callback with error handling
185183
try:
184+
# Defensive check: Only process logs from our contract
185+
log_address = log.get("address")
186+
if log_address and log_address.lower() != self.contract.address.lower():
187+
logger.debug(
188+
f"Skipping log from different contract: {log_address} "
189+
f"(expected {self.contract.address})"
190+
)
191+
return
192+
193+
# to_event handles both raw logs and already-processed EventData
194+
event = to_event(self.contract, log)
195+
tx_hash = get_tx_hash(log)
196+
197+
logger.info(f"Starting callback for hash: {tx_hash} and event: {event}")
198+
186199
await self.callback(event, tx_hash) # type: ignore[arg-type]
200+
187201
except Exception as e:
188202
logger.error(f"Error in async callback: {e}", exc_info=True)

src/arkiv/events_base.py

Lines changed: 23 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,19 @@
66
from abc import ABC, abstractmethod
77
from typing import TYPE_CHECKING, Any, Generic, TypeVar
88

9-
from eth_typing import HexStr
10-
from web3.types import EventData
11-
129
from .contract import EVENTS
1310
from .types import (
1411
CreateEvent,
1512
DeleteEvent,
1613
EventType,
1714
ExtendEvent,
18-
TxHash,
1915
UpdateEvent,
2016
)
21-
from .utils import to_entity_key
2217

2318
if TYPE_CHECKING:
19+
from web3._utils.filters import AsyncLogFilter, LogFilter
2420
from web3.contract import Contract
21+
from web3.contract.contract import ContractEvent
2522

2623
logger = logging.getLogger(__name__)
2724

@@ -85,7 +82,7 @@ def _get_contract_event_name(self) -> str:
8582
Get the Web3 contract event name for this event type.
8683
8784
Returns:
88-
Contract event name (e.g., "GolemBaseStorageEntityCreated")
85+
Contract event name (e.g., "ArkivEntityCreated")
8986
9087
Raises:
9188
NotImplementedError: If event type is not supported
@@ -96,11 +93,15 @@ def _get_contract_event_name(self) -> str:
9693
)
9794
return EVENTS[self.event_type]
9895

99-
def _create_filter(self) -> Any:
96+
def _create_filter(self) -> LogFilter | AsyncLogFilter:
10097
"""
10198
Create a Web3 contract event filter for HTTP polling.
10299
103100
This method creates a LogFilter using the contract's create_filter method.
101+
The filter is automatically configured to:
102+
- Only receive events matching the specific event signature (topic[0])
103+
- Only receive events from this contract's address
104+
104105
Subclasses that use different subscription mechanisms (e.g., WebSocket)
105106
should override this method to return an appropriate subscription handle.
106107
@@ -112,83 +113,21 @@ def _create_filter(self) -> Any:
112113
override to create subscription handles via provider-specific APIs.
113114
"""
114115
event_name = self._get_contract_event_name()
115-
contract_event = self.contract.events[event_name]
116-
return contract_event.create_filter(from_block=self.from_block)
117-
118-
def _extract_tx_hash(self, event_data: EventData) -> TxHash:
119-
"""
120-
Extract and normalize transaction hash from event data.
121-
122-
Args:
123-
event_data: Event data from Web3 filter
124-
125-
Returns:
126-
Transaction hash with 0x prefix
127-
"""
128-
tx_hash_hex = event_data["transactionHash"].hex()
129-
if not tx_hash_hex.startswith("0x"):
130-
tx_hash_hex = f"0x{tx_hash_hex}"
131-
return TxHash(HexStr(tx_hash_hex))
132-
133-
# TODO (1) check/match against utils.py::to_receipt
134-
# TODO (2) update to new log/event definition in contract.py
135-
def _parse_event_data(self, event_data: EventData) -> tuple[EventObject, TxHash]:
136-
"""
137-
Parse event data and create appropriate event object.
138-
139-
This method contains the shared logic for parsing events from Web3.
140-
It does NOT trigger the callback - that's done by subclasses to allow
141-
for sync vs async callback invocation.
142-
143-
Args:
144-
event_data: Event data from Web3 filter
145-
146-
Returns:
147-
Tuple of (event_object, tx_hash)
148-
149-
Raises:
150-
ValueError: If event_type is unknown
151-
"""
152-
logger.info(f"Parsing event: {event_data}")
153-
154-
# Extract common data
155-
entity_key = to_entity_key(event_data["args"]["entityKey"])
156-
tx_hash = self._extract_tx_hash(event_data)
157-
158-
# Create event object based on type
159-
event: EventObject
160-
if self.event_type == "created":
161-
event = CreateEvent(
162-
entity_key=entity_key,
163-
owner_address=event_data["args"]["ownerAddress"],
164-
expiration_block=event_data["args"]["expirationBlock"],
165-
cost=event_data["args"]["cost"],
166-
)
167-
elif self.event_type == "updated":
168-
event = UpdateEvent(
169-
entity_key=entity_key,
170-
owner_address=event_data["args"]["ownerAddress"],
171-
old_expiration_block=event_data["args"]["oldExpirationBlock"],
172-
new_expiration_block=event_data["args"]["newExpirationBlock"],
173-
cost=event_data["args"]["cost"],
174-
)
175-
elif self.event_type == "extended":
176-
event = ExtendEvent(
177-
entity_key=entity_key,
178-
owner_address=event_data["args"]["ownerAddress"],
179-
old_expiration_block=event_data["args"]["oldExpirationBlock"],
180-
new_expiration_block=event_data["args"]["newExpirationBlock"],
181-
cost=event_data["args"]["cost"],
182-
)
183-
elif self.event_type == "deleted":
184-
event = DeleteEvent(
185-
entity_key=entity_key,
186-
owner_address=event_data["args"]["ownerAddress"],
187-
)
188-
else:
189-
raise ValueError(f"Unknown event type: {self.event_type}")
190-
191-
return event, tx_hash
116+
contract_event: ContractEvent = self.contract.events[event_name]
117+
118+
# Create filter with explicit address filtering
119+
# The ContractEvent.create_filter automatically sets the event signature
120+
# as topic[0], and we explicitly filter by contract address
121+
filter: LogFilter | AsyncLogFilter = contract_event.create_filter(
122+
from_block=self.from_block,
123+
address=self.contract.address,
124+
)
125+
126+
logger.info(
127+
f"Created filter for event {event_name} from block {self.from_block} "
128+
f"at address {self.contract.address}: {filter}"
129+
)
130+
return filter
192131

193132
# Abstract methods that subclasses must implement
194133
@abstractmethod

0 commit comments

Comments
 (0)