Skip to content

Commit 537afe5

Browse files
committed
Splitting up and creating two functions, copy_objects() and merge_datasets()
1 parent e6d808d commit 537afe5

File tree

6 files changed

+129
-48
lines changed

6 files changed

+129
-48
lines changed

awswrangler/_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def ensure_cpu_count(use_threads: bool = True) -> int:
6969
7070
Note
7171
----
72-
In case of `use_threads=True` the number of process that could be spawned will be get from os.cpu_count().
72+
In case of `use_threads=True` the number of threads that could be spawned will be obtained from os.cpu_count().
7373
7474
Parameters
7575
----------

awswrangler/athena.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
369369
370370
Note
371371
----
372-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
372+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
373373
374374
Parameters
375375
----------
@@ -605,7 +605,7 @@ def read_sql_table(
605605
606606
Note
607607
----
608-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
608+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
609609
610610
Parameters
611611
----------

awswrangler/db.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ def copy_to_redshift( # pylint: disable=too-many-arguments
438438
439439
Note
440440
----
441-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
441+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
442442
443443
Parameters
444444
----------
@@ -576,7 +576,7 @@ def copy_files_to_redshift( # pylint: disable=too-many-locals,too-many-argument
576576
577577
Note
578578
----
579-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
579+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
580580
581581
Parameters
582582
----------
@@ -798,7 +798,7 @@ def write_redshift_copy_manifest(
798798
799799
Note
800800
----
801-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
801+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
802802
803803
Parameters
804804
----------
@@ -908,7 +908,7 @@ def unload_redshift(
908908
909909
Note
910910
----
911-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
911+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
912912
913913
Parameters
914914
----------
@@ -1024,7 +1024,7 @@ def unload_redshift_to_files(
10241024
10251025
Note
10261026
----
1027-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1027+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
10281028
10291029
Parameters
10301030
----------

awswrangler/s3.py

Lines changed: 88 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def delete_objects(
176176
177177
Note
178178
----
179-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
179+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
180180
181181
Parameters
182182
----------
@@ -248,7 +248,7 @@ def describe_objects(
248248
249249
Note
250250
----
251-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
251+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
252252
253253
Parameters
254254
----------
@@ -328,7 +328,7 @@ def size_objects(
328328
329329
Note
330330
----
331-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
331+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
332332
333333
Parameters
334334
----------
@@ -396,7 +396,7 @@ def to_csv( # pylint: disable=too-many-arguments
396396
397397
Note
398398
----
399-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
399+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
400400
401401
Parameters
402402
----------
@@ -426,9 +426,9 @@ def to_csv( # pylint: disable=too-many-arguments
426426
List of column names that will be used to create partitions. Only takes effect if dataset=True.
427427
mode: str, optional
428428
``append`` (Default), ``overwrite``, ``overwrite_partitions``. Only takes effect if dataset=True.
429-
database : str
429+
database : str, optional
430430
Glue/Athena catalog: Database name.
431-
table : str
431+
table : str, optional
432432
Glue/Athena catalog: Table name.
433433
dtype: Dict[str, str], optional
434434
Dictionary of columns names and Athena/Glue types to be casted.
@@ -808,7 +808,7 @@ def to_parquet( # pylint: disable=too-many-arguments
808808
809809
Note
810810
----
811-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
811+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
812812
813813
Parameters
814814
----------
@@ -836,9 +836,9 @@ def to_parquet( # pylint: disable=too-many-arguments
836836
List of column names that will be used to create partitions. Only takes effect if dataset=True.
837837
mode: str, optional
838838
``append`` (Default), ``overwrite``, ``overwrite_partitions``. Only takes effect if dataset=True.
839-
database : str
839+
database : str, optional
840840
Glue/Athena catalog: Database name.
841-
table : str
841+
table : str, optional
842842
Glue/Athena catalog: Table name.
843843
dtype: Dict[str, str], optional
844844
Dictionary of columns names and Athena/Glue types to be casted.
@@ -1153,7 +1153,7 @@ def read_csv(
11531153
11541154
Note
11551155
----
1156-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1156+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
11571157
11581158
Parameters
11591159
----------
@@ -1236,7 +1236,7 @@ def read_fwf(
12361236
12371237
Note
12381238
----
1239-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1239+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
12401240
12411241
Parameters
12421242
----------
@@ -1319,7 +1319,7 @@ def read_json(
13191319
13201320
Note
13211321
----
1322-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1322+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
13231323
13241324
Parameters
13251325
----------
@@ -1524,7 +1524,7 @@ def read_parquet(
15241524
15251525
Note
15261526
----
1527-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1527+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
15281528
15291529
Parameters
15301530
----------
@@ -1671,7 +1671,7 @@ def read_parquet_metadata(
16711671
16721672
Note
16731673
----
1674-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1674+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
16751675
16761676
Parameters
16771677
----------
@@ -1743,7 +1743,7 @@ def store_parquet_metadata(
17431743
17441744
Note
17451745
----
1746-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1746+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
17471747
17481748
Parameters
17491749
----------
@@ -1843,7 +1843,7 @@ def wait_objects_exist(
18431843
18441844
Note
18451845
----
1846-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1846+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
18471847
18481848
Parameters
18491849
----------
@@ -1895,7 +1895,7 @@ def wait_objects_not_exist(
18951895
18961896
Note
18971897
----
1898-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1898+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
18991899
19001900
Parameters
19011901
----------
@@ -1981,7 +1981,7 @@ def read_parquet_table(
19811981
19821982
Note
19831983
----
1984-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
1984+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
19851985
19861986
Parameters
19871987
----------
@@ -2054,24 +2054,27 @@ def read_parquet_table(
20542054
)
20552055

20562056

2057-
def copy_objects(
2058-
paths: List[str],
2057+
def merge_datasets(
20592058
source_path: str,
20602059
target_path: str,
20612060
mode: str = "append",
20622061
use_threads: bool = True,
20632062
boto3_session: Optional[boto3.Session] = None,
20642063
) -> List[str]:
2065-
"""Copy a list of S3 objects to another S3 directory.
2064+
"""Merge a source dataset into a target dataset.
2065+
2066+
Note
2067+
----
2068+
If you are merging tables (S3 datasets + Glue Catalog metadata),
2069+
remember that you will also need to update your partitions metadata in some cases.
2070+
(e.g. wr.athena.repair_table(table='...', database='...'))
20662071
20672072
Note
20682073
----
2069-
In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count().
2074+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
20702075
20712076
Parameters
20722077
----------
2073-
paths : List[str]
2074-
List of S3 objects paths (e.g. [s3://bucket/dir0/key0, s3://bucket/dir0/key1]).
20752078
source_path : str,
20762079
S3 Path for the source directory.
20772080
target_path : str,
@@ -2092,21 +2095,23 @@ def copy_objects(
20922095
Examples
20932096
--------
20942097
>>> import awswrangler as wr
2095-
>>> wr.s3.copy_objects(
2096-
... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"])
2098+
>>> wr.s3.merge_datasets(
20972099
... source_path="s3://bucket0/dir0/",
20982100
... target_path="s3://bucket1/dir1/",
20992101
... mode="append"
21002102
... )
21012103
["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"]
21022104
21032105
"""
2104-
_logger.debug(f"len(paths): {len(paths)}")
2105-
if len(paths) < 1:
2106-
return []
21072106
source_path = source_path[:-1] if source_path[-1] == "/" else source_path
21082107
target_path = target_path[:-1] if target_path[-1] == "/" else target_path
21092108
session: boto3.Session = _utils.ensure_session(session=boto3_session)
2109+
2110+
paths: List[str] = list_objects(path=f"{source_path}/", boto3_session=session)
2111+
_logger.debug(f"len(paths): {len(paths)}")
2112+
if len(paths) < 1:
2113+
return []
2114+
21102115
if mode == "overwrite":
21112116
_logger.debug(f"Deleting to overwrite: {target_path}/")
21122117
delete_objects(path=f"{target_path}/", use_threads=use_threads, boto3_session=session)
@@ -2121,6 +2126,60 @@ def copy_objects(
21212126
elif mode != "append":
21222127
raise exceptions.InvalidArgumentValue(f"{mode} is a invalid mode option.")
21232128

2129+
new_objects: List[str] = copy_objects(paths=paths, source_path=source_path, target_path=target_path, use_threads=use_threads, boto3_session=session)
2130+
_logger.debug(f"len(new_objects): {len(new_objects)}")
2131+
return new_objects
2132+
2133+
2134+
def copy_objects(
2135+
paths: List[str],
2136+
source_path: str,
2137+
target_path: str,
2138+
use_threads: bool = True,
2139+
boto3_session: Optional[boto3.Session] = None,
2140+
) -> List[str]:
2141+
"""Copy a list of S3 objects to another S3 directory.
2142+
2143+
Note
2144+
----
2145+
In case of `use_threads=True` the number of threads that will be spawned will be obtained from os.cpu_count().
2146+
2147+
Parameters
2148+
----------
2149+
paths : List[str]
2150+
List of S3 objects paths (e.g. [s3://bucket/dir0/key0, s3://bucket/dir0/key1]).
2151+
source_path : str,
2152+
S3 Path for the source directory.
2153+
target_path : str,
2154+
S3 Path for the target directory.
2155+
use_threads : bool
2156+
True to enable concurrent requests, False to disable multiple threads.
2157+
If enabled os.cpu_count() will be used as the max number of threads.
2158+
boto3_session : boto3.Session(), optional
2159+
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
2160+
2161+
Returns
2162+
-------
2163+
List[str]
2164+
List of new objects paths.
2165+
2166+
Examples
2167+
--------
2168+
>>> import awswrangler as wr
2169+
>>> wr.s3.copy_objects(
2170+
... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"],
2171+
... source_path="s3://bucket0/dir0/",
2172+
... target_path="s3://bucket1/dir1/",
2173+
... )
2174+
["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"]
2175+
2176+
"""
2177+
_logger.debug(f"len(paths): {len(paths)}")
2178+
if len(paths) < 1:
2179+
return []
2180+
source_path = source_path[:-1] if source_path[-1] == "/" else source_path
2181+
target_path = target_path[:-1] if target_path[-1] == "/" else target_path
2182+
session: boto3.Session = _utils.ensure_session(session=boto3_session)
21242183
batch: List[Tuple[str, str]] = []
21252184
new_objects: List[str] = []
21262185
for path in paths:

docs/source/api.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ Amazon S3
3030
wait_objects_exist
3131
wait_objects_not_exist
3232
copy_objects
33+
merge_datasets
3334

3435
AWS Glue Catalog
3536
----------------

0 commit comments

Comments
 (0)