Skip to content

Commit 5298aaf

Browse files
committed
Add kms_key_id, max_file_size and region to Redshift Unload
1 parent 2a26d4f commit 5298aaf

File tree

1 file changed

+62
-5
lines changed

1 file changed

+62
-5
lines changed

awswrangler/db.py

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import json
44
import logging
5+
import time
56
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
67
from urllib.parse import quote_plus
78

@@ -91,7 +92,16 @@ def to_sql(df: pd.DataFrame, con: sqlalchemy.engine.Engine, **pandas_kwargs) ->
9192
)
9293
pandas_kwargs["dtype"] = dtypes
9394
pandas_kwargs["con"] = con
94-
df.to_sql(**pandas_kwargs)
95+
max_attempts: int = 3
96+
for attempt in range(max_attempts):
97+
try:
98+
df.to_sql(**pandas_kwargs)
99+
except sqlalchemy.exc.InternalError as ex: # pragma: no cover
100+
if attempt == (max_attempts - 1):
101+
raise ex
102+
time.sleep(1)
103+
else:
104+
break
95105

96106

97107
def read_sql_query(
@@ -887,6 +897,9 @@ def unload_redshift(
887897
path: str,
888898
con: sqlalchemy.engine.Engine,
889899
iam_role: str,
900+
region: Optional[str] = None,
901+
max_file_size: Optional[float] = None,
902+
kms_key_id: Optional[str] = None,
890903
categories: List[str] = None,
891904
chunked: Union[bool, int] = False,
892905
keep_files: bool = False,
@@ -937,6 +950,19 @@ def unload_redshift(
937950
wr.db.get_engine(), wr.db.get_redshift_temp_engine() or wr.catalog.get_engine()
938951
iam_role : str
939952
AWS IAM role with the related permissions.
953+
region : str, optional
954+
Specifies the AWS Region where the target Amazon S3 bucket is located.
955+
REGION is required for UNLOAD to an Amazon S3 bucket that isn't in the
956+
same AWS Region as the Amazon Redshift cluster. By default, UNLOAD
957+
assumes that the target Amazon S3 bucket is located in the same AWS
958+
Region as the Amazon Redshift cluster.
959+
max_file_size : float, optional
960+
Specifies the maximum size (MB) of files that UNLOAD creates in Amazon S3.
961+
Specify a decimal value between 5.0 MB and 6200.0 MB. If None, the default
962+
maximum file size is 6200.0 MB.
963+
kms_key_id : str, optional
964+
Specifies the key ID for an AWS Key Management Service (AWS KMS) key to be
965+
used to encrypt data files on Amazon S3.
940966
categories: List[str], optional
941967
List of columns names that should be returned as pandas.Categorical.
942968
Recommended for memory restricted environments.
@@ -973,7 +999,15 @@ def unload_redshift(
973999
"""
9741000
session: boto3.Session = _utils.ensure_session(session=boto3_session)
9751001
paths: List[str] = unload_redshift_to_files(
976-
sql=sql, path=path, con=con, iam_role=iam_role, use_threads=use_threads, boto3_session=session
1002+
sql=sql,
1003+
path=path,
1004+
con=con,
1005+
iam_role=iam_role,
1006+
region=region,
1007+
max_file_size=max_file_size,
1008+
kms_key_id=kms_key_id,
1009+
use_threads=use_threads,
1010+
boto3_session=session,
9771011
)
9781012
s3.wait_objects_exist(paths=paths, use_threads=False, boto3_session=session)
9791013
if chunked is False:
@@ -1032,6 +1066,9 @@ def unload_redshift_to_files(
10321066
path: str,
10331067
con: sqlalchemy.engine.Engine,
10341068
iam_role: str,
1069+
region: Optional[str] = None,
1070+
max_file_size: Optional[float] = None,
1071+
kms_key_id: Optional[str] = None,
10351072
use_threads: bool = True,
10361073
manifest: bool = False,
10371074
partition_cols: Optional[List] = None,
@@ -1056,6 +1093,19 @@ def unload_redshift_to_files(
10561093
wr.db.get_engine(), wr.db.get_redshift_temp_engine() or wr.catalog.get_engine()
10571094
iam_role : str
10581095
AWS IAM role with the related permissions.
1096+
region : str, optional
1097+
Specifies the AWS Region where the target Amazon S3 bucket is located.
1098+
REGION is required for UNLOAD to an Amazon S3 bucket that isn't in the
1099+
same AWS Region as the Amazon Redshift cluster. By default, UNLOAD
1100+
assumes that the target Amazon S3 bucket is located in the same AWS
1101+
Region as the Amazon Redshift cluster.
1102+
max_file_size : float, optional
1103+
Specifies the maximum size (MB) of files that UNLOAD creates in Amazon S3.
1104+
Specify a decimal value between 5.0 MB and 6200.0 MB. If None, the default
1105+
maximum file size is 6200.0 MB.
1106+
kms_key_id : str, optional
1107+
Specifies the key ID for an AWS Key Management Service (AWS KMS) key to be
1108+
used to encrypt data files on Amazon S3.
10591109
use_threads : bool
10601110
True to enable concurrent requests, False to disable multiple threads.
10611111
If enabled os.cpu_count() will be used as the max number of threads.
@@ -1086,19 +1136,26 @@ def unload_redshift_to_files(
10861136
session: boto3.Session = _utils.ensure_session(session=boto3_session)
10871137
s3.delete_objects(path=path, use_threads=use_threads, boto3_session=session)
10881138
with con.connect() as _con:
1089-
partition_str: str = f"PARTITION BY ({','.join(partition_cols)})\n" if partition_cols else ""
1139+
partition_str: str = f"\nPARTITION BY ({','.join(partition_cols)})" if partition_cols else ""
10901140
manifest_str: str = "\nmanifest" if manifest is True else ""
1141+
region_str: str = f"\nREGION AS '{region}'" if region is not None else ""
1142+
max_file_size_str: str = f"\nMAXFILESIZE AS {max_file_size} MB" if max_file_size is not None else ""
1143+
kms_key_id_str: str = f"\nKMS_KEY_ID '{kms_key_id}'" if kms_key_id is not None else ""
10911144
sql = (
10921145
f"UNLOAD ('{sql}')\n"
10931146
f"TO '{path}'\n"
10941147
f"IAM_ROLE '{iam_role}'\n"
10951148
"ALLOWOVERWRITE\n"
10961149
"PARALLEL ON\n"
1097-
"ENCRYPTED\n"
1150+
"FORMAT PARQUET\n"
1151+
"ENCRYPTED"
1152+
f"{kms_key_id_str}"
10981153
f"{partition_str}"
1099-
"FORMAT PARQUET"
1154+
f"{region_str}"
1155+
f"{max_file_size_str}"
11001156
f"{manifest_str};"
11011157
)
1158+
_logger.debug("sql: \n%s", sql)
11021159
_con.execute(sql)
11031160
sql = "SELECT pg_last_query_id() AS query_id"
11041161
query_id: int = _con.execute(sql).fetchall()[0][0]

0 commit comments

Comments (0)