Skip to content

Commit 152877e

Browse files
committed
feat (Postgres cache): add option to drop tables with CASCADE
1 parent d36a616 commit 152877e

File tree

1 file changed

+32
-0
lines changed

1 file changed

+32
-0
lines changed

airbyte/_processors/sql/postgres.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ class PostgresConfig(SqlConfig):
2424
database: str
2525
username: str
2626
password: SecretString | str
27+
drop_cascade: bool = False
28+
"""Whether to use CASCADE when dropping tables."""
2729

2830
@overrides
2931
def get_sql_alchemy_url(self) -> SecretString:
@@ -73,3 +75,33 @@ class PostgresSqlProcessor(SqlProcessorBase):
7375

7476
normalizer = PostgresNormalizer
7577
"""A Postgres-specific name normalizer for table and column name normalization."""
78+
79+
def _swap_temp_table_with_final_table(
80+
self,
81+
stream_name: str,
82+
temp_table_name: str,
83+
final_table_name: str,
84+
) -> None:
85+
"""Merge the temp table into the main one.
86+
87+
This implementation requires MERGE support in the SQL DB.
88+
Databases that do not support this syntax can override this method.
89+
"""
90+
if final_table_name is None:
91+
raise exc.PyAirbyteInternalError(message="Arg 'final_table_name' cannot be None.")
92+
if temp_table_name is None:
93+
raise exc.PyAirbyteInternalError(message="Arg 'temp_table_name' cannot be None.")
94+
95+
_ = stream_name
96+
deletion_name = f"{final_table_name}_deleteme"
97+
commands = "\n".join(
98+
[
99+
f"ALTER TABLE {self._fully_qualified(final_table_name)} RENAME "
100+
f"TO {deletion_name};",
101+
f"ALTER TABLE {self._fully_qualified(temp_table_name)} RENAME "
102+
f"TO {final_table_name};",
103+
f"DROP TABLE {self._fully_qualified(deletion_name)}"
104+
f"{(' CASCADE' if self.sql_config.drop_cascade else '')};",
105+
]
106+
)
107+
self._execute_sql(commands)

0 commit comments

Comments
 (0)