Skip to content

Commit 8b680fb

Browse files
Fix data loss in skip_history, fix default sqlite path (#266)
* Fix data loss in `skip_history` and default SQLite path
* Pagination fixes
* Another attempt
* `similar_contracts` test
* Faster `get_originated`
* Originated contracts test
* More TzKT tests
* Even more TzKT tests
* More tests
* Check for `offset.cr`
* Cleanup
1 parent 3cb3ca3 commit 8b680fb

File tree

5 files changed

+262
-49
lines changed

5 files changed

+262
-49
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
Please use [this](https://docs.gitlab.com/ee/development/changelog.html) document as guidelines to keep a changelog.
44

5+
## [unreleased]
6+
7+
### Fixed
8+
9+
* config: Fixed default SQLite path (`:memory:`).
10+
* tzkt: Fixed pagination in several getter methods.
11+
* tzkt: Fixed data loss when `skip_history` option is enabled.
12+
513
## 5.0.0-rc2 - 2022-03-13
614

715
### Fixed

src/dipdup/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
DEFAULT_POSTGRES_SCHEMA = 'public'
6363
DEFAULT_POSTGRES_USER = DEFAULT_POSTGRES_DATABASE = 'postgres'
6464
DEFAULT_POSTGRES_PORT = 5432
65-
DEFAULT_SQLITE_PATH = ':memory'
65+
DEFAULT_SQLITE_PATH = ':memory:'
6666

6767
_logger = logging.getLogger('dipdup.config')
6868

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -305,19 +305,19 @@ async def get_similar_contracts(
305305
'get',
306306
url=f'v1/contracts/{address}/{entrypoint}',
307307
params={
308-
'select': 'id,address',
309-
'offset.cr': offset,
308+
'select': 'address',
309+
'offset': offset,
310310
'limit': limit,
311311
},
312312
)
313-
return tuple(c['address'] for c in response)
313+
return tuple(response)
314314

315315
async def iter_similar_contracts(
316316
self,
317317
address: str,
318318
strict: bool = False,
319319
) -> AsyncIterator[Tuple[str, ...]]:
320-
async for batch in self._iter_batches(self.get_similar_contracts, address, strict):
320+
async for batch in self._iter_batches(self.get_similar_contracts, address, strict, cursor=False):
321321
yield batch
322322

323323
async def get_originated_contracts(
@@ -333,15 +333,15 @@ async def get_originated_contracts(
333333
'get',
334334
url=f'v1/accounts/{address}/contracts',
335335
params={
336-
'select': 'id,address',
337-
'offset.cr': offset,
336+
'select': 'address',
337+
'offset': offset,
338338
'limit': limit,
339339
},
340340
)
341-
return tuple(c['address'] for c in response)
341+
return tuple(response)
342342

343343
async def iter_originated_contracts(self, address: str) -> AsyncIterator[Tuple[str, ...]]:
344-
async for batch in self._iter_batches(self.get_originated_contracts, address):
344+
async for batch in self._iter_batches(self.get_originated_contracts, address, cursor=False):
345345
yield batch
346346

347347
async def get_contract_summary(self, address: str) -> Dict[str, Any]:
@@ -386,7 +386,7 @@ async def get_big_map(
386386
params={
387387
**kwargs,
388388
'level': level,
389-
'offset.cr': offset,
389+
'offset': offset,
390390
'limit': limit,
391391
},
392392
)
@@ -403,6 +403,7 @@ async def iter_big_map(
403403
big_map_id,
404404
level,
405405
active,
406+
cursor=False,
406407
):
407408
yield batch
408409

@@ -413,6 +414,7 @@ async def get_contract_big_maps(
413414
limit: Optional[int] = None,
414415
) -> Tuple[Dict[str, Any], ...]:
415416
offset, limit = offset or 0, limit or self.request_limit
417+
# TODO: Can we cache it?
416418
big_maps = await self.request(
417419
'get',
418420
url=f'v1/contracts/{address}/bigmaps',
@@ -705,8 +707,9 @@ async def _on_subscribe(message: CompletionMessage) -> None:
705707
await event.wait()
706708

707709
async def _iter_batches(self, fn, *args, cursor: bool = True, **kwargs) -> AsyncIterator:
708-
if 'offset' in kwargs or 'limit' in kwargs:
710+
if set(kwargs).intersection(('offset', 'offset.cr', 'limit')):
709711
raise ValueError('`offset` and `limit` arguments are not allowed')
712+
710713
size, offset = self.request_limit, 0
711714
while size == self.request_limit:
712715
result = await fn(*args, offset=offset, **kwargs)
@@ -717,7 +720,11 @@ async def _iter_batches(self, fn, *args, cursor: bool = True, **kwargs) -> Async
717720

718721
size = len(result)
719722
if cursor:
720-
offset = result[-1]['id']
723+
# NOTE: Guess if response is already deserialized or not
724+
try:
725+
offset = result[-1]['id']
726+
except TypeError:
727+
offset = result[-1].id
721728
else:
722729
offset += self.request_limit
723730

src/dipdup/index.py

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from collections import namedtuple
66
from contextlib import ExitStack
77
from datetime import datetime
8-
from typing import Any
98
from typing import DefaultDict
109
from typing import Deque
1110
from typing import Dict
@@ -635,8 +634,8 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
635634
async def _synchronize_full(self, first_level: int, last_level: int, cache: bool = False) -> None:
636635
self._logger.info('Fetching big map diffs from level %s to %s', first_level, last_level)
637636

638-
big_map_addresses = await self._get_big_map_addresses()
639-
big_map_paths = await self._get_big_map_paths()
637+
big_map_addresses = self._get_big_map_addresses()
638+
big_map_paths = self._get_big_map_paths()
640639

641640
fetcher = BigMapFetcher(
642641
datasource=self._datasource,
@@ -659,42 +658,38 @@ async def _synchronize_level(self, last_level: int, cache: bool = False) -> None
659658
if not self._ctx.config.advanced.early_realtime:
660659
raise ConfigurationError('`skip_history` requires `early_realtime` feature flag to be enabled')
661660

662-
big_map_addresses = await self._get_big_map_addresses()
663-
big_map_paths = await self._get_big_map_paths()
664-
big_map_ids: Set[Tuple[int, str]] = set()
661+
big_map_pairs = self._get_big_map_pairs()
662+
big_map_ids: Set[Tuple[int, str, str]] = set()
665663

666-
for address in big_map_addresses:
664+
for address, path in big_map_pairs:
667665
async for contract_big_maps in self._datasource.iter_contract_big_maps(address):
668666
for contract_big_map in contract_big_maps:
669-
if contract_big_map['path'] in big_map_paths:
670-
big_map_ids.add((int(contract_big_map['ptr']), contract_big_map['path']))
667+
if contract_big_map['path'] == path:
668+
big_map_ids.add((int(contract_big_map['ptr']), address, path))
671669

672670
# NOTE: Do not use `_process_level_big_maps` here; we want to maintain transaction manually.
673-
async def _process_big_map_batch(big_maps: Tuple[Dict[str, Any], ...], path: str) -> None:
674-
big_map_data = tuple(
675-
BigMapData(
676-
id=big_map['id'],
677-
level=last_level,
678-
operation_id=last_level,
679-
timestamp=datetime.now(),
680-
bigmap=big_map_id,
681-
contract_address=address,
682-
path=path,
683-
action=BigMapAction.ADD_KEY,
684-
active=big_map['active'],
685-
key=big_map['key'],
686-
value=big_map['value'],
687-
)
688-
for big_map in big_maps
689-
)
690-
matched_handlers = await self._match_big_maps(big_map_data)
691-
for handler_config, big_map_diff in matched_handlers:
692-
await self._call_matched_handler(handler_config, big_map_diff)
693-
694671
async with in_global_transaction():
695-
for big_map_id, path in big_map_ids:
696-
async for big_maps in self._datasource.iter_big_map(big_map_id, last_level):
697-
await _process_big_map_batch(big_maps, path)
672+
for big_map_id, address, path in big_map_ids:
673+
async for big_map_keys in self._datasource.iter_big_map(big_map_id, last_level):
674+
big_map_data = tuple(
675+
BigMapData(
676+
id=big_map_key['id'],
677+
level=last_level,
678+
operation_id=last_level,
679+
timestamp=datetime.now(),
680+
bigmap=big_map_id,
681+
contract_address=address,
682+
path=path,
683+
action=BigMapAction.ADD_KEY,
684+
active=big_map_key['active'],
685+
key=big_map_key['key'],
686+
value=big_map_key['value'],
687+
)
688+
for big_map_key in big_map_keys
689+
)
690+
matched_handlers = await self._match_big_maps(big_map_data)
691+
for handler_config, big_map_diff in matched_handlers:
692+
await self._call_matched_handler(handler_config, big_map_diff)
698693

699694
await self.state.update_status(level=last_level)
700695

@@ -767,8 +762,8 @@ async def _match_big_maps(self, big_maps: Iterable[BigMapData]) -> Deque[Matched
767762
"""Try to match big map diffs in cache with all patterns from indexes."""
768763
matched_handlers: Deque[MatchedBigMapsT] = deque()
769764

770-
for big_map in big_maps:
771-
for handler_config in self._config.handlers:
765+
for handler_config in self._config.handlers:
766+
for big_map in big_maps:
772767
big_map_matched = await self._match_big_map(handler_config, big_map)
773768
if big_map_matched:
774769
arg = await self._prepare_handler_args(handler_config, big_map)
@@ -789,20 +784,32 @@ async def _call_matched_handler(self, handler_config: BigMapHandlerConfig, big_m
789784
big_map_diff,
790785
)
791786

792-
async def _get_big_map_addresses(self) -> Set[str]:
787+
def _get_big_map_addresses(self) -> Set[str]:
793788
"""Get addresses to fetch big map diffs from during initial synchronization"""
794789
addresses = set()
795790
for handler_config in self._config.handlers:
796791
addresses.add(cast(ContractConfig, handler_config.contract).address)
797792
return addresses
798793

799-
async def _get_big_map_paths(self) -> Set[str]:
794+
def _get_big_map_paths(self) -> Set[str]:
800795
"""Get addresses to fetch big map diffs from during initial synchronization"""
801796
paths = set()
802797
for handler_config in self._config.handlers:
803798
paths.add(handler_config.path)
804799
return paths
805800

801+
def _get_big_map_pairs(self) -> Set[Tuple[str, str]]:
802+
"""Get address-path pairs for fetch big map diffs during sync with `skip_history`"""
803+
pairs = set()
804+
for handler_config in self._config.handlers:
805+
pairs.add(
806+
(
807+
cast(ContractConfig, handler_config.contract).address,
808+
handler_config.path,
809+
)
810+
)
811+
return pairs
812+
806813

807814
class HeadIndex(Index):
808815
_config: HeadIndexConfig

0 commit comments

Comments (0)