Commit 3f59c34

Fix TzKT pagination and Prometheus labels (#264)

* Fixed crash in methods that do not support cursor pagination
* Fixed invalid metric labels
* Fixed pagination of get_big_maps
* Handled an edge case: an empty result batch

1 parent 4abf62c commit 3f59c34
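
For context: TzKT offers two pagination modes, a plain `offset` that skips a fixed number of rows, and a cursor-style `offset.cr` that resumes from the id of the last item received. Only some endpoints accept the cursor form, which is what the crash fix below addresses. A minimal sketch of building the query parameters for either mode; the helper name is illustrative, not dipdup code:

from typing import Any, Dict, Optional

def page_params(limit: int, offset: int = 0, last_id: Optional[int] = None) -> Dict[str, Any]:
    # Cursor mode: `offset.cr` resumes from the id of the last item received;
    # only some TzKT endpoints accept it, hence the crash this commit fixes.
    if last_id is not None:
        return {'offset.cr': last_id, 'limit': limit}
    # Plain mode: `offset` skips a fixed number of rows and works everywhere.
    return {'offset': offset, 'limit': limit}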

File tree: 3 files changed, +32 -37 lines

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
@@ -2,6 +2,13 @@
 
 Please use [this](https://docs.gitlab.com/ee/development/changelog.html) document as guidelines to keep a changelog.
 
+## [unreleased]
+
+### Fixed
+
+* tzkt: Fixed crash in methods that do not support cursor pagination.
+* prometheus: Fixed invalid metric labels.
+
 ## 5.0.0-rc1 - 2022-03-02
 
 ### Added

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 20 additions & 20 deletions
@@ -228,19 +228,16 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, Tuple[BigMapData, ...]], None]:
         Resulting data is splitted by level, deduped, sorted and ready to be processed by BigMapIndex.
         """
 
-        offset = 0
         big_maps: Tuple[BigMapData, ...] = ()
 
         # TODO: Share code between this and OperationFetcher
-        while True:
-            fetched_big_maps = await self._datasource.get_big_maps(
-                self._big_map_addresses,
-                self._big_map_paths,
-                self._first_level,
-                self._last_level,
-                cache=self._cache,
-                offset=offset,
-            )
+        big_map_iter = self._datasource.iter_big_maps(
+            self._big_map_addresses,
+            self._big_map_paths,
+            self._first_level,
+            self._last_level,
+        )
+        async for fetched_big_maps in big_map_iter:
             big_maps = big_maps + fetched_big_maps
 
             # NOTE: Yield big map slices by level except the last one
@@ -256,11 +253,6 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, Tuple[BigMapData, ...]], None]:
                 else:
                     break
 
-            if len(fetched_big_maps) < self._datasource.request_limit:
-                break
-
-            offset += self._datasource.request_limit
-
         if big_maps:
            yield big_maps[0].level, big_maps
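
With the manual offset bookkeeping gone, pagination state lives in `iter_big_maps` and the fetcher only consumes batches. A toy illustration of the consumer pattern, not dipdup code:

import asyncio
from typing import AsyncIterator, Tuple

async def iter_batches(items: Tuple[int, ...], size: int) -> AsyncIterator[Tuple[int, ...]]:
    # Pagination state is encapsulated here, invisible to the consumer.
    for i in range(0, len(items), size):
        yield items[i:i + size]

async def consume() -> None:
    collected: Tuple[int, ...] = ()
    async for batch in iter_batches(tuple(range(10)), 3):
        # Mirrors `big_maps = big_maps + fetched_big_maps` above.
        collected = collected + batch
    print(collected)

asyncio.run(consume())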

@@ -425,7 +417,7 @@ async def get_contract_big_maps(
             'get',
             url=f'v1/contracts/{address}/bigmaps',
             params={
-                'offset.cr': offset,
+                'offset': offset,
                 'limit': limit,
             },
         )
@@ -435,7 +427,7 @@ async def iter_contract_big_maps(
         self,
         address: str,
     ) -> AsyncIterator[Tuple[Dict[str, Any], ...]]:
-        async for batch in self._iter_batches(self.get_contract_big_maps, address):
+        async for batch in self._iter_batches(self.get_contract_big_maps, address, cursor=False):
             yield batch
 
     async def get_head_block(self) -> HeadBlockData:
@@ -583,7 +575,7 @@ async def get_big_maps(
                 "path.in": ",".join(paths),
                 "level.gt": first_level,
                 "level.le": last_level,
-                "offset.cr": offset,
+                "offset": offset,
                 "limit": limit,
             },
             cache=cache,
@@ -605,6 +597,7 @@ async def iter_big_maps(
             first_level,
             last_level,
             cache,
+            cursor=False,
         ):
             yield batch

@@ -711,15 +704,22 @@ async def _on_subscribe(message: CompletionMessage) -> None:
         await self._send(method, request, _on_subscribe)
         await event.wait()
 
-    async def _iter_batches(self, fn, *args, **kwargs) -> AsyncIterator:
+    async def _iter_batches(self, fn, *args, cursor: bool = True, **kwargs) -> AsyncIterator:
         if 'offset' in kwargs or 'limit' in kwargs:
             raise ValueError('`offset` and `limit` arguments are not allowed')
         size, offset = self.request_limit, 0
         while size == self.request_limit:
             result = await fn(*args, offset=offset, **kwargs)
+            if not result:
+                return
+
             yield result
-            offset = result[-1]['id']
+
             size = len(result)
+            if cursor:
+                offset = result[-1]['id']
+            else:
+                offset += self.request_limit
 
     def _get_ws_client(self) -> SignalRClient:
         """Create SignalR client, register message callbacks"""

src/dipdup/prometheus.py

Lines changed: 5 additions & 17 deletions
@@ -1,5 +1,4 @@
 import time
-from collections import deque
 from contextlib import contextmanager
 
 from prometheus_client import Counter  # type: ignore
@@ -15,27 +14,24 @@
 _index_level_sync_duration = Histogram(
     'dipdup_index_level_sync_duration_seconds',
     'Duration of indexing a single level',
-    ['field'],
 )
 _index_level_realtime_duration = Histogram(
     'dipdup_index_level_realtime_duration_seconds',
     'Duration of last index syncronization',
-    ['field'],
 )
 _index_total_sync_duration = Histogram(
     'dipdup_index_total_sync_duration_seconds',
     'Duration of the last index syncronization',
-    ['field'],
 )
 _index_total_realtime_duration = Histogram(
     'dipdup_index_total_realtime_duration_seconds',
     'Duration of the last index realtime syncronization',
-    ['field'],
 )
 
 _index_levels_to_sync = Histogram(
     'dipdup_index_levels_to_sync_total',
     'Number of levels to reach synced state',
+    ['index'],
 )
 _index_levels_to_realtime = Histogram(
     'dipdup_index_levels_to_realtime_total',
@@ -70,14 +66,6 @@
 )
 
 
-@contextmanager
-def _average_duration(queue: deque):
-    start = time.perf_counter()
-    yield
-    end = time.perf_counter()
-    queue.appendleft(end - start)
-
-
 class Metrics:
     enabled = False
 
@@ -87,25 +75,25 @@ def __new__(cls):
     @classmethod
     @contextmanager
     def measure_level_sync_duration(cls):
-        with _average_duration(cls._level_sync_durations):
+        with _index_level_sync_duration.time():
             yield
 
     @classmethod
     @contextmanager
     def measure_level_realtime_duration(cls):
-        with _average_duration(cls._level_realtime_durations):
+        with _index_level_realtime_duration.time():
             yield
 
     @classmethod
     @contextmanager
     def measure_total_sync_duration(cls):
-        with _average_duration(cls._total_sync_durations):
+        with _index_total_sync_duration.time():
             yield
 
     @classmethod
     @contextmanager
     def measure_total_realtime_duration(cls):
-        with _average_duration(cls._total_realtime_durations):
+        with _index_total_realtime_duration.time():
             yield
 
     @classmethod
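
The removed `_average_duration` helper accumulated timings in an in-process deque; `prometheus_client`'s built-in `Histogram.time()` records each duration straight into the metric. A minimal self-contained demonstration; the metric name and the timed workload are illustrative:

import time

from prometheus_client import Histogram

_request_duration = Histogram(
    'example_request_duration_seconds',
    'Duration of an example request',
)

with _request_duration.time():  # records elapsed seconds on exit
    time.sleep(0.1)

print(_request_duration.collect()[0].samples)

Each timed block becomes one histogram observation, so averages can be derived in PromQL from rate(..._sum) / rate(..._count) instead of being computed client-side.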
