Skip to content

Commit f8907cf

Browse files
Add SubscriptionManager class to avoid subscription storm, add merge_subscriptions flag (#170)
* wip * Useful cli options * on_error * Bump * Drop heavily outdated datasource tests * Docs * Fixes * Fix tests, switch dependencies, fix lost subs * TConfigure action on reindexing triggered * Bump deps * Initial SubscriptionManager impl attempt * Fix subs shitstorm * Less logs * Logs * Fix false alarm on duplicate sub * Bump * Convinience cli commands * You gotta go fast * Bump * Subscription sync_levels WIP * Typo * Fix retrier * Lint cli * Bump, docs * Typo * Fix levels one more time * Bump * Fix sync levels * Filter out operation subgroups, fix merge_subscriptions flag * daemon flag, fix subgroups filtering * slowdoooown, fix crash on empty operation batches, increase maxsize * Fix bencchmarks * Prefiilter operations during subgroup extraction * Bump * Fix applying filters to indexdispatcher * Bump
1 parent d1d2fba commit f8907cf

File tree

20 files changed

+989
-563
lines changed

20 files changed

+989
-563
lines changed

CHANGELOG.md

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,31 @@
11
# Changelog
22

3-
## 3.2.0 - [unreleased]
3+
## 4.0.0 - [unreleased]
44

55
### Fixed
66

77
* tzkt: Realtime connection watchdog is no longer triggered after TzKT outtage.
8-
* codegen: Fixed missing imports in handlers generated during init.
9-
* coinbase: Candles caching disabled.
8+
* tzkt: Adding many indexes in runtime no longer leads to sending useless subscription requests.
9+
* tzkt: Fixed `get_originated_contracts` and `get_similar_contracts` methods whose output was limited to `batch_size` tunable.
1010
* index: Fixed head index callback invocation.
11+
* coinbase: Candles caching disabled.
12+
13+
### Changed
14+
15+
* cli: `run --oneshot` option is deprecated and will be removed in the next major release. Oneshot mode applied automatically when `first_level` and `last_level` fields are set in index config.
16+
* codegen: Fixed missing imports in handlers generated during init.
1117

1218
### Added
1319

14-
* cli: `dipdip run --early-realtime` flag to establish a real-time connection before all indexes are synchronized.
15-
* cli: `dipdup run --skip-hasura` flag to skip updating Hasura metadata.
20+
* cli: New flag `dipdip run --early-realtime` to establish a real-time connection before all indexes are synchronized.
21+
* cli: New flag `dipdup run --skip-hasura` to skip updating Hasura metadata.
22+
* cli: New command `dipdup status` to print the current status of indexes from database
23+
* cli: New command `dipdup config` to print config after resolving all links and variables
24+
* config: Added optional fields `first_level` and `last_level` to `TemplateIndexConfig`. These limits are applied after ones from the template itself.
1625

1726
### Improved
1827

28+
* index: Time required to initialize indexes presented in database reduced by ~25%.
1929
* tzkt: Replaced `aiosignalrcore` library with `pysignalr`.
2030

2131
## 3.1.3 - 2021-11-15

poetry.lock

Lines changed: 227 additions & 189 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ datamodel-code-generator = "^0.11.1"
3232
fcache = "^0.4.7"
3333
pydantic = "^1.8.1"
3434
pyhumps = "^3.0.2"
35-
pysignalr = {git = "https://github.com/baking-bad/pysignalr.git", branch = "initial-release"}
35+
pysignalr = "^0.1.0"
3636
pytezos = {version = "^3.2.4", optional = true}
3737
python-dotenv = "^0.18.0"
3838
"ruamel.yaml" = "^0.17.2"

scripts/run_benchmarks.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
for i in config index; do
22
rm tests/benchmarks/$i.latest.json;
3-
python tests/benchmarks/$i.py -o tests/benchmarks/$i.latest.json;
3+
python tests/benchmarks/$i.py -o tests/benchmarks/$i.latest.json --quiet;
44
python -m pyperf compare_to --table tests/benchmarks/$i.json tests/benchmarks/$i.latest.json;
55
done;

src/dipdup/cli.py

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import json
23
import logging
34
import os
45
import signal
@@ -12,6 +13,7 @@
1213
import sentry_sdk
1314
from dotenv import load_dotenv
1415
from fcache.cache import FileCache # type: ignore
16+
from pydantic.json import pydantic_encoder
1517
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
1618
from sentry_sdk.integrations.logging import LoggingIntegration
1719
from tortoise import Tortoise
@@ -145,6 +147,7 @@ async def cli(ctx, config: List[str], env_file: List[str], logging_config: str):
145147
@click.option('--postpone-jobs', is_flag=True, help='Do not start job scheduler until all indexes are synchronized')
146148
@click.option('--skip-hasura', is_flag=True, help='Do not update Hasura metadata')
147149
@click.option('--early-realtime', is_flag=True, help='Establish a realtime connection before all indexes are synchronized')
150+
@click.option('--merge-subscriptions', is_flag=True, help='Subscribe to all updates instead of individual contracts')
148151
@click.pass_context
149152
@cli_wrapper
150153
async def run(
@@ -153,17 +156,21 @@ async def run(
153156
postpone_jobs: bool,
154157
skip_hasura: bool,
155158
early_realtime: bool,
159+
merge_subscriptions: bool,
156160
) -> None:
161+
if oneshot:
162+
_logger.warning('`oneshot` argument is deprecated: use `first_level` and `last_level` fields of index config instead')
157163
config: DipDupConfig = ctx.obj.config
158164
config.initialize()
165+
config.advanced.postpone_jobs |= postpone_jobs
166+
config.advanced.skip_hasura |= skip_hasura
167+
config.advanced.early_realtime |= early_realtime
168+
config.advanced.merge_subscriptions |= merge_subscriptions
169+
159170
set_decimal_context(config.package)
171+
160172
dipdup = DipDup(config)
161-
await dipdup.run(
162-
oneshot=oneshot,
163-
postpone_jobs=postpone_jobs or config.advanced.postpone_jobs,
164-
skip_hasura=skip_hasura or config.advanced.skip_hasura,
165-
early_realtime=early_realtime or config.advanced.early_realtime,
166-
)
173+
await dipdup.run()
167174

168175

169176
@cli.command(help='Generate missing callbacks and types')
@@ -187,6 +194,39 @@ async def migrate(ctx):
187194
await migrations.migrate()
188195

189196

197+
from dipdup.models import Index
198+
199+
200+
@cli.command(help='Show current status of indexes in database')
201+
@click.pass_context
202+
@cli_wrapper
203+
async def status(ctx):
204+
config: DipDupConfig = ctx.obj.config
205+
url = config.database.connection_string
206+
models = f'{config.package}.models'
207+
async with tortoise_wrapper(url, models):
208+
# TODO: Formatting
209+
print('_' * 80)
210+
async for index in Index.all():
211+
print(f'{index.name}\t{index.status.value}\t{index.level}')
212+
print('_' * 80)
213+
214+
215+
# TODO: Docs, `--unsafe` argument to resolve env variables, default to not doing it
216+
@cli.command(help='Show config')
217+
@click.pass_context
218+
@cli_wrapper
219+
async def config(ctx):
220+
import ruamel.yaml as yaml
221+
222+
config: DipDupConfig = ctx.obj.config
223+
config_json = json.dumps(config, default=pydantic_encoder)
224+
config_yaml = yaml.dump(yaml.safe_load(config_json), indent=2, default_flow_style=False)
225+
print('_' * 80)
226+
print(config_yaml)
227+
print('_' * 80)
228+
229+
190230
# TODO: "cache clear"?
191231
@cli.command(help='Clear development request cache')
192232
@click.pass_context

src/dipdup/config.py

Lines changed: 108 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from copy import copy
1313
from dataclasses import field
1414
from enum import Enum
15-
from functools import reduce
15+
from functools import cached_property, reduce
1616
from os import environ as env
1717
from os.path import dirname
1818
from pydoc import locate
@@ -25,6 +25,7 @@
2525
from ruamel.yaml import YAML
2626
from typing_extensions import Literal
2727

28+
from dipdup.datasources.subscription import BigMapSubscription, OriginationSubscription, Subscription, TransactionSubscription
2829
from dipdup.enums import ReindexingAction, ReindexingReasonC
2930
from dipdup.exceptions import ConfigInitializationException, ConfigurationError
3031
from dipdup.utils import import_from, pascal_to_snake, snake_to_pascal
@@ -590,20 +591,33 @@ def template_values(self, value: Dict[str, str]) -> None:
590591
self._template_values = value
591592

592593

594+
@dataclass
595+
class SubscriptionsMixin:
596+
def __post_init_post_parse__(self) -> None:
597+
self._subscriptions: Set[Subscription] = set()
598+
599+
@property
600+
def subscriptions(self) -> Set[Subscription]:
601+
return self._subscriptions
602+
603+
593604
@dataclass
594605
class IndexTemplateConfig(NameMixin):
595606
kind = 'template'
596607
template: str
597608
values: Dict[str, str]
609+
first_level: int = 0
610+
last_level: int = 0
598611

599612

600613
@dataclass
601-
class IndexConfig(TemplateValuesMixin, NameMixin, ParentMixin['ResolvedIndexConfigT']):
614+
class IndexConfig(TemplateValuesMixin, NameMixin, SubscriptionsMixin, ParentMixin['ResolvedIndexConfigT']):
602615
datasource: Union[str, TzktDatasourceConfig]
603616

604617
def __post_init_post_parse__(self) -> None:
605618
TemplateValuesMixin.__post_init_post_parse__(self)
606619
NameMixin.__post_init_post_parse__(self)
620+
SubscriptionsMixin.__post_init_post_parse__(self)
607621
ParentMixin.__post_init_post_parse__(self)
608622

609623
@property
@@ -642,8 +656,8 @@ class OperationIndexConfig(IndexConfig):
642656
643657
:param datasource: Alias of index datasource in `datasources` section
644658
:param contracts: Aliases of contracts being indexed in `contracts` section
645-
:param first_level: First block to process (use with `--oneshot` run argument)
646-
:param last_level: Last block to process (use with `--oneshot` run argument)
659+
:param first_level: First block to process (one time sync)
660+
:param last_level: Last block to process (one time sync)
647661
:param handlers: List of indexer handlers
648662
"""
649663

@@ -655,6 +669,33 @@ class OperationIndexConfig(IndexConfig):
655669
first_level: int = 0
656670
last_level: int = 0
657671

672+
@cached_property
673+
def entrypoint_filter(self) -> Set[Optional[str]]:
674+
entrypoints = set()
675+
for handler_config in self.handlers:
676+
for pattern_config in handler_config.pattern:
677+
if isinstance(pattern_config, OperationHandlerTransactionPatternConfig):
678+
entrypoints.add(pattern_config.entrypoint)
679+
return set(entrypoints)
680+
681+
@cached_property
682+
def address_filter(self) -> Set[str]:
683+
addresses = set()
684+
for handler_config in self.handlers:
685+
for pattern_config in handler_config.pattern:
686+
if isinstance(pattern_config, OperationHandlerTransactionPatternConfig):
687+
if isinstance(pattern_config.source, ContractConfig):
688+
addresses.add(pattern_config.source.address)
689+
elif isinstance(pattern_config.source, str):
690+
raise ConfigInitializationException
691+
692+
if isinstance(pattern_config.destination, ContractConfig):
693+
addresses.add(pattern_config.destination.address)
694+
elif isinstance(pattern_config.destination, str):
695+
raise ConfigInitializationException
696+
697+
return addresses
698+
658699

659700
@dataclass
660701
class BigMapHandlerConfig(HandlerConfig, kind='handler'):
@@ -809,11 +850,15 @@ class JobConfig(NameMixin):
809850
hook: Union[str, 'HookConfig']
810851
crontab: Optional[str] = None
811852
interval: Optional[int] = None
853+
daemon: bool = False
812854
args: Dict[str, Any] = field(default_factory=dict)
813855

814856
def __post_init_post_parse__(self):
815-
if int(bool(self.crontab)) + int(bool(self.interval)) != 1:
816-
raise ConfigurationError('Either `interval` or `crontab` field must be specified')
857+
if self.crontab and self.interval:
858+
raise ConfigurationError('Only one of `crontab` and `interval` can be specified')
859+
elif not (self.crontab or self.interval or self.daemon):
860+
raise ConfigurationError('One of `crontab`, `interval` or `daemon` must be specified')
861+
817862
NameMixin.__post_init_post_parse__(self)
818863

819864
@property
@@ -878,10 +923,12 @@ def _args_with_context(self) -> Dict[str, str]:
878923

879924
@dataclass
880925
class AdvancedConfig:
926+
reindex: Dict[ReindexingReasonC, ReindexingAction] = Field(default_factory=dict)
927+
oneshot: bool = False
881928
postpone_jobs: bool = False
882929
skip_hasura: bool = False
883930
early_realtime: bool = False
884-
reindex: Dict[ReindexingReasonC, ReindexingAction] = Field(default_factory=dict)
931+
merge_subscriptions: bool = False
885932

886933

887934
@dataclass
@@ -945,6 +992,17 @@ def package_path(self, value: str):
945992
raise ConfigInitializationException
946993
self._package_path = value
947994

995+
@property
996+
def oneshot(self) -> bool:
997+
syncable_indexes = tuple(c for c in self.indexes.values() if not isinstance(c, HeadIndexConfig))
998+
oneshot_indexes = tuple(c for c in syncable_indexes if c.last_level)
999+
if not oneshot_indexes:
1000+
return False
1001+
elif len(oneshot_indexes) == len(syncable_indexes):
1002+
return True
1003+
else:
1004+
raise ConfigurationError('Either all or none of indexes can have `last_level` field set')
1005+
9481006
@classmethod
9491007
def load(
9501008
cls,
@@ -962,7 +1020,7 @@ def load(
9621020
with open(filename) as file:
9631021
raw_config = file.read()
9641022

965-
_logger.info('Substituting environment variables')
1023+
_logger.debug('Substituting environment variables')
9661024
for match in re.finditer(ENV_VARIABLE_REGEX, raw_config):
9671025
variable, default_value = match.group(1), match.group(2)
9681026
config_environment[variable] = default_value
@@ -1034,7 +1092,7 @@ def initialize(self, skip_imports: bool = False) -> None:
10341092
if index_config.name in self._imports_resolved:
10351093
continue
10361094

1037-
_logger.info('Loading callbacks and typeclasses of index `%s`', index_config.name)
1095+
_logger.debug('Loading callbacks and typeclasses of index `%s`', index_config.name)
10381096

10391097
if isinstance(index_config, IndexTemplateConfig):
10401098
raise ConfigInitializationException
@@ -1107,7 +1165,7 @@ def get_pattern_type(pattern_items: Sequence[HandlerPatternConfigT]) -> str:
11071165
)
11081166

11091167
def _resolve_template(self, template_config: IndexTemplateConfig) -> None:
1110-
_logger.info('Resolving index config `%s` from template `%s`', template_config.name, template_config.template)
1168+
_logger.debug('Resolving index config `%s` from template `%s`', template_config.name, template_config.template)
11111169

11121170
template = self.get_template(template_config.template)
11131171
raw_template = json.dumps(template, default=pydantic_encoder)
@@ -1124,6 +1182,9 @@ def _resolve_template(self, template_config: IndexTemplateConfig) -> None:
11241182
new_index_config.template_values = template_config.values
11251183
new_index_config.parent = template
11261184
new_index_config.name = template_config.name
1185+
if not isinstance(new_index_config, HeadIndexConfig):
1186+
new_index_config.first_level |= template_config.first_level
1187+
new_index_config.last_level |= template_config.last_level
11271188
self.indexes[template_config.name] = new_index_config
11281189

11291190
def _resolve_templates(self) -> None:
@@ -1136,12 +1197,49 @@ def _resolve_links(self) -> None:
11361197
if name in self._links_resolved:
11371198
continue
11381199
self._resolve_index_links(index_config)
1200+
# TODO: Not exactly link resolving, move somewhere else
1201+
self._resolve_index_subscriptions(index_config)
11391202
self._links_resolved.add(index_config.name)
11401203

11411204
for job_config in self.jobs.values():
11421205
if isinstance(job_config.hook, str):
11431206
job_config.hook = self.hooks[job_config.hook]
11441207

1208+
def _resolve_index_subscriptions(self, index_config: IndexConfigT) -> None:
1209+
if isinstance(index_config, IndexTemplateConfig):
1210+
return
1211+
if index_config.subscriptions:
1212+
return
1213+
1214+
if isinstance(index_config, OperationIndexConfig):
1215+
if self.advanced.merge_subscriptions:
1216+
index_config.subscriptions.add(TransactionSubscription())
1217+
else:
1218+
for contract_config in index_config.contracts or ():
1219+
address = cast(ContractConfig, contract_config).address
1220+
index_config.subscriptions.add(TransactionSubscription(address=address))
1221+
1222+
for handler_config in index_config.handlers:
1223+
for pattern_config in handler_config.pattern:
1224+
if isinstance(pattern_config, OperationHandlerOriginationPatternConfig):
1225+
index_config.subscriptions.add(OriginationSubscription())
1226+
break
1227+
1228+
elif isinstance(index_config, BigMapIndexConfig):
1229+
if self.advanced.merge_subscriptions:
1230+
index_config.subscriptions.add(BigMapSubscription())
1231+
else:
1232+
for big_map_handler_config in index_config.handlers:
1233+
address, path = big_map_handler_config.contract_config.address, big_map_handler_config.path
1234+
index_config.subscriptions.add(BigMapSubscription(address=address, path=path))
1235+
1236+
# NOTE: HeadSubscription is always enabled
1237+
elif isinstance(index_config, HeadIndexConfig):
1238+
pass
1239+
1240+
else:
1241+
raise NotImplementedError(f'Index kind `{index_config.kind}` is not supported')
1242+
11451243
def _resolve_index_links(self, index_config: IndexConfigT) -> None:
11461244
"""Resolve contract and datasource configs by aliases"""
11471245

0 commit comments

Comments
 (0)