Skip to content

Commit 0a98870

Browse files
Fetch operations before starting WS connection (#7)
1 parent c6fa440 commit 0a98870

File tree

2 files changed

+33
-4
lines changed

2 files changed

+33
-4
lines changed

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,15 @@ def _get_client(self) -> BaseHubConnection:
7272

7373
async def start(self):
7474
self._logger.info('Starting datasource')
75-
for config in self._operation_index_configs.values():
76-
await self.add_subscription(config.contract_config.address)
75+
for operation_index_config in self._operation_index_configs.values():
76+
await self.add_subscription(operation_index_config.contract)
77+
78+
latest_block = await self.get_latest_block()
79+
current_level = latest_block['level']
80+
state_level = operation_index_config.state.level
81+
82+
if current_level != state_level:
83+
await self.fetch_operations(state_level)
7784

7885
self._logger.info('Starting websocket client')
7986
await self._get_client().start()
@@ -299,3 +306,17 @@ def convert_operation(cls, operation_json: Dict[str, Any]) -> OperationData:
299306
initiator_address=operation_json.get('initiator', {}).get('address'),
300307
parameter=operation_json.get('parameters'),
301308
)
309+
310+
async def get_latest_block(self) -> Dict[str, Any]:
311+
self._logger.info('Fetching latest block')
312+
async with aiohttp.ClientSession() as session:
313+
async with session.get(
314+
url=f'{self._url}/v1/blocks',
315+
params={
316+
"limit": 1,
317+
"sort.desc": "id",
318+
},
319+
) as resp:
320+
blocks = await resp.json()
321+
self._logger.debug(blocks)
322+
return blocks[0]

tests/test_dipdup/test_datasources/test_tzkt/test_datasource.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,17 @@ async def test_start(self):
5757
client = self.datasource._get_client()
5858
client.start = AsyncMock()
5959

60-
await self.datasource.start()
60+
fetch_operations_mock = AsyncMock()
61+
self.datasource.fetch_operations = fetch_operations_mock
62+
63+
get_mock = MagicMock()
64+
get_mock.return_value.__aenter__.return_value.json.return_value = [{'level': 1337}]
65+
66+
with patch('aiohttp.ClientSession.get', get_mock):
67+
await self.datasource.start()
6168

62-
self.assertEqual({self.index_config.contract.address: ['transaction']}, self.datasource._subscriptions)
69+
fetch_operations_mock.assert_awaited_with(0)
70+
self.assertEqual({self.index_config.contract: ['transaction']}, self.datasource._subscriptions)
6371
client.start.assert_awaited()
6472

6573
async def test_on_connect_subscribe_to_operations(self):

0 commit comments

Comments
 (0)