
Commit 7173322 (1 parent: 4d74d70)

Redshift serializable isolation (#667)

* Add redshift LOCK to force serializable isolation

Co-authored-by: jaidisido <[email protected]>

2 files changed: +31 -1 lines changed

awswrangler/redshift.py

Lines changed: 29 additions & 0 deletions
@@ -117,6 +117,18 @@ def _copy(
     cursor.execute(sql)
 
 
+def _lock(
+    cursor: redshift_connector.Cursor,
+    table_names: List[str],
+    schema: Optional[str] = None,
+) -> None:
+    fmt = '"{schema}"."{table}"' if schema else '"{table}"'
+    tables = ", ".join([fmt.format(schema=schema, table=table) for table in table_names])
+    sql: str = f"LOCK {tables};\n"
+    _logger.debug("lock query:\n%s", sql)
+    cursor.execute(sql)
+
+
 def _upsert(
     cursor: redshift_connector.Cursor,
     table: str,
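
For illustration, the new helper just renders and executes a single LOCK statement. A standalone sketch of the string construction (the `_lock_sql` name is hypothetical; no cursor or database needed):

from typing import List, Optional

def _lock_sql(table_names: List[str], schema: Optional[str] = None) -> str:
    # Same formatting as _lock above: double-quote each table, schema-qualified when given
    fmt = '"{schema}"."{table}"' if schema else '"{table}"'
    tables = ", ".join([fmt.format(schema=schema, table=table) for table in table_names])
    return f"LOCK {tables};\n"

print(_lock_sql(["my_table"], schema="public"))  # LOCK "public"."my_table";
print(_lock_sql(["t1", "t2"]))                   # LOCK "t1", "t2";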
@@ -647,6 +659,7 @@ def to_sql(
     varchar_lengths_default: int = 256,
     varchar_lengths: Optional[Dict[str, int]] = None,
     use_column_names: bool = False,
+    lock: bool = False,
     chunksize: int = 200,
 ) -> None:
     """Write records stored in a DataFrame into Redshift.
@@ -696,6 +709,8 @@ def to_sql(
         If set to True, will use the column names of the DataFrame for generating the INSERT SQL Query.
         E.g. If the DataFrame has two columns `col1` and `col3` and `use_column_names` is True, data will only be
         inserted into the database columns `col1` and `col3`.
+    lock : bool
+        True to execute LOCK command inside the transaction to force serializable isolation.
     chunksize: int
         Number of rows which are inserted with each SQL query. Defaults to inserting 200 rows per query.
@@ -758,6 +773,8 @@ def to_sql(
                 _logger.debug("sql: %s", sql)
                 cursor.executemany(sql, (parameters,))
             if table != created_table:  # upsert
+                if lock:
+                    _lock(cursor, [table], schema=schema)
                 _upsert(cursor=cursor, schema=schema, table=table, temp_table=created_table, primary_keys=primary_keys)
             con.commit()
     except Exception as ex:
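
With this change, the to_sql upsert path locks the target table right before the temp-table merge. A minimal usage sketch; the connection name, table, and key column are placeholders, not part of this commit:

import awswrangler as wr
import pandas as pd

con = wr.redshift.connect("my-glue-connection")  # placeholder connection name
wr.redshift.to_sql(
    df=pd.DataFrame({"id": [1], "val": ["foo"]}),
    con=con,
    table="my_table",      # placeholder target table
    schema="public",
    mode="upsert",
    primary_keys=["id"],   # placeholder key column
    lock=True,             # take the LOCK before the merge into the target
)
con.close()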
@@ -1067,6 +1084,7 @@ def copy_from_files(  # pylint: disable=too-many-locals,too-many-arguments
     path_suffix: Optional[str] = None,
     path_ignore_suffix: Optional[str] = None,
     use_threads: bool = True,
+    lock: bool = False,
    boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
 ) -> None:
@@ -1145,6 +1163,8 @@ def copy_from_files(  # pylint: disable=too-many-locals,too-many-arguments
     use_threads : bool
         True to enable concurrent requests, False to disable multiple threads.
         If enabled os.cpu_count() will be used as the max number of threads.
+    lock : bool
+        True to execute LOCK command inside the transaction to force serializable isolation.
     boto3_session : boto3.Session(), optional
         Boto3 Session. The default boto3 session will be used if boto3_session receive None.
     s3_additional_kwargs:
@@ -1199,6 +1219,9 @@ def copy_from_files(  # pylint: disable=too-many-locals,too-many-arguments
             boto3_session=boto3_session,
             s3_additional_kwargs=s3_additional_kwargs,
         )
+        if lock and table == created_table:
+            # Lock before copy if copying into target (not temp) table
+            _lock(cursor, [table], schema=schema)
         _copy(
             cursor=cursor,
             path=path,
@@ -1212,6 +1235,8 @@ def copy_from_files(  # pylint: disable=too-many-locals,too-many-arguments
             serialize_to_json=serialize_to_json,
         )
         if table != created_table:  # upsert
+            if lock:
+                _lock(cursor, [table], schema=schema)
             _upsert(cursor=cursor, schema=schema, table=table, temp_table=created_table, primary_keys=primary_keys)
         con.commit()
     except Exception as ex:
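
copy_from_files gains the same flag, with two lock sites: direct loads lock the target before COPY, while upserts lock it before the merge. A hedged usage sketch; the S3 path, connection name, and IAM role are placeholders:

import awswrangler as wr

con = wr.redshift.connect("my-glue-connection")  # placeholder connection name
wr.redshift.copy_from_files(
    path="s3://my-bucket/staged-parquet/",  # placeholder: Parquet files already in S3
    con=con,
    table="my_table",   # placeholder target table
    schema="public",
    iam_role="arn:aws:iam::111111111111:role/redshift-copy",  # placeholder role
    mode="append",      # direct load into the target, so LOCK runs before COPY
    lock=True,
)
con.close()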
@@ -1245,6 +1270,7 @@ def copy(  # pylint: disable=too-many-arguments
     serialize_to_json: bool = False,
     keep_files: bool = False,
     use_threads: bool = True,
+    lock: bool = False,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
     max_rows_by_file: Optional[int] = 10_000_000,
@@ -1324,6 +1350,8 @@ def copy(  # pylint: disable=too-many-arguments
     use_threads : bool
         True to enable concurrent requests, False to disable multiple threads.
         If enabled os.cpu_count() will be used as the max number of threads.
+    lock : bool
+        True to execute LOCK command inside the transaction to force serializable isolation.
     boto3_session : boto3.Session(), optional
         Boto3 Session. The default boto3 session will be used if boto3_session receive None.
     s3_additional_kwargs:
@@ -1397,6 +1425,7 @@ def copy(  # pylint: disable=too-many-arguments
         varchar_lengths=varchar_lengths,
         serialize_to_json=serialize_to_json,
         use_threads=use_threads,
+        lock=lock,
         boto3_session=session,
         s3_additional_kwargs=s3_additional_kwargs,
     )

tests/test_redshift.py

Lines changed: 2 additions & 1 deletion
@@ -208,7 +208,7 @@ def test_copy_upsert(path, redshift_table, redshift_con, databases_parameters):
     assert len(df.index) + len(df3.index) == len(df4.index)
     assert len(df.columns) == len(df4.columns)
 
-    # UPSERT 2
+    # UPSERT 2 + lock
     wr.redshift.copy(
         df=df3,
         path=path,
@@ -218,6 +218,7 @@ def test_copy_upsert(path, redshift_table, redshift_con, databases_parameters):
         mode="upsert",
         index=False,
         iam_role=databases_parameters["redshift"]["role"],
+        lock=True,
     )
     path = f"{path}upsert/test_redshift_copy_upsert4/"
     df4 = wr.redshift.unload(
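
The updated test exercises lock=True end-to-end through wr.redshift.copy, but it relies on test fixtures. As a rough standalone equivalent, where the bucket, connection name, role, and key column are placeholders:

import awswrangler as wr
import pandas as pd

con = wr.redshift.connect("my-glue-connection")  # placeholder connection name
wr.redshift.copy(
    df=pd.DataFrame({"id": [1, 2], "val": ["foo", "boo"]}),
    path="s3://my-bucket/stage/",  # placeholder staging prefix
    con=con,
    schema="public",
    table="my_table",              # placeholder target table
    mode="upsert",
    primary_keys=["id"],           # placeholder key column
    iam_role="arn:aws:iam::111111111111:role/redshift-copy",  # placeholder role
    lock=True,                     # serializes concurrent writers on the target table
)
con.close()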
