|
3 | 3 | import logging |
4 | 4 | import re |
5 | 5 | from contextlib import suppress |
| 6 | +from http import HTTPStatus |
6 | 7 | from os.path import dirname, join |
7 | 8 | from typing import Any, Dict, Iterator, List, Optional, Tuple |
8 | 9 |
|
| 10 | +import aiohttp |
9 | 11 | import humps # type: ignore |
10 | 12 | from aiohttp import ClientConnectorError, ClientOSError |
11 | 13 | from pydantic.dataclasses import dataclass |
@@ -60,26 +62,32 @@ class HasuraError(RuntimeError): |
60 | 62 |
|
61 | 63 |
|
62 | 64 | class HasuraGateway(HTTPGateway): |
    # Class-level HTTP defaults for talking to a Hasura instance:
    # responses are never cached, and transient failures are retried
    # up to 3 times with a 1-second pause between attempts.
    # Merged with (and overridden by) the user-supplied HTTPConfig in __init__.
    _default_http_config = HTTPConfig(
        cache=False,
        retry_count=3,
        retry_sleep=1,
    )
| 70 | + |
63 | 71 | def __init__( |
64 | 72 | self, |
65 | 73 | package: str, |
66 | 74 | hasura_config: HasuraConfig, |
67 | 75 | database_config: PostgresDatabaseConfig, |
68 | 76 | http_config: Optional[HTTPConfig] = None, |
69 | 77 | ) -> None: |
70 | | - super().__init__(hasura_config.url, http_config) |
| 78 | + super().__init__(hasura_config.url, self._default_http_config.merge(http_config)) |
71 | 79 | self._logger = logging.getLogger('dipdup.hasura') |
72 | 80 | self._package = package |
73 | 81 | self._hasura_config = hasura_config |
74 | 82 | self._database_config = database_config |
75 | 83 |
|
76 | | - async def configure(self, reset: bool = False) -> None: |
| 84 | + async def configure(self) -> None: |
77 | 85 | """Generate Hasura metadata and apply to instance with credentials from `hasura` config section.""" |
78 | 86 |
|
79 | 87 | self._logger.info('Configuring Hasura') |
80 | 88 | await self._healthcheck() |
81 | 89 |
|
82 | | - if reset: |
| 90 | + if self._hasura_config.reset is True: |
83 | 91 | await self._reset_metadata() |
84 | 92 |
|
85 | 93 | metadata = await self._fetch_metadata() |
@@ -132,15 +140,6 @@ async def configure(self, reset: bool = False) -> None: |
132 | 140 |
|
133 | 141 | self._logger.info('Hasura instance has been configured') |
134 | 142 |
|
135 | | - def _default_http_config(self) -> HTTPConfig: |
136 | | - return HTTPConfig( |
137 | | - cache=False, |
138 | | - retry_sleep=1, |
139 | | - retry_multiplier=1.1, |
140 | | - ratelimit_rate=100, |
141 | | - ratelimit_period=1, |
142 | | - ) |
143 | | - |
144 | 143 | async def _hasura_request(self, endpoint: str, json: Dict[str, Any]) -> Dict[str, Any]: |
145 | 144 | self._logger.debug('Sending `%s` request: %s', endpoint, json) |
146 | 145 | result = await self._http.request( |
@@ -188,13 +187,22 @@ async def _fetch_metadata(self) -> Dict[str, Any]: |
188 | 187 |
|
189 | 188 | async def _replace_metadata(self, metadata: Dict[str, Any]) -> None: |
190 | 189 | self._logger.info('Replacing metadata') |
191 | | - await self._hasura_request( |
192 | | - endpoint='query', |
193 | | - json={ |
194 | | - "type": "replace_metadata", |
195 | | - "args": metadata, |
196 | | - }, |
197 | | - ) |
| 190 | + endpoint, json = 'query', { |
| 191 | + "type": "replace_metadata", |
| 192 | + "args": metadata, |
| 193 | + } |
| 194 | + try: |
| 195 | + await self._hasura_request(endpoint, json) |
| 196 | + except aiohttp.ClientResponseError as e: |
| 197 | + # NOTE: 400 from Hasura means we failed either to generate or to merge existing metadata. |
| 198 | + # NOTE: Reset metadata and retry if not forbidden by config. |
| 199 | + print(e.status, self._hasura_config.reset) |
| 200 | + if e.status != HTTPStatus.BAD_REQUEST or self._hasura_config.reset is False: |
| 201 | + print('raise') |
| 202 | + raise |
| 203 | + self._logger.warning('Failed to replace metadata, resetting') |
| 204 | + await self._reset_metadata() |
| 205 | + await self._hasura_request(endpoint, json) |
198 | 206 |
|
199 | 207 | async def _get_views(self) -> List[str]: |
200 | 208 | return [ |
|
0 commit comments