Skip to content

Commit 2f5391f

Browse files
Hasura config and table customisation improvements (#312)
* hasura config: add_source * hasura: only customise tables from source adding source with pg_add_source * use_prepared_statements = True * use postgres url instead of connection_parameters * Changelog, rename config field, update tests * Apply review patch Co-authored-by: Lev Gorodetskiy <[email protected]>
1 parent 1e39acc commit 2f5391f

File tree

4 files changed

+78
-16
lines changed

4 files changed

+78
-16
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# Changelog
22

3+
## [unreleased]
4+
5+
### Added
6+
7+
* config: Added `hasura.create_source` flag to create PostgreSQL source if missing.
8+
9+
### Fixed
10+
11+
* hasura: Do not apply table customizations to tables from other sources.
12+
313
## 5.1.7 - 2022-06-15
414

515
### Fixed

src/dipdup/config.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,16 @@ def connection_string(self) -> str:
129129
connection_string += f'&schema={self.schema_name}'
130130
return connection_string
131131

132+
@cached_property
133+
def hasura_connection_parameters(self) -> Dict[str, Any]:
134+
return {
135+
'username': self.user,
136+
'password': self.password,
137+
'database': self.database,
138+
'host': self.host,
139+
'port': self.port,
140+
}
141+
132142
@validator('immune_tables')
133143
def _valid_immune_tables(cls, v) -> None:
134144
for table in v:
@@ -992,6 +1002,7 @@ class HasuraConfig:
9921002
9931003
:param url: URL of the Hasura instance.
9941004
:param admin_secret: Admin secret of the Hasura instance.
1005+
:param create_source: Whether source should be added to Hasura if missing.
9951006
:param source: Hasura source for DipDup to configure, others will be left untouched.
9961007
:param select_limit: Row limit for unauthenticated queries.
9971008
:param allow_aggregations: Whether to allow aggregations in unauthenticated queries.
@@ -1002,6 +1013,7 @@ class HasuraConfig:
10021013

10031014
url: str
10041015
admin_secret: Optional[str] = None
1016+
create_source: bool = False
10051017
source: str = 'default'
10061018
select_limit: int = 100
10071019
allow_aggregations: bool = True

src/dipdup/hasura.py

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -121,18 +121,23 @@ async def configure(self, force: bool = False) -> None:
121121
self._logger.info('Metadata is up to date, no action required')
122122
return
123123

124-
# NOTE: Find chosen source and overwrite its tables
124+
# NOTE: Find chosen source and overwrite its tables, create if missing and allowed.
125125
source_name = self._hasura_config.source
126-
for source in metadata['sources']:
127-
if source['name'] == source_name:
128-
source['tables'] = await self._generate_source_tables_metadata()
129-
break
130-
else:
131-
raise HasuraError(f'Source `{source_name}` not found in metadata')
126+
127+
if (source := self._get_source(metadata, source_name)) is None:
128+
if not self._hasura_config.create_source:
129+
raise HasuraError(f'Source `{source_name}` not found in metadata. Set `create_source` flag to create it.')
130+
131+
await self._create_source()
132+
metadata = await self._fetch_metadata()
133+
if (source := self._get_source(metadata, source_name)) is None:
134+
raise HasuraError(f'Source `{source_name}` not found in metadata after creation.')
135+
136+
source['tables'] = await self._generate_source_tables_metadata()
132137
await self._replace_metadata(metadata)
133138

134139
# NOTE: Apply table customizations before generating queries
135-
await self._apply_table_customization()
140+
await self._apply_table_customization(source)
136141
metadata = await self._fetch_metadata()
137142

138143
# NOTE: Generate and apply queries and REST endpoints
@@ -161,6 +166,13 @@ async def configure(self, force: bool = False) -> None:
161166

162167
self._logger.info('Hasura instance has been configured')
163168

169+
def _get_source(self, metadata: Dict[str, Any], name: str) -> Optional[Dict[str, Any]]:
170+
for source in metadata['sources']:
171+
if source['name'] == name:
172+
return source
173+
else:
174+
return None
175+
164176
async def _hasura_request(self, endpoint: str, json: Dict[str, Any]) -> Dict[str, Any]:
165177
self._logger.debug('Sending `%s` request: %s', endpoint, dump_json(json))
166178
try:
@@ -203,13 +215,34 @@ async def _healthcheck(self) -> None:
203215

204216
self._logger.info('Connected to Hasura %s', version)
205217

218+
async def _create_source(self) -> Dict[str, Any]:
219+
self._logger.info(f'Adding source `{self._hasura_config.source}`')
220+
return await self._hasura_request(
221+
endpoint='metadata',
222+
json={
223+
'type': 'pg_add_source',
224+
'args': {
225+
'name': self._hasura_config.source,
226+
'configuration': {
227+
'connection_info': {
228+
'database_url': {
229+
'connection_parameters': self._database_config.hasura_connection_parameters,
230+
},
231+
'use_prepared_statements': True,
232+
}
233+
},
234+
'replace_configuration': True,
235+
},
236+
},
237+
)
238+
206239
async def _fetch_metadata(self) -> Dict[str, Any]:
207240
self._logger.info('Fetching existing metadata')
208241
return await self._hasura_request(
209242
endpoint='metadata',
210243
json={
211-
"type": "export_metadata",
212-
"args": {},
244+
'type': 'export_metadata',
245+
'args': {},
213246
},
214247
)
215248

@@ -400,17 +433,22 @@ async def _get_fields(self, name: str = 'query_root') -> List[Field]:
400433

401434
return fields
402435

403-
async def _apply_table_customization(self) -> None:
436+
async def _apply_table_customization(self, source: Dict[str, Any]) -> None:
404437
"""Convert table and column names to camelCase.
405438
406439
Based on https://github.com/m-rgba/hasura-snake-to-camel
407440
"""
408441

442+
# NOTE: Build a set of table names for our source
443+
table_names = {t['table']['name'] for t in source['tables']}
409444
tables = await self._get_fields()
410445

411446
# TODO: Bulk request
412447
for table in tables:
413-
custom_root_fields = self._format_custom_root_fields(table)
448+
if table.root not in table_names:
449+
continue
450+
451+
custom_root_fields = self._format_custom_root_fields(table.root)
414452
columns = await self._get_fields(table.root)
415453
custom_column_names = self._format_custom_column_names(columns)
416454
args: Dict[str, Any] = {
@@ -479,9 +517,7 @@ def _format_rest_endpoint(self, query_name: str) -> Dict[str, Any]:
479517
"comment": None,
480518
}
481519

482-
def _format_custom_root_fields(self, table: Field) -> Dict[str, Any]:
483-
table_name = table.root
484-
520+
def _format_custom_root_fields(self, table_name: str) -> Dict[str, Any]:
485521
def _fmt(fmt: str) -> str:
486522
if self._hasura_config.camel_case:
487523
return humps.camelize(fmt.format(table_name))

tests/integration_tests/test_hasura.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,11 @@ async def test_configure_hasura(self) -> None:
5555
hasura_container._container.reload()
5656
hasura_ip = hasura_container._container.attrs['NetworkSettings']['IPAddress']
5757

58-
config.hasura = HasuraConfig(f'http://{hasura_ip}:8080')
58+
config.hasura = HasuraConfig(
59+
url=f'http://{hasura_ip}:8080',
60+
source='new_source',
61+
create_source=True,
62+
)
5963
hasura_gateway = HasuraGateway('demo_hic_et_nunc', config.hasura, config.database)
6064
await stack.enter_async_context(hasura_gateway)
6165

0 commit comments

Comments
 (0)