Skip to content

Commit 70efe6a

Browse files
Wrap schema wiping in a transaction to avoid orphaned tables in the immune schema (#461)
1 parent 4b1ee02 commit 70efe6a

File tree

4 files changed

+27
-14
lines changed

4 files changed

+27
-14
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning].
1111

1212
- codegen: Fixed invalid `models.py` template.
1313
- context: Do not wrap known exceptions with `CallbackError`
14+
- database: Wrap schema wiping in a transaction to avoid orphaned tables in the immune schema.
1415
- hasura: Fixed processing M2M relations.
1516
- sentry: Fixed "invalid value `environment`" error.
1617
- sentry: Ignore events from project callbacks when `crash_reporting` is enabled.

src/dipdup/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ async def schema_wipe(ctx, immune: bool, force: bool) -> None:
479479
if isinstance(config.database, PostgresDatabaseConfig):
480480
await wipe_schema(
481481
conn=conn,
482-
name=config.database.schema_name,
482+
schema_name=config.database.schema_name,
483483
# NOTE: Don't be confused by the name of `--immune` flag, we want to drop all tables if it's set.
484484
immune_tables=config.database.immune_tables if not immune else set(),
485485
)

src/dipdup/context.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,11 @@ async def reindex(self, reason: Optional[Union[str, ReindexingReason]] = None, *
207207
elif action == ReindexingAction.wipe:
208208
conn = get_connection()
209209
if isinstance(self.config.database, PostgresDatabaseConfig):
210-
await wipe_schema(conn, self.config.database.schema_name, self.config.database.immune_tables)
210+
await wipe_schema(
211+
conn=conn,
212+
schema_name=self.config.database.schema_name,
213+
immune_tables=self.config.database.immune_tables,
214+
)
211215
else:
212216
await Tortoise._drop_databases()
213217
await self.restart()

src/dipdup/utils/database.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -164,22 +164,29 @@ async def truncate_schema(conn: BaseDBAsyncClient, name: str) -> None:
164164
await conn.execute_script(f"SELECT truncate_schema('{name}')")
165165

166166

167-
async def wipe_schema(conn: BaseDBAsyncClient, name: str, immune_tables: Set[str]) -> None:
167+
async def wipe_schema(
168+
conn: BaseDBAsyncClient,
169+
schema_name: str,
170+
immune_tables: Set[str],
171+
) -> None:
172+
"""Truncate schema preserving immune tables. Executes in a transaction"""
168173
if isinstance(conn, SqliteClient):
169174
raise NotImplementedError
170175

171-
immune_schema_name = f'{name}_immune'
172-
if immune_tables:
173-
await create_schema(conn, immune_schema_name)
174-
for table in immune_tables:
175-
await move_table(conn, table, name, immune_schema_name)
176+
immune_schema_name = f'{schema_name}_immune'
176177

177-
await truncate_schema(conn, name)
178+
async with conn._in_transaction() as conn:
179+
if immune_tables:
180+
await create_schema(conn, immune_schema_name)
181+
for table in immune_tables:
182+
await move_table(conn, table, schema_name, immune_schema_name)
178183

179-
if immune_tables:
180-
for table in immune_tables:
181-
await move_table(conn, table, immune_schema_name, name)
182-
await drop_schema(conn, immune_schema_name)
184+
await truncate_schema(conn, schema_name)
185+
186+
if immune_tables:
187+
for table in immune_tables:
188+
await move_table(conn, table, immune_schema_name, schema_name)
189+
await drop_schema(conn, immune_schema_name)
183190

184191

185192
async def drop_schema(conn: BaseDBAsyncClient, name: str) -> None:
@@ -199,8 +206,9 @@ async def move_table(conn: BaseDBAsyncClient, name: str, schema: str, new_schema
199206

200207
def prepare_models(package: Optional[str]) -> None:
201208
"""Prepare TortoiseORM models to use with DipDup.
202-
Generate missing table names, validate models, increase decimal precision.
209+
Generate missing table names, validate models, increase decimal precision if needed.
203210
"""
211+
# NOTE: Circular imports
204212
from dipdup.models import Model
205213

206214
decimal_context = decimal.getcontext()

0 commit comments

Comments
 (0)