Skip to content

Commit 7f73c9b

Browse files
Metadata interface for TzKT (#213)
* Use orjson * metadata_interface flag * Builtin hooks fix * docs * Changelog * CLI, init cursor * fix metadata interface * network fields * text token_id, fix decimal precision * Fix changelog * Additional config checks
1 parent 23f4341 commit 7f73c9b

File tree

15 files changed

+192
-16
lines changed

15 files changed

+192
-16
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ Please use [this](https://docs.gitlab.com/ee/development/changelog.html) documen
66

77
### Added
88

9+
* metadata: Added `metadata_interface` feature flag to expose metadata in TzKT format.
10+
* tzkt: Added missing fields to the `HeadBlockData` model.
911
* tzkt: Added `iter_...` methods to iterate over item batches.
1012

1113
### Fixed
@@ -21,6 +23,10 @@ Please use [this](https://docs.gitlab.com/ee/development/changelog.html) documen
2123

2224
* bcd: Removed `bcd` datasource and config section.
2325

26+
### Performance
27+
28+
* dipdup: Use fast `orjson` library instead of built-in `json` where possible.
29+
2430
## 4.2.6 - 2022-02-25
2531

2632
### Fixed

poetry.lock

Lines changed: 35 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ asyncclick = "^8.0.1"
3030
asyncpg = "^0.24.0"
3131
datamodel-code-generator = "^0.11.18"
3232
fcache = "^0.4.7"
33+
orjson = "^3.6.6"
3334
pydantic = "^1.8.1"
3435
pyhumps = "^3.0.2"
3536
pysignalr = "^0.1.1"

src/dipdup/cli.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,19 +195,22 @@ async def cli(ctx, config: List[str], env_file: List[str], logging_config: str):
195195
@click.option('--postpone-jobs', is_flag=True, help='Do not start job scheduler until all indexes are synchronized')
196196
@click.option('--early-realtime', is_flag=True, help='Establish a realtime connection before all indexes are synchronized')
197197
@click.option('--merge-subscriptions', is_flag=True, help='Subscribe to all operations/big map diffs during realtime indexing')
198+
@click.option('--metadata-interface', is_flag=True, help='Enable metadata interface')
198199
@click.pass_context
199200
@cli_wrapper
200201
async def run(
201202
ctx,
202203
postpone_jobs: bool,
203204
early_realtime: bool,
204205
merge_subscriptions: bool,
206+
metadata_interface: bool,
205207
) -> None:
206208
config: DipDupConfig = ctx.obj.config
207209
config.initialize()
208210
config.advanced.postpone_jobs |= postpone_jobs
209211
config.advanced.early_realtime |= early_realtime
210212
config.advanced.merge_subscriptions |= merge_subscriptions
213+
config.advanced.metadata_interface |= metadata_interface
211214

212215
set_decimal_context(config.package)
213216

src/dipdup/codegen.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import logging
32
import os
43
import re
@@ -16,6 +15,7 @@
1615
from typing import List
1716
from typing import cast
1817

18+
import orjson as json
1919
from jinja2 import Template
2020

2121
from dipdup import __version__
@@ -182,7 +182,7 @@ async def fetch_schemas(self) -> None:
182182
storage_schema_path = join(contract_schemas_path, 'storage.json')
183183
storage_schema = preprocess_storage_jsonschema(contract_schemas['storageSchema'])
184184

185-
write(storage_schema_path, json.dumps(storage_schema, indent=4, sort_keys=True))
185+
write(storage_schema_path, json.dumps(storage_schema, option=json.OPT_INDENT_2))
186186

187187
if not isinstance(operation_pattern_config, OperationHandlerTransactionPatternConfig):
188188
continue
@@ -200,7 +200,7 @@ async def fetch_schemas(self) -> None:
200200

201201
entrypoint = entrypoint.replace('.', '_').lstrip('_')
202202
entrypoint_schema_path = join(parameter_schemas_path, f'{entrypoint}.json')
203-
written = write(entrypoint_schema_path, json.dumps(entrypoint_schema, indent=4))
203+
written = write(entrypoint_schema_path, json.dumps(entrypoint_schema, option=json.OPT_INDENT_2))
204204
if not written and contract_config.typename is not None:
205205
with open(entrypoint_schema_path, 'r') as file:
206206
existing_schema = json.loads(file.read())
@@ -229,11 +229,11 @@ async def fetch_schemas(self) -> None:
229229
big_map_path = big_map_handler_config.path.replace('.', '_')
230230
big_map_key_schema = big_map_schema['keySchema']
231231
big_map_key_schema_path = join(big_map_schemas_path, f'{big_map_path}_key.json')
232-
write(big_map_key_schema_path, json.dumps(big_map_key_schema, indent=4))
232+
write(big_map_key_schema_path, json.dumps(big_map_key_schema, option=json.OPT_INDENT_2))
233233

234234
big_map_value_schema = big_map_schema['valueSchema']
235235
big_map_value_schema_path = join(big_map_schemas_path, f'{big_map_path}_value.json')
236-
write(big_map_value_schema_path, json.dumps(big_map_value_schema, indent=4))
236+
write(big_map_value_schema_path, json.dumps(big_map_value_schema, option=json.OPT_INDENT_2))
237237

238238
elif isinstance(index_config, HeadIndexConfig):
239239
pass

src/dipdup/config.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -942,10 +942,12 @@ def iter_imports(self, package: str) -> Iterator[Tuple[str, str]]:
942942
class AdvancedConfig:
943943
reindex: Dict[ReindexingReasonC, ReindexingAction] = field(default_factory=dict)
944944
scheduler: Optional[Dict[str, Any]] = None
945+
# TODO: Drop in major version
945946
oneshot: bool = False
946947
postpone_jobs: bool = False
947948
early_realtime: bool = False
948949
merge_subscriptions: bool = False
950+
metadata_interface: bool = False
949951

950952

951953
@dataclass
@@ -1137,9 +1139,15 @@ def initialize(self, skip_imports: bool = False) -> None:
11371139
self._imports_resolved.add(index_config.name)
11381140

11391141
def _validate(self) -> None:
1140-
# NOTE: Hasura
1141-
if isinstance(self.database, SqliteDatabaseConfig) and self.hasura:
1142-
raise ConfigurationError('SQLite database engine is not supported by Hasura')
1142+
# NOTE: Hasura and metadata interface
1143+
if self.hasura:
1144+
if isinstance(self.database, SqliteDatabaseConfig):
1145+
raise ConfigurationError('SQLite database engine is not supported by Hasura')
1146+
if self.advanced.metadata_interface and self.hasura.camel_case:
1147+
raise ConfigurationError('`metadata_interface` flag is incompatible with `camel_case` one')
1148+
else:
1149+
if self.advanced.metadata_interface:
1150+
raise ConfigurationError('`metadata_interface` flag requires `hasura` section to be present')
11431151

11441152
# NOTE: Reserved hooks
11451153
for name, hook_config in self.hooks.items():

src/dipdup/context.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,11 @@
5353
from dipdup.exceptions import InitializationRequiredError
5454
from dipdup.exceptions import ReindexingRequiredError
5555
from dipdup.models import Contract
56+
from dipdup.models import ContractMetadata
5657
from dipdup.models import Index
5758
from dipdup.models import ReindexingReason
5859
from dipdup.models import Schema
60+
from dipdup.models import TokenMetadata
5961
from dipdup.utils import FormattedLogger
6062
from dipdup.utils import slowdown
6163
from dipdup.utils.database import execute_sql_scripts
@@ -68,6 +70,31 @@
6870
pending_hooks: Deque[Awaitable[None]] = deque()
6971

7072

73+
class MetadataCursor:
74+
_contract = 0
75+
_token = 0
76+
77+
def __new__(cls):
78+
raise NotImplementedError
79+
80+
@classmethod
81+
async def initialize(cls) -> None:
82+
if last_contract := await ContractMetadata.filter().order_by('-update_id').first():
83+
cls._contract = last_contract.update_id
84+
if last_token := await TokenMetadata.filter().order_by('-update_id').first():
85+
cls._token = last_token.update_id
86+
87+
@classmethod
88+
def contract(cls) -> int:
89+
cls._contract += 1
90+
return cls._contract
91+
92+
@classmethod
93+
def token(cls) -> int:
94+
cls._token += 1
95+
return cls._token
96+
97+
7198
# TODO: Dataclasses are cool, everyone loves them. Resolve issue with pydantic serialization.
7299
class DipDupContext:
73100
def __init__(
@@ -216,6 +243,41 @@ async def spawn_index(self, name: str, state: Optional[Index] = None) -> None:
216243
# NOTE: IndexDispatcher will handle further initialization when it's time
217244
pending_indexes.append(index)
218245

246+
async def update_contract_metadata(
247+
self,
248+
network: str,
249+
address: str,
250+
metadata: Dict[str, Any],
251+
) -> None:
252+
if not self.config.advanced.metadata_interface:
253+
return
254+
update_id = MetadataCursor.contract()
255+
await ContractMetadata.update_or_create(
256+
network=network,
257+
contract=address,
258+
defaults={'metadata': metadata, 'update_id': update_id},
259+
)
260+
261+
async def update_token_metadata(
262+
self,
263+
network: str,
264+
address: str,
265+
token_id: str,
266+
metadata: Dict[str, Any],
267+
) -> None:
268+
if not self.config.advanced.metadata_interface:
269+
return
270+
if not all(str.isdigit(c) for c in token_id):
271+
raise ValueError('`token_id` must be a number')
272+
273+
update_id = MetadataCursor.token()
274+
await TokenMetadata.update_or_create(
275+
network=network,
276+
contract=address,
277+
token_id=token_id,
278+
defaults={'metadata': metadata, 'update_id': update_id},
279+
)
280+
219281
def _get_datasource(self, name: str, type_: Type[DatasourceT]) -> DatasourceT:
220282
datasource = self.datasources.get(name)
221283
if not datasource:

src/dipdup/datasources/datasource.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,18 @@ def __init__(self, url: str, http_config: HTTPConfig, merge_subscriptions: bool
5252
self._on_rollback: Set[RollbackCallbackT] = set()
5353
self._subscriptions: SubscriptionManager = SubscriptionManager(merge_subscriptions)
5454
self._subscriptions.add(HeadSubscription())
55+
self._network: Optional[str] = None
5556

5657
@property
5758
def name(self) -> str:
5859
return self._http._url
5960

61+
@property
62+
def network(self) -> str:
63+
if not self._network:
64+
raise RuntimeError('Network is not set')
65+
return self._network
66+
6067
@abstractmethod
6168
async def subscribe(self) -> None:
6269
...
@@ -89,6 +96,11 @@ async def emit_rollback(self, from_level: int, to_level: int) -> None:
8996
for fn in self._on_rollback:
9097
await fn(self, from_level, to_level)
9198

99+
def set_network(self, network: str) -> None:
100+
if self._network:
101+
raise RuntimeError('Network is already set')
102+
self._network = network
103+
92104
def set_sync_level(self, subscription: Optional[Subscription], level: int) -> None:
93105
self._subscriptions.set_sync_level(subscription, level)
94106

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,10 +962,13 @@ def convert_block(cls, block_json: Dict[str, Any]) -> BlockData:
962962
def convert_head_block(cls, head_block_json: Dict[str, Any]) -> HeadBlockData:
963963
"""Convert raw head block message from WS/REST into dataclass"""
964964
return HeadBlockData(
965+
chain=head_block_json['chain'],
966+
chain_id=head_block_json['chainId'],
965967
cycle=head_block_json['cycle'],
966968
level=head_block_json['level'],
967969
hash=head_block_json['hash'],
968970
protocol=head_block_json['protocol'],
971+
next_protocol=head_block_json['nextProtocol'],
969972
timestamp=cls._parse_timestamp(head_block_json['timestamp']),
970973
voting_epoch=head_block_json['votingEpoch'],
971974
voting_period=head_block_json['votingPeriod'],
@@ -980,6 +983,7 @@ def convert_head_block(cls, head_block_json: Dict[str, Any]) -> HeadBlockData:
980983
quote_jpy=Decimal(head_block_json['quoteJpy']),
981984
quote_krw=Decimal(head_block_json['quoteKrw']),
982985
quote_eth=Decimal(head_block_json['quoteEth']),
986+
quote_gbp=Decimal(head_block_json['quoteGbp']),
983987
)
984988

985989
@classmethod

src/dipdup/dipdup.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from dipdup.config import default_hooks
3737
from dipdup.context import CallbackManager
3838
from dipdup.context import DipDupContext
39+
from dipdup.context import MetadataCursor
3940
from dipdup.context import pending_indexes
4041
from dipdup.datasources.coinbase.datasource import CoinbaseDatasource
4142
from dipdup.datasources.datasource import Datasource
@@ -351,6 +352,9 @@ async def run(self) -> None:
351352
await self._initialize_datasources()
352353
await self._set_up_hasura(stack)
353354

355+
if advanced_config.metadata_interface:
356+
await MetadataCursor.initialize()
357+
354358
if self._config.oneshot:
355359
start_scheduler_event, spawn_datasources_event = Event(), Event()
356360
else:
@@ -417,7 +421,8 @@ async def _initialize_schema(self) -> None:
417421
# TODO: Fix Tortoise ORM to raise more specific exception
418422
except KeyError:
419423
try:
420-
# NOTE: A small migration, ReindexingReason became ReversedEnum
424+
# TODO: Drop with major version bump
425+
# NOTE: A small migration, ReindexingReason became ReversedEnum in 3.1.0
421426
for item in ReindexingReason:
422427
await conn.execute_script(f'UPDATE dipdup_schema SET reindex = "{item.name}" WHERE reindex = "{item.value}"')
423428

@@ -491,9 +496,11 @@ async def _initialize_datasources(self) -> None:
491496
if not isinstance(datasource, TzktDatasource):
492497
continue
493498

499+
head_block = await datasource.get_head_block()
500+
datasource.set_network(head_block.chain)
494501
datasource.set_sync_level(
495502
subscription=None,
496-
level=(await datasource.get_head_block()).level,
503+
level=head_block.level,
497504
)
498505

499506
db_head = await Head.filter(name=datasource.name).first()

0 commit comments

Comments
 (0)