Skip to content

Commit 4fc2776

Browse files
author
Matthias Zimmermann
committed
adjust to new arkiv node for AsyncArkiv
1 parent e883f7b commit 4fc2776

File tree

7 files changed

+163
-174
lines changed

7 files changed

+163
-174
lines changed

src/arkiv/module.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def create_entity(
102102
create_op = CreateOp(
103103
payload=payload, content_type=content_type, annotations=annotations, btl=btl
104104
)
105-
logger.info(f"Creating entity:{create_op}")
106105

107106
# Wrap in Operations container and execute
108107
operations = Operations(creates=[create_op])

src/arkiv/module_async.py

Lines changed: 60 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -6,35 +6,38 @@
66
import logging
77
from typing import TYPE_CHECKING, Any
88

9-
from eth_typing import ChecksumAddress, HexStr
9+
from eth_typing import HexStr
1010
from web3.types import TxParams, TxReceipt
1111

1212
from .events_async import AsyncEventFilter
1313
from .module_base import ArkivModuleBase
1414
from .types import (
1515
ALL,
16-
ANNOTATIONS,
17-
METADATA,
18-
PAYLOAD,
16+
NONE,
17+
QUERY_OPTIONS_DEFAULT,
1918
Annotations,
2019
AsyncCreateCallback,
2120
AsyncDeleteCallback,
2221
AsyncExtendCallback,
2322
AsyncUpdateCallback,
2423
CreateOp,
25-
Cursor,
2624
DeleteOp,
2725
Entity,
2826
EntityKey,
2927
EventType,
3028
ExtendOp,
3129
Operations,
30+
QueryOptions,
3231
QueryResult,
3332
TransactionReceipt,
3433
TxHash,
3534
UpdateOp,
3635
)
37-
from .utils import merge_annotations, to_tx_params
36+
from .utils import (
37+
to_query_result,
38+
to_rpc_query_options,
39+
to_tx_params,
40+
)
3841

3942
# Deal with potential circular imports between client.py and module_async.py
4043
if TYPE_CHECKING:
@@ -75,6 +78,7 @@ async def execute(
7578
async def create_entity(
7679
self,
7780
payload: bytes | None = None,
81+
content_type: str | None = None,
7882
annotations: Annotations | None = None,
7983
btl: int | None = None,
8084
tx_params: TxParams | None = None,
@@ -84,6 +88,7 @@ async def create_entity(
8488
8589
Args:
8690
payload: Optional data payload for the entity
91+
content_type: Optional content type of the payload
8792
annotations: Optional key-value annotations
8893
btl: Blocks to live (default: self.btl_default, ~30 minutes with 2s blocks)
8994
tx_params: Optional additional transaction parameters
@@ -92,10 +97,12 @@ async def create_entity(
9297
The entity key and transaction hash of the create operation
9398
"""
9499
# Create the operation
95-
payload, annotations, btl = self._check_and_set_argument_defaults(
96-
payload, annotations, btl
100+
payload, content_type, annotations, btl = self._check_and_set_argument_defaults(
101+
payload, content_type, annotations, btl
102+
)
103+
create_op = CreateOp(
104+
payload=payload, content_type=content_type, annotations=annotations, btl=btl
97105
)
98-
create_op = CreateOp(payload=payload, annotations=annotations, btl=btl)
99106

100107
# Wrap in Operations container and execute
101108
operations = Operations(creates=[create_op])
@@ -116,6 +123,7 @@ async def update_entity(
116123
self,
117124
entity_key: EntityKey,
118125
payload: bytes | None = None,
126+
content_type: str | None = None,
119127
annotations: Annotations | None = None,
120128
btl: int | None = None,
121129
tx_params: TxParams | None = None,
@@ -134,12 +142,13 @@ async def update_entity(
134142
Transaction hash of the update operation
135143
"""
136144
# Create the update operation
137-
payload, annotations, btl = self._check_and_set_argument_defaults(
138-
payload, annotations, btl
145+
payload, content_type, annotations, btl = self._check_and_set_argument_defaults(
146+
payload, content_type, annotations, btl
139147
)
140148
update_op = UpdateOp(
141149
entity_key=entity_key,
142150
payload=payload,
151+
content_type=content_type,
143152
annotations=annotations,
144153
btl=btl,
145154
)
@@ -218,138 +227,81 @@ async def delete_entity(
218227

219228
return receipt.tx_hash
220229

221-
async def entity_exists(self, entity_key: EntityKey) -> bool:
230+
async def entity_exists(
231+
self, entity_key: EntityKey, at_block: int | None = None
232+
) -> bool:
222233
"""
223-
Check if an entity exists storage (async).
234+
Check if an entity exists storage.
224235
225236
Args:
226237
entity_key: The entity key to check
238+
at_block: Block number to pin query to specific block, or None to use latest block available
227239
228240
Returns:
229241
True if the entity exists, False otherwise
230242
"""
231243
try:
232-
# TODO self.client.eth.get_entity_metadata by itself does not guarantee existence
233-
await self._get_entity_metadata(entity_key)
234-
return True
235-
244+
options = QueryOptions(fields=NONE, at_block=at_block)
245+
query_result: QueryResult = await self.query_entities(
246+
f"$key = {entity_key}", options=options
247+
)
248+
return len(query_result.entities) > 0
236249
except Exception:
237250
return False
238251

239-
async def get_entity(self, entity_key: EntityKey, fields: int = ALL) -> Entity:
252+
async def get_entity(
253+
self, entity_key: EntityKey, fields: int = ALL, at_block: int | None = None
254+
) -> Entity:
240255
"""
241-
Get an entity by its entity key (async).
256+
Get an entity by its entity key.
242257
243258
Args:
244259
entity_key: The entity key to retrieve
245-
fields: Bitfield indicating which fields to retrieve
246-
PAYLOAD (1) = retrieve payload
247-
METADATA (2) = retrieve metadata
248-
ANNOTATIONS (4) = retrieve annotations
260+
fields: Bitfield indicating which fields to retrieve. See file types.py
261+
at_block: Block number to pin query to specific block, or None to use latest block available
249262
250263
Returns:
251264
Entity object with the requested fields
252265
"""
253-
# Gather the requested data
254-
owner: ChecksumAddress | None = None
255-
expires_at_block: int | None = None
256-
payload: bytes | None = None
257-
annotations: Annotations | None = None
258-
259-
# HINT: rpc methods to fetch entity content might change this is the place to adapt
260-
# get and decode payload if requested
261-
try:
262-
if fields & PAYLOAD:
263-
payload = await self._get_storage_value(entity_key)
264-
265-
# get and decode annotations and/or metadata if requested
266-
if fields & METADATA or fields & ANNOTATIONS:
267-
metadata_all = await self._get_entity_metadata(entity_key)
268-
269-
if fields & METADATA:
270-
# Convert owner address to checksummed format
271-
owner = self._get_owner(metadata_all)
272-
expires_at_block = self._get_expires_at_block(metadata_all)
273-
274-
if fields & ANNOTATIONS:
275-
annotations = merge_annotations(
276-
string_annotations=metadata_all.get("stringAnnotations", []),
277-
numeric_annotations=metadata_all.get("numericAnnotations", []),
278-
)
279-
except Exception as e:
280-
logger.warning(f"Error fetching entity[{entity_key}]: {e}")
281-
282-
# Create and return entity
283-
return Entity(
284-
entity_key=entity_key,
285-
fields=fields,
286-
owner=owner,
287-
expires_at_block=expires_at_block,
288-
payload=payload,
289-
annotations=annotations,
266+
267+
options = QueryOptions(fields=fields, at_block=at_block)
268+
query_result: QueryResult = await self.query_entities(
269+
f"$key = {entity_key}", options=options
290270
)
291271

272+
if not query_result:
273+
raise ValueError(f"Entity not found: {entity_key}")
274+
275+
if len(query_result.entities) != 1:
276+
raise ValueError(f"Expected 1 entity, got {len(query_result.entities)}")
277+
278+
result_entity = query_result.entities[0]
279+
return result_entity
280+
292281
async def query_entities(
293282
self,
294283
query: str | None = None,
295-
*,
296-
limit: int | None = None,
297-
at_block: int | str = "latest",
298-
cursor: Cursor | None = None,
284+
options: QueryOptions = QUERY_OPTIONS_DEFAULT,
299285
) -> QueryResult:
300286
"""
301-
Query entities with manual pagination control (async).
302-
303-
Execute a query against entity storage and return a single page of results.
304-
Use the returned cursor to fetch subsequent pages.
287+
Execute a query against entity storage.
305288
306289
Args:
307-
query: SQL-like query string for the first page. Mutually exclusive with cursor.
308-
Example: "SELECT * WHERE owner = '0x...' ORDER BY created_at DESC"
309-
limit: Maximum number of entities to return per page.
310-
Server may enforce a maximum limit. Can be modified between pagination requests.
311-
at_block: Block number or "latest" at which to execute query.
312-
Ensures consistency across paginated results. Ignored when using cursor
313-
(cursor already contains the block number).
314-
cursor: Cursor from previous QueryResult to fetch next page.
315-
Mutually exclusive with query. The cursor encodes the query and block number.
316-
317-
Returns:
318-
QueryResult with entities, block number, and optional next cursor.
290+
query: SQL-like query string
291+
options: QueryOptions for the query execution
319292
320293
Raises:
321-
ValueError: If both query and cursor are provided, or if neither is provided.
322-
NotImplementedError: This method is not yet implemented.
323-
324-
Examples:
325-
First page:
326-
>>> result = await arkiv.arkiv.query_entities(
327-
... "SELECT * WHERE owner = '0x1234...'",
328-
... limit=100
329-
... )
330-
>>> for entity in result:
331-
... print(entity.entity_key)
294+
ValueError: if invalid query options are provided.
332295
333-
Next page:
334-
>>> if result.has_more():
335-
... next_page = await arkiv.arkiv.query_entities(cursor=result.next_cursor)
336-
337-
Note:
338-
For automatic pagination across all pages, use query_all_entities() instead.
296+
Returns:
297+
QueryResult with entities, block number, and an optional cursor to fetch the next page
339298
"""
340299
# Validate parameters using base class helper
341-
self._validate_query_entities_params(query, limit, at_block, cursor)
342-
343-
if query:
344-
# Fetch raw results from RPC
345-
raw_results = await self.client.eth.query_entities(query)
346-
block_number = await self.client.eth.get_block_number()
347-
348-
# Transform raw results into QueryResult using base class helper
349-
return self._build_query_result(raw_results, block_number)
300+
options.validate(query)
301+
rpc_options = to_rpc_query_options(options)
302+
raw_results = await self.client.eth.query(query, rpc_options)
350303

351-
# Cursor based pagination not implemented yet
352-
raise NotImplementedError("query_entities is not yet implemented for cursors")
304+
return to_query_result(options.fields, raw_results)
353305

354306
async def watch_entity_created(
355307
self,

0 commit comments

Comments
 (0)