Skip to content

Commit ea33562

Browse files
Fixed loss of real-time subscriptions occurred after TzKT API outage (#163)
* Fixed loss of real-time subscriptions occurred after TzKT API outage * Changelog
1 parent 950b086 commit ea33562

File tree

6 files changed

+63
-8
lines changed

6 files changed

+63
-8
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Changelog
22

3+
## 3.1.1 - [unreleased]
4+
5+
### Fixed
6+
7+
* Fixed loss of real-time subscriptions occurred after TzKT API outage.
8+
* Fixed updating schema hash in `schema approve` command.
9+
* Fixed possible crash occurred while Hasura is not ready.
10+
311
## 3.1.0 - 2021-10-12
412

513
### Added

src/dipdup/cli.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from dipdup.migrations import DipDupMigrationManager, deprecated_handlers
2828
from dipdup.models import Schema
2929
from dipdup.utils import iter_files
30-
from dipdup.utils.database import set_decimal_context, tortoise_wrapper, wipe_schema
30+
from dipdup.utils.database import get_schema_hash, set_decimal_context, tortoise_wrapper, wipe_schema
3131

3232
_logger = logging.getLogger('dipdup.cli')
3333

@@ -238,6 +238,9 @@ async def schema(ctx):
238238
...
239239

240240

241+
from dipdup.enums import ReindexingReason
242+
243+
241244
@schema.command(name='approve', help='Continue indexing with the same schema after crashing with `ReindexingRequiredError`')
242245
@click.pass_context
243246
@cli_wrapper
@@ -247,7 +250,15 @@ async def schema_approve(ctx):
247250
models = f'{config.package}.models'
248251

249252
async with tortoise_wrapper(url, models):
250-
await Schema.filter().update(reindex=None)
253+
schema = await Schema.filter().get()
254+
if not schema.reindex:
255+
return
256+
257+
if schema.reindex == ReindexingReason.SCHEMA_HASH_MISMATCH:
258+
conn = get_connection(None)
259+
schema.hash = get_schema_hash(conn)
260+
schema.reindex = None
261+
await schema.save()
251262

252263

253264
@schema.command(name='wipe', help='Drop all database tables, functions and views')

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from dipdup.enums import MessageType
3232
from dipdup.models import BigMapAction, BigMapData, BlockData, HeadBlockData, OperationData, QuoteData
3333
from dipdup.utils import split_by_chunks
34+
from dipdup.utils.watchdog import Watchdog
3435

3536
TZKT_ORIGINATIONS_REQUEST_LIMIT = 100
3637

@@ -266,9 +267,11 @@ def __init__(
266267
self,
267268
url: str,
268269
http_config: Optional[HTTPConfig] = None,
270+
watchdog: Optional[Watchdog] = None,
269271
) -> None:
270272
super().__init__(url, self._default_http_config.merge(http_config))
271273
self._logger = logging.getLogger('dipdup.tzkt')
274+
self._watchdog = watchdog
272275

273276
self._transaction_subscriptions: Set[str] = set()
274277
self._origination_subscriptions: bool = False
@@ -546,11 +549,13 @@ def _get_ws_client(self) -> BaseHubConnection:
546549
return self._ws_client
547550

548551
async def run(self) -> None:
549-
"""Main loop. Sync indexes via REST, start WS connection"""
550-
self._logger.info('Starting datasource')
552+
self._logger.info('Establishing realtime connection')
553+
tasks = [asyncio.create_task(self._get_ws_client().start())]
551554

552-
self._logger.info('Starting websocket client')
553-
await self._get_ws_client().start()
555+
if self._watchdog:
556+
tasks.append(asyncio.create_task(self._watchdog.run()))
557+
558+
await asyncio.gather(*tasks)
554559

555560
async def _on_connect(self) -> None:
556561
"""Subscribe to all required channels on established WS connection"""
@@ -680,6 +685,9 @@ async def _on_big_maps_message(self, message: List[Dict[str, Any]]) -> None:
680685
async def _on_head_message(self, message: List[Dict[str, Any]]) -> None:
681686
"""Parse and emit raw head block from WS"""
682687
async for data in self._extract_message_data(MessageType.head, message):
688+
if self._watchdog:
689+
self._watchdog.reset()
690+
683691
block = self.convert_head_block(data)
684692
await self.emit_head(block)
685693

src/dipdup/dipdup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from dipdup.scheduler import add_job, create_scheduler
3939
from dipdup.utils import slowdown
4040
from dipdup.utils.database import generate_schema, get_schema_hash, prepare_models, set_schema, tortoise_wrapper, validate_models
41+
from dipdup.utils.watchdog import Watchdog
4142

4243

4344
class IndexDispatcher:
@@ -293,6 +294,7 @@ async def _create_datasources(self) -> None:
293294
datasource = TzktDatasource(
294295
url=datasource_config.url,
295296
http_config=datasource_config.http,
297+
watchdog=Watchdog(120.0, self._ctx.restart),
296298
)
297299
elif isinstance(datasource_config, BcdDatasourceConfig):
298300
datasource = BcdDatasource(

src/dipdup/hasura.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple
99

1010
import humps # type: ignore
11-
from aiohttp import ClientConnectorError, ClientOSError
11+
from aiohttp import ClientConnectorError, ClientOSError, ServerDisconnectedError
1212
from pydantic.dataclasses import dataclass
1313
from tortoise import fields
1414
from tortoise.transactions import get_connection
@@ -145,7 +145,7 @@ async def _healthcheck(self) -> None:
145145
self._logger.info('Waiting for Hasura instance to be ready')
146146
timeout = self._http_config.connection_timeout or 60
147147
for _ in range(timeout):
148-
with suppress(ClientConnectorError, ClientOSError):
148+
with suppress(ClientConnectorError, ClientOSError, ServerDisconnectedError):
149149
response = await self._http._session.get(f'{self._hasura_config.url}/healthz')
150150
if response.status == 200:
151151
break

src/dipdup/utils/watchdog.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import asyncio
2+
import logging
3+
from typing import Awaitable, Callable
4+
5+
6+
class Watchdog:
7+
"""Simple keepalive task."""
8+
9+
def __init__(self, timeout: float, fn: Callable[..., Awaitable[None]]) -> None:
10+
self._logger = logging.getLogger('watchdog')
11+
self._timeout = timeout
12+
self._fn = fn
13+
self._event = asyncio.Event()
14+
15+
async def run(self) -> None:
16+
while True:
17+
try:
18+
await asyncio.wait_for(self._event.wait(), self._timeout)
19+
await asyncio.sleep(0)
20+
except asyncio.TimeoutError:
21+
self._logger.critical('Watchdog has timed out.')
22+
await self._fn()
23+
24+
def reset(self) -> None:
25+
self._event.set()
26+
self._event.clear()

0 commit comments

Comments
 (0)