Skip to content

Commit b14c199

Browse files
authored
Revert "Distribute describe_objects S3 method (#1744)" (#1745)
This reverts commit 4fd0914.
1 parent 0a3e4d5 commit b14c199

File tree

3 files changed

+74
-20
lines changed

3 files changed

+74
-20
lines changed

awswrangler/distributed/ray/_register.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from awswrangler.distributed.ray import ray_remote
77
from awswrangler.lakeformation._read import _get_work_unit_results
88
from awswrangler.s3._delete import _delete_objects
9-
from awswrangler.s3._describe import _describe_object
109
from awswrangler.s3._read_parquet import _read_parquet, _read_parquet_metadata_file
1110
from awswrangler.s3._read_text import _read_text
1211
from awswrangler.s3._select import _select_object_content, _select_query
@@ -21,7 +20,6 @@ def register_ray() -> None:
2120
"""Register dispatched Ray and Modin (on Ray) methods."""
2221
for func in [
2322
_get_work_unit_results,
24-
_describe_object,
2523
_delete_objects,
2624
_read_parquet_metadata_file,
2725
_select_query,

awswrangler/s3/_describe.py

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Amazon S3 Describe Module (INTERNAL)."""
22

3+
import concurrent.futures
34
import datetime
45
import itertools
56
import logging
@@ -8,19 +9,15 @@
89
import boto3
910

1011
from awswrangler import _utils
11-
from awswrangler._distributed import engine
12-
from awswrangler._threading import _get_executor
13-
from awswrangler.distributed.ray import ray_get
1412
from awswrangler.s3 import _fs
1513
from awswrangler.s3._list import _path2list
1614

1715
_logger: logging.Logger = logging.getLogger(__name__)
1816

1917

20-
@engine.dispatch_on_engine
2118
def _describe_object(
22-
boto3_session: boto3.Session,
2319
path: str,
20+
boto3_session: boto3.Session,
2421
s3_additional_kwargs: Optional[Dict[str, Any]],
2522
version_id: Optional[str] = None,
2623
) -> Tuple[str, Dict[str, Any]]:
@@ -43,6 +40,18 @@ def _describe_object(
4340
return path, desc
4441

4542

43+
def _describe_object_concurrent(
44+
path: str,
45+
boto3_primitives: _utils.Boto3PrimitivesType,
46+
s3_additional_kwargs: Optional[Dict[str, Any]],
47+
version_id: Optional[str] = None,
48+
) -> Tuple[str, Dict[str, Any]]:
49+
boto3_session = _utils.boto3_from_primitives(primitives=boto3_primitives)
50+
return _describe_object(
51+
path=path, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs, version_id=version_id
52+
)
53+
54+
4655
def describe_objects(
4756
path: Union[str, List[str]],
4857
version_id: Optional[Union[str, Dict[str, str]]] = None,
@@ -118,22 +127,41 @@ def describe_objects(
118127
last_modified_end=last_modified_end,
119128
s3_additional_kwargs=s3_additional_kwargs,
120129
)
121-
122130
if len(paths) < 1:
123131
return {}
124132
resp_list: List[Tuple[str, Dict[str, Any]]]
125-
126-
executor = _get_executor(use_threads=use_threads)
127-
resp_list = ray_get(
128-
executor.map(
129-
_describe_object,
130-
boto3_session,
131-
paths,
132-
itertools.repeat(s3_additional_kwargs),
133-
[version_id.get(p) if isinstance(version_id, dict) else version_id for p in paths],
134-
)
135-
)
136-
133+
if len(paths) == 1:
134+
resp_list = [
135+
_describe_object(
136+
path=paths[0],
137+
version_id=version_id.get(paths[0]) if isinstance(version_id, dict) else version_id,
138+
boto3_session=boto3_session,
139+
s3_additional_kwargs=s3_additional_kwargs,
140+
)
141+
]
142+
elif use_threads is False:
143+
resp_list = [
144+
_describe_object(
145+
path=p,
146+
version_id=version_id.get(p) if isinstance(version_id, dict) else version_id,
147+
boto3_session=boto3_session,
148+
s3_additional_kwargs=s3_additional_kwargs,
149+
)
150+
for p in paths
151+
]
152+
else:
153+
cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
154+
versions = [version_id.get(p) if isinstance(version_id, dict) else version_id for p in paths]
155+
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
156+
resp_list = list(
157+
executor.map(
158+
_describe_object_concurrent,
159+
paths,
160+
versions,
161+
itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
162+
itertools.repeat(s3_additional_kwargs),
163+
)
164+
)
137165
desc_dict: Dict[str, Dict[str, Any]] = dict(resp_list)
138166
return desc_dict
139167

awswrangler/s3/_read.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""Amazon S3 Read Module (PRIVATE)."""
22

3+
import concurrent.futures
34
import logging
5+
from functools import partial
46
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, cast
57

68
import numpy as np
@@ -9,6 +11,7 @@
911

1012
from awswrangler import exceptions
1113
from awswrangler._arrow import _extract_partitions_from_path
14+
from awswrangler._utils import boto3_to_primitives, ensure_cpu_count
1215
from awswrangler.s3._list import _prefix_cleanup
1316

1417
_logger: logging.Logger = logging.getLogger(__name__)
@@ -107,3 +110,28 @@ def _union(dfs: List[pd.DataFrame], ignore_index: Optional[bool]) -> pd.DataFram
107110
for df in dfs:
108111
df[col] = pd.Categorical(df[col].values, categories=cat.categories)
109112
return pd.concat(objs=dfs, sort=False, copy=False, ignore_index=ignore_index)
113+
114+
115+
def _read_dfs_from_multiple_paths(
116+
read_func: Callable[..., pd.DataFrame],
117+
paths: List[str],
118+
version_ids: Optional[Dict[str, str]],
119+
use_threads: Union[bool, int],
120+
kwargs: Dict[str, Any],
121+
) -> List[pd.DataFrame]:
122+
cpus = ensure_cpu_count(use_threads)
123+
if cpus < 2:
124+
return [
125+
read_func(
126+
path,
127+
version_id=version_ids.get(path) if version_ids else None,
128+
**kwargs,
129+
)
130+
for path in paths
131+
]
132+
133+
with concurrent.futures.ThreadPoolExecutor(max_workers=ensure_cpu_count(use_threads)) as executor:
134+
kwargs["boto3_session"] = boto3_to_primitives(kwargs["boto3_session"])
135+
partial_read_func = partial(read_func, **kwargs)
136+
versions = [version_ids.get(p) if isinstance(version_ids, dict) else None for p in paths]
137+
return list(df for df in executor.map(partial_read_func, paths, versions))

0 commit comments

Comments
 (0)