Commit 73f3b88

Add s3.copy_objects() #186
1 parent e086214 commit 73f3b88

File tree: 3 files changed (+151, -0 lines changed)


awswrangler/_utils.py

Lines changed: 7 additions & 0 deletions

@@ -28,6 +28,13 @@ def client(service_name: str, session: Optional[boto3.Session] = None) -> boto3.client:
     )


+def resource(service_name: str, session: Optional[boto3.Session] = None) -> boto3.resource:
+    """Create a valid boto3.resource."""
+    return ensure_session(session=session).resource(
+        service_name=service_name, use_ssl=True, config=botocore.config.Config(retries={"max_attempts": 15})
+    )
+
+
 def parse_path(path: str) -> Tuple[str, str]:
     """Split a full S3 path in bucket and key strings.
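The new resource() helper mirrors the existing client() factory (same session handling, SSL, and retry configuration) but returns a boto3 service resource instead of a low-level client. A minimal sketch of what it hands back, assuming default AWS credentials are configured ("my-bucket" is a placeholder):

    from awswrangler import _utils

    s3_resource = _utils.resource(service_name="s3")  # retries capped at 15 attempts
    s3_resource.Bucket("my-bucket")                   # resource-style access
    s3_resource.meta.client                           # the underlying low-level client;
                                                      # s3.copy_objects() below drives the
                                                      # managed copy through this attribute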

awswrangler/s3.py

Lines changed: 97 additions & 0 deletions

@@ -16,6 +16,7 @@
 import pyarrow.lib  # type: ignore
 import pyarrow.parquet  # type: ignore
 import s3fs  # type: ignore
+from boto3.s3.transfer import TransferConfig  # type: ignore
 from pandas.io.common import infer_compression  # type: ignore

 from awswrangler import _data_types, _utils, catalog, exceptions
@@ -2051,3 +2052,99 @@ def read_parquet_table(
         boto3_session=boto3_session,
         s3_additional_kwargs=s3_additional_kwargs,
     )
+
+
+def copy_objects(
+    paths: List[str],
+    source_path: str,
+    target_path: str,
+    mode: str = "append",
+    use_threads: bool = True,
+    boto3_session: Optional[boto3.Session] = None,
+) -> List[str]:
+    """Copy a list of S3 objects to another S3 directory.
+
+    Note
+    ----
+    If `use_threads=True`, the number of threads used is obtained from os.cpu_count().
+
+    Parameters
+    ----------
+    paths : List[str]
+        List of S3 object paths (e.g. [s3://bucket/dir0/key0, s3://bucket/dir0/key1]).
+    source_path : str
+        S3 path for the source directory.
+    target_path : str
+        S3 path for the target directory.
+    mode : str, optional
+        ``append`` (default), ``overwrite`` or ``overwrite_partitions``.
+    use_threads : bool
+        True to enable concurrent requests, False to disable multiple threads.
+        If enabled, os.cpu_count() is used as the max number of threads.
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 session is used if boto3_session receives None.
+
+    Returns
+    -------
+    List[str]
+        List of new object paths.
+
+    Examples
+    --------
+    >>> import awswrangler as wr
+    >>> wr.s3.copy_objects(
+    ...     paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"],
+    ...     source_path="s3://bucket0/dir0/",
+    ...     target_path="s3://bucket1/dir1/",
+    ...     mode="append"
+    ... )
+    ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"]
+
+    """
+    _logger.debug(f"len(paths): {len(paths)}")
+    if len(paths) < 1:
+        return []
+    source_path = source_path[:-1] if source_path[-1] == "/" else source_path
+    target_path = target_path[:-1] if target_path[-1] == "/" else target_path
+    session: boto3.Session = _utils.ensure_session(session=boto3_session)
+    if mode == "overwrite":
+        _logger.debug(f"Deleting to overwrite: {target_path}/")
+        delete_objects(path=f"{target_path}/", use_threads=use_threads, boto3_session=session)
+    elif mode == "overwrite_partitions":
+        paths_wo_prefix: List[str] = [x.replace(f"{source_path}/", "") for x in paths]
+        paths_wo_filename: List[str] = [f"{x.rpartition('/')[0]}/" for x in paths_wo_prefix]
+        partitions_paths: List[str] = list(set(paths_wo_filename))
+        target_partitions_paths = [f"{target_path}/{x}" for x in partitions_paths]
+        for path in target_partitions_paths:
+            _logger.debug(f"Deleting to overwrite_partitions: {path}")
+            delete_objects(path=path, use_threads=use_threads, boto3_session=session)
+    elif mode != "append":
+        raise exceptions.InvalidArgumentValue(f"{mode} is an invalid mode option.")
+
+    batch: List[Tuple[str, str]] = []
+    new_objects: List[str] = []
+    for path in paths:
+        path_wo_prefix: str = path.replace(f"{source_path}/", "")
+        path_final: str = f"{target_path}/{path_wo_prefix}"
+        new_objects.append(path_final)
+        batch.append((path, path_final))
+    _logger.debug(f"len(new_objects): {len(new_objects)}")
+    _copy_objects(batch=batch, use_threads=use_threads, boto3_session=session)
+    return new_objects
+
+
+def _copy_objects(batch: List[Tuple[str, str]], use_threads: bool, boto3_session: boto3.Session) -> None:
+    _logger.debug(f"len(batch): {len(batch)}")
+    client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session)
+    resource_s3: boto3.resource = _utils.resource(service_name="s3", session=boto3_session)
+    for source, target in batch:
+        source_bucket, source_key = _utils.parse_path(path=source)
+        copy_source: Dict[str, str] = {"Bucket": source_bucket, "Key": source_key}
+        target_bucket, target_key = _utils.parse_path(path=target)
+        resource_s3.meta.client.copy(
+            CopySource=copy_source,
+            Bucket=target_bucket,
+            Key=target_key,
+            SourceClient=client_s3,
+            Config=TransferConfig(num_download_attempts=15, use_threads=use_threads),
+        )
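For each (source, target) pair, _copy_objects() delegates to boto3's managed transfer API (copy() on the resource's underlying client), which can switch to multipart copies for large objects. A minimal standalone sketch of the same call, with placeholder bucket and key names:

    import boto3
    from boto3.s3.transfer import TransferConfig

    session = boto3.Session()
    src_client = session.client("s3")    # client used to read the source object
    s3_resource = session.resource("s3")

    s3_resource.meta.client.copy(
        CopySource={"Bucket": "bucket0", "Key": "dir0/key0"},
        Bucket="bucket1",
        Key="dir1/key0",
        SourceClient=src_client,         # may carry different credentials/session
        Config=TransferConfig(num_download_attempts=15, use_threads=True),
    )

Passing SourceClient explicitly lets the source object be read with a different client than the one writing the target.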

testing/test_awswrangler/test_data_lake.py

Lines changed: 47 additions & 0 deletions

@@ -1010,3 +1010,50 @@ def test_parquet_char_length(bucket, database, external_schema):

     wr.s3.delete_objects(path=path)
     assert wr.catalog.delete_table_if_exists(database=database, table=table) is True
+
+
+def test_copy(bucket):
+    path = f"s3://{bucket}/test_copy/"
+    df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]})
+    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, partition_cols=["par"], mode="overwrite")["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    df = wr.s3.read_parquet(path=path, dataset=True)
+    assert df.id.sum() == 6
+    assert df.par.astype("Int64").sum() == 6
+
+    path2 = f"s3://{bucket}/test_copy2/"
+    df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]})
+    paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    paths = wr.s3.copy_objects(paths, source_path=path2, target_path=path, mode="append", use_threads=True)
+    wr.s3.wait_objects_exist(paths=paths, use_threads=False)
+    df = wr.s3.read_parquet(path=path, dataset=True)
+    assert df.id.sum() == 12
+    assert df.par.astype("Int64").sum() == 12
+
+    paths = wr.s3.copy_objects(
+        wr.s3.list_objects(path2), source_path=path2, target_path=path, mode="overwrite", use_threads=False
+    )
+    wr.s3.wait_objects_exist(paths=paths, use_threads=False)
+    df = wr.s3.read_parquet(path=path, dataset=True)
+    assert df.id.sum() == 6
+    assert df.par.astype("Int64").sum() == 6
+
+    df = pd.DataFrame({"id": [4], "par": [3]})
+    paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    paths = wr.s3.copy_objects(
+        paths, source_path=path2, target_path=path, mode="overwrite_partitions", use_threads=True
+    )
+    wr.s3.wait_objects_exist(paths=paths, use_threads=False)
+    df = wr.s3.read_parquet(path=path, dataset=True)
+    assert df.id.sum() == 7
+    assert df.par.astype("Int64").sum() == 6
+
+    with pytest.raises(wr.exceptions.InvalidArgumentValue):
+        wr.s3.copy_objects(["foo"], source_path="boo", target_path="bar", mode="WRONG")
+
+    assert len(wr.s3.copy_objects([], source_path="boo", target_path="bar")) == 0
+
+    wr.s3.delete_objects(path=path)
+    wr.s3.delete_objects(path=path2)
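The test exercises all three modes end to end: append adds the copied objects next to the existing ones (id sum 6 -> 12), overwrite deletes everything under target_path first (back to 6), and overwrite_partitions deletes only the partition prefixes being re-copied (7, since only the par=3 partition is replaced). A usage sketch with placeholder bucket names and prefixes:

    import awswrangler as wr

    new_paths = wr.s3.copy_objects(
        paths=wr.s3.list_objects("s3://staging-bucket/sales/"),
        source_path="s3://staging-bucket/sales/",
        target_path="s3://lake-bucket/sales/",
        mode="overwrite_partitions",  # replace only the partitions present in paths
    )
    wr.s3.wait_objects_exist(paths=new_paths)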
