Skip to content

Commit 10fbf10

Browse files
Refactoring pagination config in list objects (#963)
* Refactoring pagination config in list objects
* Introducing chunked param for pagination
* Minor - Missing filter
* Minor - Adding clarifying comment
* Missing emptying the paths list

Co-authored-by: kukushking <[email protected]>
1 parent c3473af commit 10fbf10

File tree

4 files changed

+67
-31
lines changed

4 files changed

+67
-31
lines changed

awswrangler/s3/_copy.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ def merge_datasets(
125125
target_path = target_path[:-1] if target_path[-1] == "/" else target_path
126126
session: boto3.Session = _utils.ensure_session(session=boto3_session)
127127

128-
paths: List[str] = list_objects(path=f"{source_path}/", ignore_empty=ignore_empty, boto3_session=session)
128+
paths: List[str] = list_objects( # type: ignore
129+
path=f"{source_path}/", ignore_empty=ignore_empty, boto3_session=session
130+
)
129131
_logger.debug("len(paths): %s", len(paths))
130132
if len(paths) < 1:
131133
return []

awswrangler/s3/_list.py

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import datetime
44
import fnmatch
55
import logging
6-
from typing import Any, Dict, List, Optional, Sequence, Union
6+
from typing import Any, Dict, Iterator, List, Optional, Sequence, Union
77

88
import boto3
99
import botocore.exceptions
@@ -28,7 +28,7 @@ def _path2list(
2828
_suffix: Optional[List[str]] = [suffix] if isinstance(suffix, str) else suffix
2929
_ignore_suffix: Optional[List[str]] = [ignore_suffix] if isinstance(ignore_suffix, str) else ignore_suffix
3030
if isinstance(path, str): # prefix
31-
paths: List[str] = list_objects(
31+
paths: List[str] = list_objects( # type: ignore
3232
path=path,
3333
suffix=_suffix,
3434
ignore_suffix=_ignore_suffix,
@@ -79,22 +79,27 @@ def _list_objects( # pylint: disable=too-many-branches
7979
last_modified_end: Optional[datetime.datetime] = None,
8080
boto3_session: Optional[boto3.Session] = None,
8181
ignore_empty: bool = False,
82-
) -> List[str]:
82+
) -> Iterator[List[str]]:
8383
bucket: str
8484
prefix_original: str
8585
bucket, prefix_original = _utils.parse_path(path=path)
8686
prefix: str = _prefix_cleanup(prefix=prefix_original)
8787
_suffix: Union[List[str], None] = [suffix] if isinstance(suffix, str) else suffix
8888
_ignore_suffix: Union[List[str], None] = [ignore_suffix] if isinstance(ignore_suffix, str) else ignore_suffix
8989
client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session)
90+
default_pagination: Dict[str, int] = {"PageSize": 1000}
91+
extra_kwargs: Dict[str, Any] = {"PaginationConfig": default_pagination}
9092
if s3_additional_kwargs:
91-
extra_kwargs: Dict[str, Any] = _fs.get_botocore_valid_kwargs(
93+
extra_kwargs = _fs.get_botocore_valid_kwargs(
9294
function_name="list_objects_v2", s3_additional_kwargs=s3_additional_kwargs
9395
)
94-
else:
95-
extra_kwargs = {}
96+
extra_kwargs["PaginationConfig"] = (
97+
s3_additional_kwargs["PaginationConfig"]
98+
if "PaginationConfig" in s3_additional_kwargs
99+
else default_pagination
100+
)
96101
paginator = client_s3.get_paginator("list_objects_v2")
97-
args: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, "PaginationConfig": {"PageSize": 1000}, **extra_kwargs}
102+
args: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, **extra_kwargs}
98103
if delimiter is not None:
99104
args["Delimiter"] = delimiter
100105
_logger.debug("args: %s", args)
@@ -127,13 +132,15 @@ def _list_objects( # pylint: disable=too-many-branches
127132
key = pfx["Prefix"]
128133
paths.append(f"s3://{bucket}/{key}")
129134

130-
if prefix != prefix_original:
131-
paths = fnmatch.filter(paths, path)
135+
if prefix != prefix_original:
136+
paths = fnmatch.filter(paths, path)
132137

133-
if _ignore_suffix is not None:
134-
paths = [p for p in paths if p.endswith(tuple(_ignore_suffix)) is False]
138+
if _ignore_suffix is not None:
139+
paths = [p for p in paths if p.endswith(tuple(_ignore_suffix)) is False]
135140

136-
return paths
141+
if paths:
142+
yield paths
143+
paths = []
137144

138145

139146
def does_object_exist(
@@ -203,8 +210,11 @@ def does_object_exist(
203210

204211

205212
def list_directories(
206-
path: str, s3_additional_kwargs: Optional[Dict[str, Any]] = None, boto3_session: Optional[boto3.Session] = None
207-
) -> List[str]:
213+
path: str,
214+
chunked: bool = False,
215+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
216+
boto3_session: Optional[boto3.Session] = None,
217+
) -> Union[List[str], Iterator[List[str]]]:
208218
"""List Amazon S3 objects from a prefix.
209219
210220
This function accepts Unix shell-style wildcards in the path argument.
@@ -217,6 +227,8 @@ def list_directories(
217227
----------
218228
path : str
219229
S3 path (e.g. s3://bucket/prefix).
230+
chunked: bool
231+
If True returns iterator, and a single list otherwise. False by default.
220232
s3_additional_kwargs : Optional[Dict[str, Any]]
221233
Forwarded to botocore requests.
222234
e.g. s3_additional_kwargs={'RequestPayer': 'requester'}
@@ -225,7 +237,7 @@ def list_directories(
225237
226238
Returns
227239
-------
228-
List[str]
240+
Union[List[str], Iterator[List[str]]]
229241
List of objects paths.
230242
231243
Examples
@@ -244,9 +256,15 @@ def list_directories(
244256
['s3://bucket/prefix/dir0/', 's3://bucket/prefix/dir1/', 's3://bucket/prefix/dir2/']
245257
246258
"""
247-
return _list_objects(
248-
path=path, delimiter="/", boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
259+
result_iterator = _list_objects(
260+
path=path,
261+
delimiter="/",
262+
boto3_session=boto3_session,
263+
s3_additional_kwargs=s3_additional_kwargs,
249264
)
265+
if chunked:
266+
return result_iterator
267+
return [path for paths in result_iterator for path in paths]
250268

251269

252270
def list_objects(
@@ -256,9 +274,10 @@ def list_objects(
256274
last_modified_begin: Optional[datetime.datetime] = None,
257275
last_modified_end: Optional[datetime.datetime] = None,
258276
ignore_empty: bool = False,
277+
chunked: bool = False,
259278
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
260279
boto3_session: Optional[boto3.Session] = None,
261-
) -> List[str]:
280+
) -> Union[List[str], Iterator[List[str]]]:
262281
"""List Amazon S3 objects from a prefix.
263282
264283
This function accepts Unix shell-style wildcards in the path argument.
@@ -287,6 +306,8 @@ def list_objects(
287306
The filter is applied only after list all s3 files.
288307
ignore_empty: bool
289308
Ignore files with 0 bytes.
309+
chunked: bool
310+
If True returns iterator, and a single list otherwise. False by default.
290311
s3_additional_kwargs : Optional[Dict[str, Any]]
291312
Forwarded to botocore requests.
292313
e.g. s3_additional_kwargs={'RequestPayer': 'requester'}
@@ -295,7 +316,7 @@ def list_objects(
295316
296317
Returns
297318
-------
298-
List[str]
319+
Union[List[str], Iterator[List[str]]]
299320
List of objects paths.
300321
301322
Examples
@@ -314,15 +335,24 @@ def list_objects(
314335
['s3://bucket/prefix0', 's3://bucket/prefix1', 's3://bucket/prefix2']
315336
316337
"""
317-
paths: List[str] = _list_objects(
338+
# On top of user provided ignore_suffix input, add "/"
339+
ignore_suffix_acc = set("/")
340+
if isinstance(ignore_suffix, str):
341+
ignore_suffix_acc.add(ignore_suffix)
342+
elif isinstance(ignore_suffix, list):
343+
ignore_suffix_acc.update(ignore_suffix)
344+
345+
result_iterator = _list_objects(
318346
path=path,
319347
delimiter=None,
320348
suffix=suffix,
321-
ignore_suffix=ignore_suffix,
349+
ignore_suffix=list(ignore_suffix_acc),
322350
boto3_session=boto3_session,
323351
last_modified_begin=last_modified_begin,
324352
last_modified_end=last_modified_end,
325353
ignore_empty=ignore_empty,
326354
s3_additional_kwargs=s3_additional_kwargs,
327355
)
328-
return [p for p in paths if not p.endswith("/")]
356+
if chunked:
357+
return result_iterator
358+
return [path for paths in result_iterator for path in paths]

awswrangler/timestream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ def query(
251251
252252
Returns
253253
-------
254-
pd.DataFrame
254+
Union[pd.DataFrame, Iterator[pd.DataFrame]]
255255
Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
256256
257257
Examples

tests/test_s3.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -323,15 +323,19 @@ def test_prefix_cleanup():
323323
assert wr.s3._list._prefix_cleanup(glob.escape("foo[]boo")) == glob.escape("foo")
324324

325325

326-
def test_prefix_list(path):
326+
@pytest.mark.parametrize(
327+
"s3_additional_kwargs",
328+
[None, {"FetchOwner": True}, {"PaginationConfig": {"PageSize": 100}}],
329+
)
330+
def test_prefix_list(path, s3_additional_kwargs):
327331
df = pd.DataFrame({"c0": [0]})
328332
prefixes = ["foo1boo", "foo2boo", "foo3boo", "foo10boo", "foo*boo", "abc1boo", "foo1abc"]
329333
paths = [path + p for p in prefixes]
330334
for p in paths:
331335
wr.s3.to_parquet(df=df, path=p)
332-
assert len(wr.s3.list_objects(path + "*")) == 7
333-
assert len(wr.s3.list_objects(path + "foo*")) == 6
334-
assert len(wr.s3.list_objects(path + "*boo")) == 6
335-
assert len(wr.s3.list_objects(path + "foo?boo")) == 4
336-
assert len(wr.s3.list_objects(path + "foo*boo")) == 5
337-
assert len(wr.s3.list_objects(path + "foo[12]boo")) == 2
336+
assert len(wr.s3.list_objects(path + "*", s3_additional_kwargs=s3_additional_kwargs)) == 7
337+
assert len(wr.s3.list_objects(path + "foo*", s3_additional_kwargs=s3_additional_kwargs)) == 6
338+
assert len(wr.s3.list_objects(path + "*boo", s3_additional_kwargs=s3_additional_kwargs)) == 6
339+
assert len(wr.s3.list_objects(path + "foo?boo", s3_additional_kwargs=s3_additional_kwargs)) == 4
340+
assert len(wr.s3.list_objects(path + "foo*boo", s3_additional_kwargs=s3_additional_kwargs)) == 5
341+
assert len(wr.s3.list_objects(path + "foo[12]boo", s3_additional_kwargs=s3_additional_kwargs)) == 2

0 commit comments

Comments
 (0)