Skip to content

Commit b0e6e97

Browse files
committed
Removing file and row-group concurrency level.
1 parent ace223f commit b0e6e97

File tree

3 files changed

+4
-86
lines changed

3 files changed

+4
-86
lines changed

awswrangler/s3/_read_concurrent.py

Lines changed: 0 additions & 49 deletions
This file was deleted.

awswrangler/s3/_read_parquet.py

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
_get_path_root,
2929
_union,
3030
)
31-
from awswrangler.s3._read_concurrent import _read_concurrent
3231

3332
_logger: logging.Logger = logging.getLogger(__name__)
3433

@@ -384,41 +383,15 @@ def _read_parquet(
384383
s3_additional_kwargs: Optional[Dict[str, str]],
385384
use_threads: bool,
386385
) -> pd.DataFrame:
387-
if use_threads is False:
388-
table: pa.Table = _read_parquet_file(
386+
return _arrowtable2df(
387+
table=_read_parquet_file(
389388
path=path,
390389
columns=columns,
391390
categories=categories,
392391
boto3_session=boto3_session,
393392
s3_additional_kwargs=s3_additional_kwargs,
394393
use_threads=use_threads,
395-
)
396-
else:
397-
cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
398-
num_row_groups: int = _count_row_groups(
399-
path=path,
400-
categories=categories,
401-
boto3_session=boto3_session,
402-
s3_additional_kwargs=s3_additional_kwargs,
403-
use_threads=use_threads,
404-
)
405-
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
406-
tables: Tuple[pa.Table, ...] = tuple(
407-
executor.map(
408-
_read_parquet_row_group,
409-
range(num_row_groups),
410-
itertools.repeat(path),
411-
itertools.repeat(columns),
412-
itertools.repeat(categories),
413-
itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
414-
itertools.repeat(s3_additional_kwargs),
415-
itertools.repeat(use_threads),
416-
)
417-
)
418-
table = pa.lib.concat_tables(tables, promote=False)
419-
_logger.debug("Converting PyArrow Table to Pandas DataFrame...")
420-
return _arrowtable2df(
421-
table=table,
394+
),
422395
categories=categories,
423396
safe=safe,
424397
use_threads=use_threads,
@@ -604,9 +577,6 @@ def read_parquet(
604577
boto3_session=boto3_session,
605578
s3_additional_kwargs=s3_additional_kwargs,
606579
)
607-
if use_threads is True:
608-
args["use_threads"] = True
609-
return _read_concurrent(func=_read_parquet, paths=paths, ignore_index=None, **args)
610580
return _union(dfs=[_read_parquet(path=p, **args) for p in paths], ignore_index=None)
611581

612582

awswrangler/s3/_read_text.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
_get_path_root,
2121
_union,
2222
)
23-
from awswrangler.s3._read_concurrent import _read_concurrent
2423

2524
_logger: logging.Logger = logging.getLogger(__name__)
2625

@@ -137,8 +136,6 @@ def _read_text(
137136
ret = _read_text_chunked(paths=paths, chunksize=chunksize, **args)
138137
elif len(paths) == 1:
139138
ret = _read_text_file(path=paths[0], **args)
140-
elif use_threads is True:
141-
ret = _read_concurrent(func=_read_text_file, paths=paths, ignore_index=ignore_index, **args)
142139
else:
143140
ret = _union(dfs=[_read_text_file(path=p, **args) for p in paths], ignore_index=ignore_index)
144141
return ret
@@ -361,7 +358,7 @@ def read_fwf(
361358
Reading all fixed-width formatted (FWF) files under a prefix
362359
363360
>>> import awswrangler as wr
364-
>>> df = wr.s3.read_fwf(path='s3://bucket/prefix/', widths=[1, 3], names=['c0', 'c1])
361+
>>> df = wr.s3.read_fwf(path='s3://bucket/prefix/', widths=[1, 3], names=['c0', 'c1'])
365362
366363
Reading all fixed-width formatted (FWF) files from a list
367364

0 commit comments

Comments (0)