Skip to content

Commit 049f0e9

Browse files
Caching TzKT request proxy (#39)
1 parent d4acf10 commit 049f0e9

File tree

10 files changed

+86
-34
lines changed

10 files changed

+86
-34
lines changed

poetry.lock

Lines changed: 16 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ datamodel-code-generator = "^0.11.1"
3030
tortoise-orm = "^0.17.1"
3131
pydantic = "^1.8.1"
3232
aiosignalrcore = "^0.9.2"
33+
fcache = "^0.4.7"
3334

3435
[tool.poetry.dev-dependencies]
3536
black = "^20.8b1"

src/dipdup/cli.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from typing import List
88

99
import click
10+
from fcache.cache import FileCache # type: ignore
1011

1112
from dipdup import __version__
1213
from dipdup.config import DipDupConfig, LoggingConfig
@@ -70,3 +71,10 @@ async def init(ctx):
7071
config: DipDupConfig = ctx.obj.config
7172
dipdup = DipDup(config)
7273
await dipdup.init()
74+
75+
76+
@cli.command(help='Clear development request cache')
@click.pass_context
@click_async
async def clear_cache(ctx):
    """Drop every entry from the persistent development request cache."""
    cache = FileCache('dipdup', flag='cs')
    cache.clear()

src/dipdup/codegen.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def get(
5050
contract_config: ContractConfig,
5151
) -> Dict[str, Any]:
5252
if datasource_config not in self._datasources:
53-
self._datasources[datasource_config] = TzktDatasource(datasource_config.url)
53+
self._datasources[datasource_config] = TzktDatasource(datasource_config.url, True)
5454
self._schemas[datasource_config] = {}
5555
if contract_config.address not in self._schemas[datasource_config]:
5656
self._logger.info('Fetching schemas for contract `%s`', contract_config.address)

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from dipdup.datasources.tzkt.cache import BigMapCache, OperationCache
2222
from dipdup.datasources.tzkt.enums import TzktMessageType
23+
from dipdup.datasources.tzkt.proxy import TzktRequestProxy
2324
from dipdup.models import (
2425
BigMapAction,
2526
BigMapContext,
@@ -69,7 +70,7 @@
6970

7071

7172
class TzktDatasource:
72-
def __init__(self, url: str):
73+
def __init__(self, url: str, cache: bool):
7374
super().__init__()
7475
self._url = url.rstrip('/')
7576
self._logger = logging.getLogger(__name__)
@@ -85,6 +86,7 @@ def __init__(self, url: str):
8586
self._operation_cache = OperationCache()
8687
self._big_map_cache = BigMapCache()
8788
self._rollback_fn: Optional[Callable[[int, int], Awaitable[None]]] = None
89+
self._proxy = TzktRequestProxy(cache)
8890

8991
async def add_index(self, index_name: str, index_config: Union[OperationIndexConfig, BigMapIndexConfig, BlockIndexConfig]):
9092
self._logger.info('Adding index `%s`', index_name)
@@ -203,7 +205,7 @@ async def subscribe_to_big_maps(self, address: Address, path: Path) -> None:
203205
async def _fetch_operations(self, addresses: List[str], offset: int, first_level: int, last_level: int) -> List[Dict[str, Any]]:
204206
self._logger.info('Fetching levels %s-%s with offset %s', first_level, last_level, offset)
205207

206-
async with http_request(
208+
operations = await self._proxy.http_request(
207209
'get',
208210
url=f'{self._url}/v1/operations/transactions',
209211
params={
@@ -215,10 +217,9 @@ async def _fetch_operations(self, addresses: List[str], offset: int, first_level
215217
"select": ','.join(OPERATION_FIELDS),
216218
"status": "applied",
217219
},
218-
) as resp:
219-
operations = await resp.json()
220+
)
220221

221-
async with http_request(
222+
target_operations = await self._proxy.http_request(
222223
'get',
223224
url=f'{self._url}/v1/operations/transactions',
224225
params={
@@ -230,8 +231,7 @@ async def _fetch_operations(self, addresses: List[str], offset: int, first_level
230231
"select": ','.join(OPERATION_FIELDS),
231232
"status": "applied",
232233
},
233-
) as resp:
234-
target_operations = await resp.json()
234+
)
235235

236236
sender_operation_keys = {op['id'] for op in operations}
237237
for op in target_operations:
@@ -297,7 +297,7 @@ async def _fetch_big_maps(
297297
) -> List[Dict[str, Any]]:
298298
self._logger.info('Fetching levels %s-%s with offset %s', first_level, last_level, offset)
299299

300-
async with http_request(
300+
big_maps = await self._proxy.http_request(
301301
'get',
302302
url=f'{self._url}/v1/bigmaps/updates',
303303
params={
@@ -308,8 +308,7 @@ async def _fetch_big_maps(
308308
"level.gt": first_level,
309309
"level.le": last_level,
310310
},
311-
) as resp:
312-
big_maps = await resp.json()
311+
)
313312

314313
self._logger.info('%s big map updates fetched', len(big_maps))
315314
self._logger.debug(big_maps)
@@ -369,11 +368,10 @@ async def _process_level_big_maps(big_maps):
369368

370369
async def fetch_jsonschemas(self, address: str) -> Dict[str, Any]:
371370
self._logger.info('Fetching jsonschemas for address `%s', address)
372-
async with http_request(
371+
jsonschemas = await self._proxy.http_request(
373372
'get',
374373
url=f'{self._url}/v1/contracts/{address}/interface',
375-
) as response:
376-
jsonschemas = await response.json()
374+
)
377375
self._logger.debug(jsonschemas)
378376
return jsonschemas
379377

@@ -597,10 +595,10 @@ def convert_big_map(cls, big_map_json: Dict[str, Any]) -> BigMapData:
597595

598596
async def get_latest_block(self) -> Dict[str, Any]:
599597
self._logger.info('Fetching latest block')
600-
async with http_request(
598+
block = await self._proxy.http_request(
601599
'get',
602600
url=f'{self._url}/v1/head',
603-
) as resp:
604-
block = await resp.json()
601+
skip_cache=True,
602+
)
605603
self._logger.debug(block)
606604
return block
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import hashlib
2+
import logging
3+
import pickle
4+
5+
from fcache.cache import FileCache # type: ignore
6+
7+
from dipdup.utils import http_request
8+
9+
10+
class TzktRequestProxy:
    """Caching proxy around :func:`dipdup.utils.http_request` for TzKT API calls.

    When caching is enabled, responses are persisted in a local file cache
    keyed by a hash of the request, so repeated identical requests (e.g.
    during development) are served without hitting the network.
    """

    def __init__(self, cache: bool = False) -> None:
        self._logger = logging.getLogger(__name__)
        # 'c' creates the cache if missing, 's' syncs writes to disk immediately
        self._cache = FileCache('dipdup', flag='cs') if cache else None

    async def http_request(self, method: str, skip_cache: bool = False, **kwargs):
        """Perform an HTTP request, consulting/populating the cache when enabled.

        :param method: HTTP method, e.g. 'get' or 'post'
        :param skip_cache: bypass the cache for this request even when enabled
        :param kwargs: passed through to the underlying `http_request` helper
        :return: decoded JSON response body
        """
        # Guard clause: no caching configured or explicitly skipped.
        if self._cache is None or skip_cache:
            return await http_request(method, **kwargs)

        # NOTE: pickling kwargs assumes all values are picklable; holds for the
        # plain str/int query params used with TzKT — TODO confirm for new callers.
        key = hashlib.sha256(pickle.dumps([method, kwargs])).hexdigest()
        try:
            return self._cache[key]
        except KeyError:
            pass
        # Perform the request OUTSIDE the except block so that network/HTTP
        # errors are not chained onto the irrelevant KeyError in tracebacks.
        response = await http_request(method, **kwargs)
        self._cache[key] = response
        return response

src/dipdup/dipdup.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,14 @@
1010

1111
import dipdup.codegen as codegen
1212
from dipdup import __version__
13-
from dipdup.config import ROLLBACK_HANDLER, DipDupConfig, IndexTemplateConfig, PostgresDatabaseConfig, TzktDatasourceConfig
13+
from dipdup.config import (
14+
ROLLBACK_HANDLER,
15+
DipDupConfig,
16+
IndexTemplateConfig,
17+
PostgresDatabaseConfig,
18+
SqliteDatabaseConfig,
19+
TzktDatasourceConfig,
20+
)
1421
from dipdup.datasources.tzkt.datasource import TzktDatasource
1522
from dipdup.hasura import configure_hasura
1623
from dipdup.models import IndexType, State
@@ -31,6 +38,7 @@ async def init(self) -> None:
3138

3239
async def run(self) -> None:
3340
url = self._config.database.connection_string
41+
cache = isinstance(self._config.database, SqliteDatabaseConfig)
3442
models = f'{self._config.package}.models'
3543
rollback_fn = getattr(importlib.import_module(f'{self._config.package}.handlers.{ROLLBACK_HANDLER}'), ROLLBACK_HANDLER)
3644

@@ -48,7 +56,7 @@ async def run(self) -> None:
4856
self._logger.info('Processing index `%s`', index_name)
4957
if isinstance(index_config.datasource, TzktDatasourceConfig):
5058
if index_config.datasource_config not in datasources:
51-
datasources[index_config.datasource_config] = TzktDatasource(index_config.datasource_config.url)
59+
datasources[index_config.datasource_config] = TzktDatasource(index_config.datasource_config.url, cache)
5260
datasources[index_config.datasource_config].set_rollback_fn(rollback_fn)
5361
await datasources[index_config.datasource_config].add_index(index_name, index_config)
5462
else:

src/dipdup/hasura.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,8 @@ async def configure_hasura(config: DipDupConfig):
148148
_logger.info('Waiting for Hasura instance to be healthy')
149149
for _ in range(60):
150150
with suppress(ClientConnectorError, ClientOSError):
151-
async with http_request('get', url=f'{url}/healthz') as response:
152-
if response.status == 200:
153-
break
151+
await http_request('get', url=f'{url}/healthz')
152+
break
154153
await asyncio.sleep(1)
155154
else:
156155
_logger.error('Hasura instance not responding for 60 seconds')
@@ -161,7 +160,7 @@ async def configure_hasura(config: DipDupConfig):
161160
headers['X-Hasura-Admin-Secret'] = config.hasura.admin_secret
162161

163162
_logger.info('Fetching existing metadata')
164-
async with http_request(
163+
existing_hasura_metadata = await http_request(
165164
'post',
166165
url=f'{url}/v1/query',
167166
data=json.dumps(
@@ -171,8 +170,7 @@ async def configure_hasura(config: DipDupConfig):
171170
},
172171
),
173172
headers=headers,
174-
) as response:
175-
existing_hasura_metadata = await response.json()
173+
)
176174

177175
_logger.info('Merging existing metadata')
178176
hasura_metadata_tables = [table['table'] for table in hasura_metadata['tables']]
@@ -181,7 +179,7 @@ async def configure_hasura(config: DipDupConfig):
181179
hasura_metadata['tables'].append(table)
182180

183181
_logger.info('Sending replace metadata request')
184-
async with http_request(
182+
result = await http_request(
185183
'post',
186184
url=f'{url}/v1/query',
187185
data=json.dumps(
@@ -191,7 +189,6 @@ async def configure_hasura(config: DipDupConfig):
191189
},
192190
),
193191
headers=headers,
194-
) as response:
195-
result = await response.json()
196-
if not result.get('message') == 'success':
197-
_logger.error('Can\'t configure Hasura instance: %s', result)
192+
)
193+
if not result.get('message') == 'success':
194+
_logger.error('Can\'t configure Hasura instance: %s', result)

src/dipdup/utils.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ async def tortoise_wrapper(url: str, models: Optional[str] = None):
4646
await Tortoise.close_connections()
4747

4848

49-
@asynccontextmanager
5049
async def http_request(method: str, **kwargs):
5150
async with aiohttp.ClientSession() as session:
5251
headers = {
@@ -60,7 +59,7 @@ async def http_request(method: str, **kwargs):
6059
) as response:
6160
request_string = kwargs['url'] + '?' + '&'.join([f'{key}={value}' for key, value in kwargs.get('params', {}).items()])
6261
_logger.debug('Calling `%s`', request_string)
63-
yield response
62+
return await response.json()
6463

6564

6665
async def reindex():

tests/test_dipdup/test_datasources/test_tzkt/test_datasource.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ async def asyncSetUp(self):
3535
)
3636
self.index_config.state = State(index_name='test', index_type=IndexType.operation, hash='')
3737
self.index_config.handlers[0].pattern[0].parameter_type_cls = CollectParameter
38-
self.datasource = TzktDatasource('tzkt.test')
38+
self.datasource = TzktDatasource('tzkt.test', True)
3939
await self.datasource.add_index('test', self.index_config)
4040

4141
async def test_convert_operation(self):

0 commit comments

Comments
 (0)