Skip to content

Commit 4968ee3

Browse files
Track datasource level by channel, ignore head rollbacks (#148)
* Track datasource level by channel, ignore head rollbacks * Docs * Typo * Changelog, fix level check * Rollback tests WIP * Single queue WIP * Refactor OperationIndex * Fix tests * Refactoring * Refactor `_extract_message_data` method * Allow "zero level" rollbacks, refactor `_on_rollback` callback * Zero level rollbacks special case, more refactoring * Refactoring, docs, fix integrity checks * Replace some lists with deques * test_rollback WIP * Another test * Another test * Fix realtime atomicity * More tests * Update changelog * Disable logger * Warn on rollbacks * Optimizations * Moar * Changelog * Ability to pass additional context on reindexing * Fix tests, enqueue single level rollback * InitializationRequiredError on create_package failure * Final touches, changelog
1 parent ccbb0cb commit 4968ee3

File tree

10 files changed

+424
-327
lines changed

10 files changed

+424
-327
lines changed

CHANGELOG.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,25 @@
22

33
## [unreleased]
44

5+
### Improved
6+
7+
* A significant increase in indexing speed.
8+
59
### Fixed
610

7-
* Removed unnecessary file IO calls, improved logging
11+
* Fixed unexpected reindexing caused by the bug in processing zero- and single-level rollbacks.
12+
* Removed unnecessary file IO calls that could cause `PermissionError` exception in Docker environments.
13+
* Fixed possible violation of block-level atomicity during real-time indexing.
14+
15+
### Changes
16+
17+
* Public methods of `TzktDatasource` now return immutable sequences.
818

919
## 3.0.3 - 2021-10-01
1020

1121
### Fixed
1222

13-
* Fixed processing of single level rollbacks emitted before rolled back head
23+
* Fixed processing of single-level rollbacks emitted before rolled back head.
1424

1525
## 3.0.2 - 2021-09-30
1626

src/dipdup/cli.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from dipdup.codegen import DEFAULT_DOCKER_ENV_FILE, DEFAULT_DOCKER_IMAGE, DEFAULT_DOCKER_TAG, DipDupCodeGenerator
2121
from dipdup.config import DipDupConfig, LoggingConfig, PostgresDatabaseConfig
2222
from dipdup.dipdup import DipDup
23-
from dipdup.exceptions import ConfigurationError, DeprecatedHandlerError, DipDupError, MigrationRequiredError
23+
from dipdup.exceptions import ConfigurationError, DeprecatedHandlerError, DipDupError, InitializationRequiredError, MigrationRequiredError
2424
from dipdup.hasura import HasuraGateway
2525
from dipdup.migrations import DipDupMigrationManager, deprecated_handlers
2626
from dipdup.utils.database import set_decimal_context, tortoise_wrapper
@@ -113,7 +113,10 @@ async def cli(ctx, config: List[str], env_file: List[str], logging_config: str):
113113
_config = DipDupConfig.load(config)
114114
init_sentry(_config)
115115

116-
await DipDupCodeGenerator(_config, {}).create_package()
116+
try:
117+
await DipDupCodeGenerator(_config, {}).create_package()
118+
except Exception as e:
119+
raise InitializationRequiredError from e
117120

118121
if _config.spec_version not in spec_version_mapping:
119122
raise ConfigurationError(f'Unknown `spec_version`, correct ones: {", ".join(spec_version_mapping)}')

src/dipdup/context.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,22 +90,26 @@ async def restart(self) -> None:
9090
sys.argv.remove('--reindex')
9191
os.execl(sys.executable, sys.executable, *sys.argv)
9292

93-
async def reindex(self, reason: Optional[Union[str, ReindexingReason]] = None) -> None:
93+
async def reindex(self, reason: Optional[Union[str, ReindexingReason]] = None, **context) -> None:
9494
"""Drop all tables or whole database and restart with the same CLI arguments"""
95-
reason_str = reason.value if isinstance(reason, ReindexingReason) else 'unknown'
96-
self.logger.warning('Reindexing initialized, reason: %s', reason_str)
97-
98-
if not reason or isinstance(reason, str):
95+
if not reason:
96+
reason = ReindexingReason.MANUAL
97+
elif isinstance(reason, str):
98+
context['message'] = reason
9999
reason = ReindexingReason.MANUAL
100100

101+
reason_str = reason.value + f' ({context["message"]})' if "message" in context else ''
102+
self.logger.warning('Reindexing initialized, reason: %s', reason_str)
103+
self.logger.info('Additional context: %s', context)
104+
101105
if forbid_reindexing:
102106
schema = await Schema.filter().get()
103107
if schema.reindex:
104-
raise ReindexingRequiredError(schema.reindex)
108+
raise ReindexingRequiredError(schema.reindex, context)
105109

106110
schema.reindex = reason
107111
await schema.save()
108-
raise ReindexingRequiredError(schema.reindex)
112+
raise ReindexingRequiredError(schema.reindex, context)
109113

110114
database_config = self.config.database
111115
if isinstance(database_config, PostgresDatabaseConfig):

src/dipdup/datasources/datasource.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
from abc import abstractmethod
3-
from typing import Awaitable, Callable, List, Set
3+
from typing import Awaitable, Callable, Set, Tuple
44

55
from dipdup.config import HTTPConfig
66
from dipdup.http import HTTPGateway
@@ -11,8 +11,8 @@
1111

1212

1313
HeadCallbackT = Callable[['IndexDatasource', HeadBlockData], Awaitable[None]]
14-
OperationsCallbackT = Callable[['IndexDatasource', List[OperationData]], Awaitable[None]]
15-
BigMapsCallbackT = Callable[['IndexDatasource', List[BigMapData]], Awaitable[None]]
14+
OperationsCallbackT = Callable[['IndexDatasource', Tuple[OperationData, ...]], Awaitable[None]]
15+
BigMapsCallbackT = Callable[['IndexDatasource', Tuple[BigMapData, ...]], Awaitable[None]]
1616
RollbackCallbackT = Callable[['IndexDatasource', int, int], Awaitable[None]]
1717

1818

@@ -57,11 +57,11 @@ async def emit_head(self, head: HeadBlockData) -> None:
5757
for fn in self._on_head:
5858
await fn(self, head)
5959

60-
async def emit_operations(self, operations: List[OperationData]) -> None:
60+
async def emit_operations(self, operations: Tuple[OperationData, ...]) -> None:
6161
for fn in self._on_operations:
6262
await fn(self, operations)
6363

64-
async def emit_big_maps(self, big_maps: List[BigMapData]) -> None:
64+
async def emit_big_maps(self, big_maps: Tuple[BigMapData, ...]) -> None:
6565
for fn in self._on_big_maps:
6666
await fn(self, big_maps)
6767

0 commit comments

Comments
 (0)