1313_logger : logging .Logger = logging .getLogger (__name__ )
1414
1515
16- def _copy_objects (batch : List [Tuple [str , str ]], use_threads : bool , boto3_session : boto3 .Session ) -> None :
16+ def _copy_objects (batch : List [Tuple [str , str ]], s3_additional_kwargs : Optional [ Dict [ str , str ]], use_threads : bool , boto3_session : boto3 .Session ) -> None :
1717 _logger .debug ("len(batch): %s" , len (batch ))
1818 client_s3 : boto3 .client = _utils .client (service_name = "s3" , session = boto3_session )
1919 resource_s3 : boto3 .resource = _utils .resource (service_name = "s3" , session = boto3_session )
@@ -26,6 +26,7 @@ def _copy_objects(batch: List[Tuple[str, str]], use_threads: bool, boto3_session
2626 Bucket = target_bucket ,
2727 Key = target_key ,
2828 SourceClient = client_s3 ,
29+ ExtraArgs = s3_additional_kwargs ,
2930 Config = TransferConfig (num_download_attempts = 15 , use_threads = use_threads ),
3031 )
3132
@@ -36,6 +37,7 @@ def merge_datasets(
3637 mode : str = "append" ,
3738 use_threads : bool = True ,
3839 boto3_session : Optional [boto3 .Session ] = None ,
40+ s3_additional_kwargs : Optional [Dict [str , str ]] = None ,
3941) -> List [str ]:
4042 """Merge a source dataset into a target dataset.
4143
@@ -67,6 +69,10 @@ def merge_datasets(
6769 If enabled os.cpu_count() will be used as the max number of threads.
6870 boto3_session : boto3.Session(), optional
6971 Boto3 Session. The default boto3 session will be used if boto3_session receive None.
72+ s3_additional_kwargs:
73+ Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass",
74+ "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging".
75+ e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMY_KEY_ARN'}
7076
7177 Returns
7278 -------
@@ -108,7 +114,7 @@ def merge_datasets(
108114 raise exceptions .InvalidArgumentValue (f"{ mode } is a invalid mode option." )
109115
110116 new_objects : List [str ] = copy_objects (
111- paths = paths , source_path = source_path , target_path = target_path , use_threads = use_threads , boto3_session = session
117+ paths = paths , source_path = source_path , target_path = target_path , use_threads = use_threads , boto3_session = session , s3_additional_kwargs = s3_additional_kwargs
112118 )
113119 _logger .debug ("len(new_objects): %s" , len (new_objects ))
114120 return new_objects
@@ -119,6 +125,7 @@ def copy_objects(
119125 source_path : str ,
120126 target_path : str ,
121127 replace_filenames : Optional [Dict [str , str ]] = None ,
128+ s3_additional_kwargs : Optional [Dict [str , str ]] = None ,
122129 use_threads : bool = True ,
123130 boto3_session : Optional [boto3 .Session ] = None ,
124131) -> List [str ]:
@@ -144,6 +151,10 @@ def copy_objects(
144151 If enabled os.cpu_count() will be used as the max number of threads.
145152 boto3_session : boto3.Session(), optional
146153 Boto3 Session. The default boto3 session will be used if boto3_session receive None.
154+ s3_additional_kwargs:
155+ Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass",
156+ "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging".
157+ e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMY_KEY_ARN'}
147158
148159 Returns
149160 -------
@@ -154,9 +165,13 @@ def copy_objects(
154165 --------
155166 >>> import awswrangler as wr
156167 >>> wr.s3.copy_objects(
157- ... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"])
168+ ... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"],
158169 ... source_path="s3://bucket0/dir0/",
159170 ... target_path="s3://bucket1/dir1/",
171+ ... s3_additional_kwargs={
172+ ... 'ServerSideEncryption': 'aws:kms',
173+ ... 'SSEKMSKeyId': 'YOUR_KMY_KEY_ARN'
174+ ... }
160175 ... )
161176 ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"]
162177
@@ -184,5 +199,5 @@ def copy_objects(
184199 new_objects .append (path_final )
185200 batch .append ((path , path_final ))
186201 _logger .debug ("len(new_objects): %s" , len (new_objects ))
187- _copy_objects (batch = batch , use_threads = use_threads , boto3_session = session )
202+ _copy_objects (batch = batch , use_threads = use_threads , boto3_session = session , s3_additional_kwargs = s3_additional_kwargs )
188203 return new_objects
0 commit comments