Skip to content

Commit ae9b681

Browse files
committed
Add s3_additional_kwargs to s3.merge_datasets and s3.copy_objects.
1 parent c8b166a commit ae9b681

File tree

8 files changed

+199
-37
lines changed

8 files changed

+199
-37
lines changed

awswrangler/s3/_copy.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
"""Amazon S3 Copy Module (PRIVATE)."""
22

33
import logging
4-
from typing import Dict, List, Optional, Tuple
4+
from typing import Any, Dict, List, Optional, Tuple
55

66
import boto3
77
from boto3.s3.transfer import TransferConfig
88

99
from awswrangler import _utils, exceptions
1010
from awswrangler.s3._delete import delete_objects
11+
from awswrangler.s3._fs import get_botocore_valid_kwargs
1112
from awswrangler.s3._list import list_objects
1213

1314
_logger: logging.Logger = logging.getLogger(__name__)
@@ -17,11 +18,15 @@ def _copy_objects(
1718
batch: List[Tuple[str, str]],
1819
use_threads: bool,
1920
boto3_session: boto3.Session,
20-
s3_additional_kwargs: Optional[Dict[str, str]],
21+
s3_additional_kwargs: Optional[Dict[str, Any]],
2122
) -> None:
2223
_logger.debug("len(batch): %s", len(batch))
2324
client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session)
2425
resource_s3: boto3.resource = _utils.resource(service_name="s3", session=boto3_session)
26+
if s3_additional_kwargs is None:
27+
boto3_kwargs: Optional[Dict[str, Any]] = None
28+
else:
29+
boto3_kwargs = get_botocore_valid_kwargs(function_name="copy_object", s3_additional_kwargs=s3_additional_kwargs)
2530
for source, target in batch:
2631
source_bucket, source_key = _utils.parse_path(path=source)
2732
copy_source: Dict[str, str] = {"Bucket": source_bucket, "Key": source_key}
@@ -31,8 +36,8 @@ def _copy_objects(
3136
Bucket=target_bucket,
3237
Key=target_key,
3338
SourceClient=client_s3,
34-
ExtraArgs=s3_additional_kwargs,
35-
Config=TransferConfig(num_download_attempts=15, use_threads=use_threads),
39+
ExtraArgs=boto3_kwargs,
40+
Config=TransferConfig(num_download_attempts=10, use_threads=use_threads),
3641
)
3742

3843

@@ -42,7 +47,7 @@ def merge_datasets(
4247
mode: str = "append",
4348
use_threads: bool = True,
4449
boto3_session: Optional[boto3.Session] = None,
45-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
50+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
4651
) -> List[str]:
4752
"""Merge a source dataset into a target dataset.
4853
@@ -74,7 +79,7 @@ def merge_datasets(
7479
If enabled os.cpu_count() will be used as the max number of threads.
7580
boto3_session : boto3.Session(), optional
7681
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
77-
s3_additional_kwargs:
82+
s3_additional_kwargs : Optional[Dict[str, Any]]
7883
Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass",
7984
"SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging".
8085
e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}
@@ -86,6 +91,8 @@ def merge_datasets(
8691
8792
Examples
8893
--------
94+
Merging
95+
8996
>>> import awswrangler as wr
9097
>>> wr.s3.merge_datasets(
9198
... source_path="s3://bucket0/dir0/",
@@ -94,6 +101,20 @@ def merge_datasets(
94101
... )
95102
["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"]
96103
104+
Merging with a KMS key
105+
106+
>>> import awswrangler as wr
107+
>>> wr.s3.merge_datasets(
108+
... source_path="s3://bucket0/dir0/",
109+
... target_path="s3://bucket1/dir1/",
110+
... mode="append",
111+
... s3_additional_kwargs={
112+
... 'ServerSideEncryption': 'aws:kms',
113+
... 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'
114+
... }
115+
... )
116+
["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"]
117+
97118
"""
98119
source_path = source_path[:-1] if source_path[-1] == "/" else source_path
99120
target_path = target_path[:-1] if target_path[-1] == "/" else target_path
@@ -137,7 +158,7 @@ def copy_objects(
137158
replace_filenames: Optional[Dict[str, str]] = None,
138159
use_threads: bool = True,
139160
boto3_session: Optional[boto3.Session] = None,
140-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
161+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
141162
) -> List[str]:
142163
"""Copy a list of S3 objects to another S3 directory.
143164
@@ -161,7 +182,7 @@ def copy_objects(
161182
If enabled os.cpu_count() will be used as the max number of threads.
162183
boto3_session : boto3.Session(), optional
163184
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
164-
s3_additional_kwargs:
185+
s3_additional_kwargs : Optional[Dict[str, Any]]
165186
Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass",
166187
"SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging".
167188
e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}
@@ -173,6 +194,18 @@ def copy_objects(
173194
174195
Examples
175196
--------
197+
Copying
198+
199+
>>> import awswrangler as wr
200+
>>> wr.s3.copy_objects(
201+
... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"],
202+
... source_path="s3://bucket0/dir0/",
203+
... target_path="s3://bucket1/dir1/"
204+
... )
205+
["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"]
206+
207+
Copying with a KMS key
208+
176209
>>> import awswrangler as wr
177210
>>> wr.s3.copy_objects(
178211
... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"],

awswrangler/s3/_fs.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,19 @@
2323
_MIN_WRITE_BLOCK: int = 5_242_880 # 5 MB (5 * 2**20)
2424
_MIN_PARALLEL_READ_BLOCK: int = 5_242_880 # 5 MB (5 * 2**20)
2525

26-
_BOTOCORE_ACCEPTED_KWARGS: Dict[str, Set[str]] = {
26+
BOTOCORE_ACCEPTED_KWARGS: Dict[str, Set[str]] = {
2727
"get_object": {"SSECustomerAlgorithm", "SSECustomerKey"},
28+
"copy_object": {
29+
"ACL",
30+
"Metadata",
31+
"ServerSideEncryption",
32+
"StorageClass",
33+
"SSECustomerAlgorithm",
34+
"SSECustomerKey",
35+
"SSEKMSKeyId",
36+
"SSEKMSEncryptionContext",
37+
"Tagging",
38+
},
2839
"create_multipart_upload": {
2940
"ACL",
3041
"Metadata",
@@ -52,6 +63,11 @@
5263
}
5364

5465

66+
def get_botocore_valid_kwargs(function_name: str, s3_additional_kwargs: Dict[str, Any]) -> Dict[str, Any]:
67+
"""Filter and keep only the valid botocore key arguments."""
68+
return {k: v for k, v in s3_additional_kwargs.items() if k in BOTOCORE_ACCEPTED_KWARGS[function_name]}
69+
70+
5571
def _fetch_range(
5672
range_values: Tuple[int, int],
5773
bucket: str,
@@ -253,17 +269,16 @@ def __iter__(self) -> "_S3Object":
253269
"""Iterate over lines."""
254270
return self
255271

256-
def _get_botocore_valid_kwargs(self, function_name: str) -> Dict[str, Any]:
257-
return {k: v for k, v in self._s3_additional_kwargs.items() if k in _BOTOCORE_ACCEPTED_KWARGS[function_name]}
258-
259272
@staticmethod
260273
def _merge_range(ranges: List[Tuple[int, bytes]]) -> bytes:
261274
return b"".join(data for start, data in sorted(ranges, key=lambda r: r[0]))
262275

263276
def _fetch_range_proxy(self, start: int, end: int) -> bytes:
264277
_logger.debug("Fetching: s3://%s/%s - Range: %s-%s", self._bucket, self._key, start, end)
265278
boto3_primitives: _utils.Boto3PrimitivesType = _utils.boto3_to_primitives(boto3_session=self._boto3_session)
266-
boto3_kwargs: Dict[str, Any] = self._get_botocore_valid_kwargs(function_name="get_object")
279+
boto3_kwargs: Dict[str, Any] = get_botocore_valid_kwargs(
280+
function_name="get_object", s3_additional_kwargs=self._s3_additional_kwargs
281+
)
267282
cpus: int = _utils.ensure_cpu_count(use_threads=self._use_threads)
268283
range_size: int = end - start
269284
if cpus < 2 or range_size < (2 * _MIN_PARALLEL_READ_BLOCK):
@@ -452,7 +467,9 @@ def flush(self, force: bool = False) -> None:
452467
max_num_tries=6,
453468
Bucket=self._bucket,
454469
Key=self._key,
455-
**self._get_botocore_valid_kwargs(function_name="create_multipart_upload"),
470+
**get_botocore_valid_kwargs(
471+
function_name="create_multipart_upload", s3_additional_kwargs=self._s3_additional_kwargs
472+
),
456473
)
457474
self._buffer.seek(0)
458475
for chunk_size in _utils.get_even_chunks_sizes(
@@ -467,7 +484,9 @@ def flush(self, force: bool = False) -> None:
467484
upload_id=self._mpu["UploadId"],
468485
data=self._buffer.read(chunk_size),
469486
boto3_session=self._boto3_session,
470-
boto3_kwargs=self._get_botocore_valid_kwargs(function_name="upload_part"),
487+
boto3_kwargs=get_botocore_valid_kwargs(
488+
function_name="upload_part", s3_additional_kwargs=self._s3_additional_kwargs
489+
),
471490
)
472491
self._buffer = io.BytesIO()
473492
return None
@@ -505,7 +524,9 @@ def close(self) -> None:
505524
Key=self._key,
506525
UploadId=self._mpu["UploadId"],
507526
MultipartUpload=part_info,
508-
**self._get_botocore_valid_kwargs(function_name="complete_multipart_upload"),
527+
**get_botocore_valid_kwargs(
528+
function_name="complete_multipart_upload", s3_additional_kwargs=self._s3_additional_kwargs
529+
),
509530
)
510531
elif self._buffer.tell() > 0:
511532
_logger.debug("put_object")
@@ -517,7 +538,9 @@ def close(self) -> None:
517538
Bucket=self._bucket,
518539
Key=self._key,
519540
Body=self._buffer.getvalue(),
520-
**self._get_botocore_valid_kwargs(function_name="put_object"),
541+
**get_botocore_valid_kwargs(
542+
function_name="put_object", s3_additional_kwargs=self._s3_additional_kwargs
543+
),
521544
)
522545
self._parts_count = 0
523546
self._buffer.seek(0)

awswrangler/s3/_read_parquet.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ def read_parquet(
395395
last_modified_begin: Optional[datetime.datetime] = None,
396396
last_modified_end: Optional[datetime.datetime] = None,
397397
boto3_session: Optional[boto3.Session] = None,
398-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
398+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
399399
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
400400
"""Read Apache Parquet file(s) from a received S3 prefix or list of S3 object paths.
401401
@@ -477,7 +477,7 @@ def read_parquet(
477477
The filter is applied only after list all s3 files.
478478
boto3_session : boto3.Session(), optional
479479
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
480-
s3_additional_kwargs : Dict[str, str]
480+
s3_additional_kwargs : Optional[Dict[str, Any]]
481481
Forward to botocore requests, only "SSECustomerAlgorithm" and "SSECustomerKey" arguments will be considered.
482482
483483
Returns
@@ -572,7 +572,7 @@ def read_parquet_table(
572572
chunked: Union[bool, int] = False,
573573
use_threads: bool = True,
574574
boto3_session: Optional[boto3.Session] = None,
575-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
575+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
576576
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
577577
"""Read Apache Parquet table registered on AWS Glue Catalog.
578578
@@ -637,7 +637,7 @@ def read_parquet_table(
637637
If enabled os.cpu_count() will be used as the max number of threads.
638638
boto3_session : boto3.Session(), optional
639639
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
640-
s3_additional_kwargs:
640+
s3_additional_kwargs : Optional[Dict[str, Any]]
641641
Forward to botocore requests, only "SSECustomerAlgorithm" and "SSECustomerKey" arguments will be considered.
642642
643643
Returns
@@ -715,7 +715,7 @@ def read_parquet_metadata(
715715
dataset: bool = False,
716716
use_threads: bool = True,
717717
boto3_session: Optional[boto3.Session] = None,
718-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
718+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
719719
) -> Tuple[Dict[str, str], Optional[Dict[str, str]]]:
720720
"""Read Apache Parquet file(s) metadata from a received S3 prefix or list of S3 object paths.
721721
@@ -756,7 +756,7 @@ def read_parquet_metadata(
756756
If enabled os.cpu_count() will be used as the max number of threads.
757757
boto3_session : boto3.Session(), optional
758758
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
759-
s3_additional_kwargs:
759+
s3_additional_kwargs : Optional[Dict[str, Any]]
760760
Forward to botocore requests, only "SSECustomerAlgorithm" and "SSECustomerKey" arguments will be considered.
761761
762762
Returns

awswrangler/s3/_read_text.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def read_csv(
149149
last_modified_begin: Optional[datetime.datetime] = None,
150150
last_modified_end: Optional[datetime.datetime] = None,
151151
boto3_session: Optional[boto3.Session] = None,
152-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
152+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
153153
chunksize: Optional[int] = None,
154154
dataset: bool = False,
155155
partition_filter: Optional[Callable[[Dict[str, str]], bool]] = None,
@@ -194,7 +194,7 @@ def read_csv(
194194
The filter is applied only after list all s3 files.
195195
boto3_session : boto3.Session(), optional
196196
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
197-
s3_additional_kwargs : Dict[str, str]
197+
s3_additional_kwargs : Optional[Dict[str, Any]]
198198
Forward to botocore requests, only "SSECustomerAlgorithm" and "SSECustomerKey" arguments will be considered.
199199
chunksize: int, optional
200200
If specified, return a generator where chunksize is the number of rows to include in each chunk.
@@ -283,7 +283,7 @@ def read_fwf(
283283
last_modified_begin: Optional[datetime.datetime] = None,
284284
last_modified_end: Optional[datetime.datetime] = None,
285285
boto3_session: Optional[boto3.Session] = None,
286-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
286+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
287287
chunksize: Optional[int] = None,
288288
dataset: bool = False,
289289
partition_filter: Optional[Callable[[Dict[str, str]], bool]] = None,
@@ -328,7 +328,7 @@ def read_fwf(
328328
The filter is applied only after list all s3 files.
329329
boto3_session : boto3.Session(), optional
330330
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
331-
s3_additional_kwargs:
331+
s3_additional_kwargs : Optional[Dict[str, Any]]
332332
Forward to botocore requests, only "SSECustomerAlgorithm" and "SSECustomerKey" arguments will be considered.
333333
chunksize: int, optional
334334
If specified, return a generator where chunksize is the number of rows to include in each chunk.
@@ -418,7 +418,7 @@ def read_json(
418418
last_modified_begin: Optional[datetime.datetime] = None,
419419
last_modified_end: Optional[datetime.datetime] = None,
420420
boto3_session: Optional[boto3.Session] = None,
421-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
421+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
422422
chunksize: Optional[int] = None,
423423
dataset: bool = False,
424424
partition_filter: Optional[Callable[[Dict[str, str]], bool]] = None,
@@ -465,7 +465,7 @@ def read_json(
465465
The filter is applied only after list all s3 files.
466466
boto3_session : boto3.Session(), optional
467467
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
468-
s3_additional_kwargs:
468+
s3_additional_kwargs : Optional[Dict[str, Any]]
469469
Forward to botocore requests, only "SSECustomerAlgorithm" and "SSECustomerKey" arguments will be considered.
470470
chunksize: int, optional
471471
If specified, return a generator where chunksize is the number of rows to include in each chunk.

awswrangler/s3/_write_parquet.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def to_parquet( # pylint: disable=too-many-arguments,too-many-locals
200200
max_rows_by_file: Optional[int] = None,
201201
use_threads: bool = True,
202202
boto3_session: Optional[boto3.Session] = None,
203-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
203+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
204204
sanitize_columns: bool = False,
205205
dataset: bool = False,
206206
partition_cols: Optional[List[str]] = None,
@@ -262,7 +262,7 @@ def to_parquet( # pylint: disable=too-many-arguments,too-many-locals
262262
If enabled os.cpu_count() will be used as the max number of threads.
263263
boto3_session : boto3.Session(), optional
264264
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
265-
s3_additional_kwargs:
265+
s3_additional_kwargs : Optional[Dict[str, Any]]
266266
Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass",
267267
"SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging".
268268
e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}
@@ -590,7 +590,7 @@ def store_parquet_metadata( # pylint: disable=too-many-arguments
590590
projection_values: Optional[Dict[str, str]] = None,
591591
projection_intervals: Optional[Dict[str, str]] = None,
592592
projection_digits: Optional[Dict[str, str]] = None,
593-
s3_additional_kwargs: Optional[Dict[str, str]] = None,
593+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
594594
boto3_session: Optional[boto3.Session] = None,
595595
) -> Tuple[Dict[str, str], Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]:
596596
"""Infer and store parquet metadata on AWS Glue Catalog.
@@ -688,7 +688,7 @@ def store_parquet_metadata( # pylint: disable=too-many-arguments
688688
Dictionary of partitions names and Athena projections digits.
689689
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
690690
(e.g. {'col_name': '1', 'col2_name': '2'})
691-
s3_additional_kwargs:
691+
s3_additional_kwargs : Optional[Dict[str, Any]]
692692
Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass",
693693
"SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging".
694694
e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}

0 commit comments

Comments
 (0)