Commit 76803d2

Fix Hasura aiohttp session, fix in_global_transaction wrapper, make TemporaryState abstract (#66)

1 parent: 90aeffe

5 files changed (+76, -59 lines)


src/dipdup/codegen.py (2 additions, 1 deletion)

```diff
@@ -224,6 +224,7 @@ async def generate_types(self) -> None:
                 if root.split('/')[-1] == 'parameter':
                     name += '_parameter'
 
+                name = snake_to_pascal(name)
                 self._logger.info('Generating type `%s`', name)
                 args = [
                     'datamodel-codegen',
@@ -232,7 +233,7 @@ async def generate_types(self) -> None:
                     '--output',
                     output_path,
                     '--class-name',
-                    snake_to_pascal(name),
+                    name,
                     '--disable-timestamp',
                     '--use-default',
                 ]
```
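
The type name is now normalized with `snake_to_pascal` once, up front, so the log line and the `--class-name` argument passed to datamodel-codegen agree; previously the log printed the raw snake_case name. A minimal sketch of what such a helper does (dipdup's actual implementation may handle more edge cases):

```python
# Illustrative sketch only; dipdup's real helper may differ on edge cases.
def snake_to_pascal(name: str) -> str:
    """'token_transfer_parameter' -> 'TokenTransferParameter'"""
    return ''.join(part[:1].upper() + part[1:] for part in name.split('_') if part)


assert snake_to_pascal('token_transfer_parameter') == 'TokenTransferParameter'
```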

src/dipdup/datasources/tzkt/datasource.py (3 additions, 1 deletion)

```diff
@@ -385,7 +385,9 @@ async def get_originations(self, addresses: Set[str], offset: int, first_level:
             originations.append(self.convert_operation(op))
         return originations
 
-    async def get_transactions(self, field: str, addresses: Set[str], offset: int, first_level: int, last_level: int) -> List[OperationData]:
+    async def get_transactions(
+        self, field: str, addresses: Set[str], offset: int, first_level: int, last_level: int
+    ) -> List[OperationData]:
         raw_transactions = await self._proxy.http_request(
             'get',
             url=f'{self._url}/v1/operations/transactions',
```

src/dipdup/hasura.py (50 additions, 52 deletions)

```diff
@@ -18,7 +18,7 @@
 
 def _is_model_class(obj) -> bool:
     """Is subclass of tortoise.Model, but not the base class"""
-    return isinstance(obj, type) and issubclass(obj, Model) and obj != Model
+    return isinstance(obj, type) and issubclass(obj, Model) and obj != Model and not getattr(obj.Meta, 'abstract', False)
 
 
 def _format_array_relationship(
@@ -148,60 +148,58 @@ async def configure_hasura(config: DipDupConfig):
         raise ConfigurationError('`hasura` config section missing')
 
     _logger.info('Configuring Hasura')
-
-    session = aiohttp.ClientSession()
     url = config.hasura.url.rstrip("/")
     hasura_metadata = await generate_hasura_metadata(config)
 
-    _logger.info('Waiting for Hasura instance to be healthy')
-    for _ in range(60):
-        with suppress(ClientConnectorError, ClientOSError):
-            async with aiohttp.ClientSession() as session:
+    async with aiohttp.ClientSession() as session:
+        _logger.info('Waiting for Hasura instance to be healthy')
+        for _ in range(60):
+            with suppress(ClientConnectorError, ClientOSError):
                 await session.get(f'{url}/healthz')
                 break
-        await asyncio.sleep(1)
-    else:
-        _logger.error('Hasura instance not responding for 60 seconds')
-        return
-
-    headers = {}
-    if config.hasura.admin_secret:
-        headers['X-Hasura-Admin-Secret'] = config.hasura.admin_secret
-
-    _logger.info('Fetching existing metadata')
-    existing_hasura_metadata = await http_request(
-        session,
-        'post',
-        url=f'{url}/v1/query',
-        data=json.dumps(
-            {
-                "type": "export_metadata",
-                "args": hasura_metadata,
-            },
-        ),
-        headers=headers,
-    )
-
-    _logger.info('Merging existing metadata')
-    hasura_metadata_tables = [table['table'] for table in hasura_metadata['tables']]
-    for table in existing_hasura_metadata['tables']:
-        if table['table'] not in hasura_metadata_tables:
-            hasura_metadata['tables'].append(table)
-
-    _logger.info('Sending replace metadata request')
-    result = await http_request(
-        session,
-        'post',
-        url=f'{url}/v1/query',
-        data=json.dumps(
-            {
-                "type": "replace_metadata",
-                "args": hasura_metadata,
-            },
-        ),
-        headers=headers,
-    )
-    if not result.get('message') == 'success':
-        _logger.error('Can\'t configure Hasura instance: %s', result)
+            await asyncio.sleep(1)
+        else:
+            _logger.error('Hasura instance not responding for 60 seconds')
+            return
+
+        headers = {}
+        if config.hasura.admin_secret:
+            headers['X-Hasura-Admin-Secret'] = config.hasura.admin_secret
+
+        _logger.info('Fetching existing metadata')
+        existing_hasura_metadata = await http_request(
+            session,
+            'post',
+            url=f'{url}/v1/query',
+            data=json.dumps(
+                {
+                    "type": "export_metadata",
+                    "args": hasura_metadata,
+                },
+            ),
+            headers=headers,
+        )
 
-    await session.close()
+        _logger.info('Merging existing metadata')
+        hasura_metadata_tables = [table['table'] for table in hasura_metadata['tables']]
+        for table in existing_hasura_metadata['tables']:
+            if table['table'] not in hasura_metadata_tables:
+                hasura_metadata['tables'].append(table)
+
+        _logger.info('Sending replace metadata request')
+        result = await http_request(
+            session,
+            'post',
+            url=f'{url}/v1/query',
+            data=json.dumps(
+                {
+                    "type": "replace_metadata",
+                    "args": hasura_metadata,
+                },
+            ),
+            headers=headers,
+        )
+        if not result.get('message') == 'success':
+            _logger.error('Can\'t configure Hasura instance: %s', result)
+        else:
+            _logger.info('Hasura instance has been configured')
```
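
The substantive fix here: the old code created a `ClientSession` eagerly, then opened a second session inside the health-check loop that shadowed the first, so later requests went through an already-closed session while the original one was never closed (and the early `return` skipped `await session.close()` entirely). Now a single `async with` session owns the whole configuration flow. A condensed sketch of the fixed pattern, with an illustrative `wait_healthy` helper (not part of dipdup):

```python
import asyncio

import aiohttp


async def wait_healthy(url: str) -> bool:
    """Condensed sketch of the fixed pattern: one session owns the whole flow."""
    async with aiohttp.ClientSession() as session:
        for _ in range(60):
            try:
                await session.get(f'{url}/healthz')
                break
            except aiohttp.ClientConnectorError:
                await asyncio.sleep(1)
        else:
            return False  # the session is still closed cleanly on this early exit
        # ...all further requests reuse the same `session` here...
        return True
```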

src/dipdup/models.py (3 additions, 0 deletions)

```diff
@@ -44,6 +44,9 @@ class TemporaryState(State):
     async def save(self, using_db=None, update_fields=None, force_create=False, force_update=False) -> None:
         pass
 
+    class Meta:
+        abstract = True
+
 
 @dataclass
 class OperationData:
```
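
Marking `TemporaryState` abstract tells Tortoise not to create a table for it, and the updated `_is_model_class` check in hasura.py above now skips abstract models, so Hasura metadata no longer tracks a table that never exists. A minimal sketch of the interaction (assumes `tortoise-orm` is installed; the model is illustrative):

```python
from tortoise import Model


class Base(Model):
    class Meta:
        abstract = True  # Tortoise skips schema generation for abstract models


# Mirrors the updated check in hasura.py: abstract models no longer count.
def _is_model_class(obj) -> bool:
    return (
        isinstance(obj, type)
        and issubclass(obj, Model)
        and obj != Model
        and not getattr(obj.Meta, 'abstract', False)
    )


assert not _is_model_class(Base)
```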

src/dipdup/utils.py (18 additions, 5 deletions)

```diff
@@ -6,11 +6,13 @@
 import time
 from contextlib import asynccontextmanager
 from logging import Logger
-from typing import Any, AsyncIterator, Iterator, List, Optional, Sequence
+from typing import Any, AsyncIterator, Iterator, List, Optional
 
 import aiohttp
 from tortoise import Tortoise
 from tortoise.backends.asyncpg.client import AsyncpgDBClient
+from tortoise.backends.base.client import TransactionContext
+from tortoise.backends.sqlite.client import SqliteClient
 from tortoise.transactions import in_transaction
 
 from dipdup import __version__
@@ -78,14 +80,25 @@ async def in_global_transaction():
     """Enforce using transaction for all queries inside wrapped block. Works for a single DB only."""
     if list(Tortoise._connections.keys()) != ['default']:
         raise RuntimeError('`in_global_transaction` wrapper works only with a single DB connection')
-    async with in_transaction() as conn:
-        # NOTE: SQLite hacks
-        conn.filename = ''
-        conn.pragmas = {}
 
+    async with in_transaction() as conn:
+        conn: TransactionContext
         original_conn = Tortoise._connections['default']
         Tortoise._connections['default'] = conn
+
+        if isinstance(original_conn, SqliteClient):
+            conn.filename = original_conn.filename
+            conn.pragmas = original_conn.pragmas
+        elif isinstance(original_conn, AsyncpgDBClient):
+            conn._pool = original_conn._pool
+            conn._template = original_conn._template
+        else:
+            raise NotImplementedError(
+                '`in_global_transaction` wrapper was not tested with database backends other then aiosqlite and asyncpg'
+            )
+
         yield
+
     Tortoise._connections['default'] = original_conn
 
 
```
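
`in_global_transaction` works by swapping Tortoise's `'default'` connection for the transaction's connection, so ORM calls made anywhere inside the block join one transaction without threading a connection argument through handler code. The fix copies the backend-specific attributes the swapped-in connection needs (SQLite's `filename` and `pragmas`, asyncpg's pool and template) from the real connection instead of hard-coding SQLite-only blanks. A hypothetical usage sketch (`Trade` is an illustrative model, not part of dipdup):

```python
# Hypothetical usage sketch; `Trade` is an illustrative Tortoise model.
from tortoise import Model, fields

from dipdup.utils import in_global_transaction


class Trade(Model):
    price = fields.FloatField()
    amount = fields.FloatField()


async def process_level(trades) -> None:
    async with in_global_transaction():
        # Every ORM call in this block joins the same transaction, because
        # the wrapper swapped Tortoise's 'default' connection for its duration.
        for trade in trades:
            await Trade.create(price=trade['price'], amount=trade['amount'])
        # An exception anywhere above rolls the whole block back together.
```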
