Skip to content

Commit d8bbb3f

Browse files
committed
Fix data loss in skip_history, fix default sqlite path (#266)
* Fix data loss in `skip_history` and default sqlite path * Pagination fixes * Another attempt * similar_contracts test * faster get_originated * originated contracts test * More tzkt tests * even more tzkt tests * more tests * Check for offset.cr * Cleanup
1 parent 56679f6 commit d8bbb3f

File tree

5 files changed

+250
-39
lines changed

5 files changed

+250
-39
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22

33
Please use [this](https://docs.gitlab.com/ee/development/changelog.html) document as guidelines to keep a changelog.
44

5+
## [unreleased]
6+
7+
### Fixed
8+
9+
* config: Fixed default SQLite path (`:memory:`).
10+
* tzkt: Fixed data loss when `skip_history` option is enabled.
11+
512
## 4.2.6 - 2022-02-25
613

714
### Fixed

src/dipdup/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
DEFAULT_POSTGRES_SCHEMA = 'public'
6363
DEFAULT_POSTGRES_USER = DEFAULT_POSTGRES_DATABASE = 'postgres'
6464
DEFAULT_POSTGRES_PORT = 5432
65-
DEFAULT_SQLITE_PATH = ':memory'
65+
DEFAULT_SQLITE_PATH = ':memory:'
6666

6767
_logger = logging.getLogger('dipdup.config')
6868

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ async def get_similar_contracts(self, address: str, strict: bool = False) -> Tup
316316
'get',
317317
url=f'v1/contracts/{address}/{entrypoint}',
318318
params={
319-
'select': 'address',
319+
'select': 'id,address',
320320
'limit': self.request_limit,
321321
'offset': offset,
322322
},
@@ -339,7 +339,7 @@ async def get_originated_contracts(self, address: str) -> Tuple[str, ...]:
339339
'get',
340340
url=f'v1/accounts/{address}/contracts',
341341
params={
342-
'select': 'address',
342+
'select': 'id,address',
343343
'limit': self.request_limit,
344344
'offset': offset,
345345
},

src/dipdup/index.py

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -593,8 +593,8 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
593593
async def _synchronize_full(self, first_level: int, last_level: int, cache: bool = False) -> None:
594594
self._logger.info('Fetching big map diffs from level %s to %s', first_level, last_level)
595595

596-
big_map_addresses = await self._get_big_map_addresses()
597-
big_map_paths = await self._get_big_map_paths()
596+
big_map_addresses = self._get_big_map_addresses()
597+
big_map_paths = self._get_big_map_paths()
598598

599599
fetcher = BigMapFetcher(
600600
datasource=self._datasource,
@@ -613,37 +613,38 @@ async def _synchronize_level(self, last_level: int, cache: bool = False) -> None
613613
if not self._ctx.config.advanced.early_realtime:
614614
raise ConfigurationError('`skip_history` requires `early_realtime` feature flag to be enabled')
615615

616-
big_map_addresses = await self._get_big_map_addresses()
617-
big_map_paths = await self._get_big_map_paths()
618-
big_map_ids: Tuple[Tuple[int, str], ...] = ()
619-
big_map_data: Tuple[BigMapData, ...] = ()
620-
621-
for address in big_map_addresses:
622-
contract_big_maps = await self._datasource.get_contract_big_maps(address)
623-
for contract_big_map in contract_big_maps:
624-
if contract_big_map['path'] in big_map_paths:
625-
big_map_ids += ((int(contract_big_map['ptr']), contract_big_map['path']),)
626-
627-
for bigmap_id, path in big_map_ids:
628-
big_maps = await self._datasource.get_big_map(bigmap_id, last_level)
629-
big_map_data = big_map_data + tuple(
630-
BigMapData(
631-
id=big_map['id'],
632-
level=last_level,
633-
operation_id=last_level,
634-
timestamp=datetime.now(),
635-
bigmap=bigmap_id,
636-
contract_address=address,
637-
path=path,
638-
action=BigMapAction.ADD_KEY,
639-
active=big_map['active'],
640-
key=big_map['key'],
641-
value=big_map['value'],
642-
)
643-
for big_map in big_maps
644-
)
616+
big_map_pairs = self._get_big_map_pairs()
617+
big_map_ids: Set[Tuple[int, str, str]] = set()
645618

646-
await self._process_level_big_maps(big_map_data)
619+
for address, path in big_map_pairs:
620+
async for contract_big_maps in self._datasource.iter_contract_big_maps(address):
621+
for contract_big_map in contract_big_maps:
622+
if contract_big_map['path'] == path:
623+
big_map_ids.add((int(contract_big_map['ptr']), address, path))
624+
625+
# NOTE: Do not use `_process_level_big_maps` here; we want to maintain transaction manually.
626+
async with in_global_transaction():
627+
for big_map_id, address, path in big_map_ids:
628+
async for big_map_keys in self._datasource.iter_big_map(big_map_id, last_level):
629+
big_map_data = tuple(
630+
BigMapData(
631+
id=big_map_key['id'],
632+
level=last_level,
633+
operation_id=last_level,
634+
timestamp=datetime.now(),
635+
bigmap=big_map_id,
636+
contract_address=address,
637+
path=path,
638+
action=BigMapAction.ADD_KEY,
639+
active=big_map_key['active'],
640+
key=big_map_key['key'],
641+
value=big_map_key['value'],
642+
)
643+
for big_map_key in big_map_keys
644+
)
645+
matched_handlers = await self._match_big_maps(big_map_data)
646+
for handler_config, big_map_diff in matched_handlers:
647+
await self._call_matched_handler(handler_config, big_map_diff)
647648

648649
async def _process_level_big_maps(self, big_maps: Tuple[BigMapData, ...]):
649650
if not big_maps:
@@ -714,8 +715,8 @@ async def _match_big_maps(self, big_maps: Iterable[BigMapData]) -> Deque[Matched
714715
"""Try to match big map diffs in cache with all patterns from indexes."""
715716
matched_handlers: Deque[MatchedBigMapsT] = deque()
716717

717-
for big_map in big_maps:
718-
for handler_config in self._config.handlers:
718+
for handler_config in self._config.handlers:
719+
for big_map in big_maps:
719720
big_map_matched = await self._match_big_map(handler_config, big_map)
720721
if big_map_matched:
721722
arg = await self._prepare_handler_args(handler_config, big_map)
@@ -736,20 +737,32 @@ async def _call_matched_handler(self, handler_config: BigMapHandlerConfig, big_m
736737
big_map_diff,
737738
)
738739

739-
async def _get_big_map_addresses(self) -> Set[str]:
740+
def _get_big_map_addresses(self) -> Set[str]:
740741
"""Get addresses to fetch big map diffs from during initial synchronization"""
741742
addresses = set()
742743
for handler_config in self._config.handlers:
743744
addresses.add(cast(ContractConfig, handler_config.contract).address)
744745
return addresses
745746

746-
async def _get_big_map_paths(self) -> Set[str]:
747+
def _get_big_map_paths(self) -> Set[str]:
747748
"""Get paths to fetch big map diffs from during initial synchronization"""
748749
paths = set()
749750
for handler_config in self._config.handlers:
750751
paths.add(handler_config.path)
751752
return paths
752753

754+
def _get_big_map_pairs(self) -> Set[Tuple[str, str]]:
    """Get (contract address, big map path) pairs to fetch during sync with `skip_history`.

    One pair is produced per handler in the index config; duplicates collapse
    because the result is a set.
    """
    return {
        (cast(ContractConfig, handler.contract).address, handler.path)
        for handler in self._config.handlers
    }
765+
753766

754767
class HeadIndex(Index):
755768
_config: HeadIndexConfig
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
from contextlib import asynccontextmanager
2+
from typing import AsyncIterator
3+
from unittest import IsolatedAsyncioTestCase
4+
5+
from dipdup.config import HTTPConfig
6+
from dipdup.datasources.tzkt.datasource import TzktDatasource
7+
8+
# from unittest import skip
9+
10+
11+
@asynccontextmanager
async def with_tzkt(batch_size: int):
    """Yield an entered `TzktDatasource` pointed at https://api.tzkt.io.

    `batch_size` is forwarded to `HTTPConfig`; the datasource is entered with
    `async with` before being yielded and exited when the caller's block ends.
    """
    http_config = HTTPConfig(batch_size=batch_size)
    tzkt = TzktDatasource('https://api.tzkt.io', http_config)
    async with tzkt:
        yield tzkt
17+
18+
19+
async def take_two(iterable: AsyncIterator) -> tuple:
    """Concatenate the first two batches yielded by an async iterator.

    Each batch is expected to be a tuple; batches are joined with `+` in the
    order they arrive and iteration stops as soon as the second one is consumed.

    Raises:
        RuntimeError: if the iterator is exhausted before yielding two batches.
    """
    result: tuple = ()
    left = 2
    async for batch in iterable:
        result = result + batch
        left -= 1
        if not left:
            return result
    # Reaching this point means fewer than two batches were produced; say so
    # explicitly instead of raising a message-less error.
    raise RuntimeError(f'Expected 2 batches, but iterator was exhausted after {2 - left}')
28+
29+
30+
class TzktDatasourceTest(IsolatedAsyncioTestCase):
    """Tests for `TzktDatasource` request helpers.

    Every test opens a datasource through `with_tzkt` and compares the first
    results of a `get_*` / `iter_*` call with hard-coded expected values.
    `iter_*` tests read exactly two batches via `take_two`.
    """

    async def test_get_similar_contracts(self) -> None:
        """`get_similar_contracts` returns the same two addresses for both `strict` values."""
        async with with_tzkt(2) as tzkt:
            contracts = await tzkt.get_similar_contracts(
                address='KT1WBLrLE2vG8SedBqiSJFm4VVAZZBytJYHc',
                strict=False,
            )
            self.assertEqual(
                ('KT1W3VGRUjvS869r4ror8kdaxqJAZUbPyjMT', 'KT1K4EwTpbvYN9agJdjpyJm4ZZdhpUNKB3F6'),
                contracts,
            )

            contracts = await tzkt.get_similar_contracts(
                address='KT1WBLrLE2vG8SedBqiSJFm4VVAZZBytJYHc',
                strict=True,
            )
            self.assertEqual(
                ('KT1W3VGRUjvS869r4ror8kdaxqJAZUbPyjMT', 'KT1K4EwTpbvYN9agJdjpyJm4ZZdhpUNKB3F6'),
                contracts,
            )

    async def test_iter_similar_contracts(self) -> None:
        """Two single-item batches from `iter_similar_contracts` match the expected addresses."""
        async with with_tzkt(1) as tzkt:
            contracts = await take_two(
                tzkt.iter_similar_contracts(
                    address='KT1WBLrLE2vG8SedBqiSJFm4VVAZZBytJYHc',
                    strict=False,
                )
            )
            self.assertEqual(
                ('KT1W3VGRUjvS869r4ror8kdaxqJAZUbPyjMT', 'KT1K4EwTpbvYN9agJdjpyJm4ZZdhpUNKB3F6'),
                contracts,
            )

            contracts = await take_two(
                tzkt.iter_similar_contracts(
                    address='KT1WBLrLE2vG8SedBqiSJFm4VVAZZBytJYHc',
                    strict=True,
                )
            )
            self.assertEqual(
                ('KT1W3VGRUjvS869r4ror8kdaxqJAZUbPyjMT', 'KT1K4EwTpbvYN9agJdjpyJm4ZZdhpUNKB3F6'),
                contracts,
            )

    async def test_get_originated_contracts(self) -> None:
        """The first two results of `get_originated_contracts` carry the expected addresses."""
        async with with_tzkt(2) as tzkt:
            contracts = await tzkt.get_originated_contracts(
                address='KT1Lw8hCoaBrHeTeMXbqHPG4sS4K1xn7yKcD',
            )
            self.assertEqual(
                'KT1X1LgNkQShpF9nRLYw3Dgdy4qp38MX617z',
                contracts[0]['address'],
            )
            self.assertEqual(
                'KT1BgezWwHBxA9NrczwK9x3zfgFnUkc7JJ4b',
                contracts[1]['address'],
            )

    # NOTE: This method was previously named `iter_originated_contracts`; without
    # the `test` prefix unittest never collected it, so it silently did not run.
    async def test_iter_originated_contracts(self) -> None:
        """Two single-item batches from `iter_originated_contracts` carry the expected addresses."""
        async with with_tzkt(1) as tzkt:
            contracts = await take_two(
                tzkt.iter_originated_contracts(
                    address='KT1Lw8hCoaBrHeTeMXbqHPG4sS4K1xn7yKcD',
                )
            )
            self.assertEqual(
                'KT1X1LgNkQShpF9nRLYw3Dgdy4qp38MX617z',
                contracts[0]['address'],
            )
            self.assertEqual(
                'KT1BgezWwHBxA9NrczwK9x3zfgFnUkc7JJ4b',
                contracts[1]['address'],
            )

    async def test_get_contract_summary(self) -> None:
        """`get_contract_summary` echoes the requested contract address."""
        async with with_tzkt(1) as tzkt:
            contract = await tzkt.get_contract_summary(
                address='KT1Lw8hCoaBrHeTeMXbqHPG4sS4K1xn7yKcD',
            )
            self.assertEqual(
                'KT1Lw8hCoaBrHeTeMXbqHPG4sS4K1xn7yKcD',
                contract['address'],
            )

    async def test_get_contract_storage(self) -> None:
        """`get_contract_storage` exposes the expected `token_lambdas` value."""
        async with with_tzkt(1) as tzkt:
            storage = await tzkt.get_contract_storage(
                address='KT1Lw8hCoaBrHeTeMXbqHPG4sS4K1xn7yKcD',
            )
            self.assertEqual(
                1451,
                storage['token_lambdas'],
            )

    async def test_get_jsonschemas(self) -> None:
        """`get_jsonschemas` reports `baker_validator` as a string in the storage schema."""
        async with with_tzkt(1) as tzkt:
            jsonschemas = await tzkt.get_jsonschemas(
                address='KT1Lw8hCoaBrHeTeMXbqHPG4sS4K1xn7yKcD',
            )
            self.assertEqual(
                'string',
                jsonschemas['storageSchema']['properties']['baker_validator']['type'],
            )

    async def test_get_big_map(self) -> None:
        """The first two keys of `get_big_map` have the expected ids."""
        async with with_tzkt(2) as tzkt:
            big_map_keys = await tzkt.get_big_map(
                big_map_id=55031,
                level=550310,
            )
            self.assertEqual(
                (12392933, 12393108),
                (big_map_keys[0]['id'], big_map_keys[1]['id']),
            )

    async def test_iter_big_map(self) -> None:
        """Two single-item batches from `iter_big_map` have the expected key ids."""
        async with with_tzkt(1) as tzkt:
            big_map_keys = await take_two(
                tzkt.iter_big_map(
                    big_map_id=55031,
                    level=550310,
                )
            )
            self.assertEqual(
                (12392933, 12393108),
                (big_map_keys[0]['id'], big_map_keys[1]['id']),
            )

    async def test_get_contract_big_maps(self) -> None:
        """The first two big maps of the contract are `votes` and `voters`."""
        async with with_tzkt(2) as tzkt:
            big_maps = await tzkt.get_contract_big_maps(
                address='KT1Lw8hCoaBrHeTeMXbqHPG4sS4K1xn7yKcD',
            )
            self.assertEqual(
                ('votes', 'voters'),
                (big_maps[0]['path'], big_maps[1]['path']),
            )

    async def test_iter_contract_big_maps(self) -> None:
        """Two single-item batches from `iter_contract_big_maps` are `votes` and `voters`."""
        async with with_tzkt(1) as tzkt:
            big_maps = await take_two(
                tzkt.iter_contract_big_maps(
                    address='KT1Lw8hCoaBrHeTeMXbqHPG4sS4K1xn7yKcD',
                )
            )
            self.assertEqual(
                ('votes', 'voters'),
                (big_maps[0]['path'], big_maps[1]['path']),
            )

    async def test_get_migration_originations(self) -> None:
        """The first two migration originations have the expected ids."""
        async with with_tzkt(2) as tzkt:
            originations = await tzkt.get_migration_originations()
            self.assertEqual(67955553, originations[0].id)
            self.assertEqual(67955554, originations[1].id)

    async def test_iter_migration_originations(self) -> None:
        """Two single-item batches from `iter_migration_originations` have the expected ids."""
        async with with_tzkt(1) as tzkt:
            originations = await take_two(tzkt.iter_migration_originations())
            self.assertEqual(67955553, originations[0].id)
            self.assertEqual(67955554, originations[1].id)

0 commit comments

Comments
 (0)