
Commit 25e1780

Read parquet table chunked (#631)
* WIP: read pq table chunked
* Fix pandas SettingWithCopyWarning
* Update checking for iterator
* Linting

Co-authored-by: jaidisido <[email protected]>
1 parent 6b93548 commit 25e1780
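In user terms, this commit lets read_parquet_table stream a Glue-cataloged Parquet table in bounded pieces instead of materializing it all at once. A usage sketch, assuming the usual `wr.s3` public alias; the database/table names are placeholders:

import awswrangler as wr

# chunked=True yields memory-friendly DataFrames whose size depends on the
# underlying files; an integer yields DataFrames of at most that many rows.
for chunk in wr.s3.read_parquet_table(database="my_db", table="my_table", chunked=100_000):
    print(len(chunk))  # process each bounded chunk as it arrives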

1 file changed: +27 −20 lines

awswrangler/s3/_read_parquet.py

Lines changed: 27 additions & 20 deletions
@@ -2,6 +2,7 @@
 
 import concurrent.futures
 import datetime
+import functools
 import itertools
 import json
 import logging
@@ -339,8 +340,8 @@ def _read_parquet_chunked(
             if next_slice is not None:
                 df = _union(dfs=[next_slice, df], ignore_index=ignore_index)
             while len(df.index) >= chunked:
-                yield df.iloc[:chunked]
-                df = df.iloc[chunked:]
+                yield df.iloc[:chunked, :].copy()
+                df = df.iloc[chunked:, :]
             if df.empty:
                 next_slice = None
             else:
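Context for the .copy() change above: yielding a raw .iloc slice can trigger pandas' SettingWithCopyWarning later, when the consumer mutates the chunk, because pandas may treat the slice as a view of the parent frame. A minimal standalone sketch of the behavior (not code from this repo):

import pandas as pd

df = pd.DataFrame({"a": range(10), "b": range(10)})

chunk = df.iloc[:5]            # pandas may flag this as a view of df
chunk["a"] = 0                 # can emit SettingWithCopyWarning

chunk = df.iloc[:5, :].copy()  # explicit copy -> independent frame
chunk["a"] = 0                 # no warning; parent df is untouched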
@@ -773,26 +774,32 @@ def read_parquet_table(
         path: str = res["Table"]["StorageDescriptor"]["Location"]
     except KeyError as ex:
         raise exceptions.InvalidTable(f"Missing s3 location for {database}.{table}.") from ex
-    return _data_types.cast_pandas_with_athena_types(
-        df=read_parquet(
-            path=path,
-            path_suffix=filename_suffix,
-            path_ignore_suffix=filename_ignore_suffix,
-            partition_filter=partition_filter,
-            columns=columns,
-            validate_schema=validate_schema,
-            categories=categories,
-            safe=safe,
-            map_types=map_types,
-            chunked=chunked,
-            dataset=True,
-            use_threads=use_threads,
-            boto3_session=boto3_session,
-            s3_additional_kwargs=s3_additional_kwargs,
-        ),
-        dtype=_extract_partitions_dtypes_from_table_details(response=res),
+    df = read_parquet(
+        path=path,
+        path_suffix=filename_suffix,
+        path_ignore_suffix=filename_ignore_suffix,
+        partition_filter=partition_filter,
+        columns=columns,
+        validate_schema=validate_schema,
+        categories=categories,
+        safe=safe,
+        map_types=map_types,
+        chunked=chunked,
+        dataset=True,
+        use_threads=use_threads,
+        boto3_session=boto3_session,
+        s3_additional_kwargs=s3_additional_kwargs,
+    )
+    partial_cast_function = functools.partial(
+        _data_types.cast_pandas_with_athena_types, dtype=_extract_partitions_dtypes_from_table_details(response=res)
     )
 
+    if isinstance(df, pd.DataFrame):
+        return partial_cast_function(df)
+
+    # df is a generator, so map is needed for casting dtypes
+    return map(partial_cast_function, df)
+
 
 @apply_configs
 def read_parquet_metadata(
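The new control flow works because read_parquet returns either a single DataFrame or a generator of DataFrames depending on `chunked`; functools.partial pins the dtype argument so the same cast can be applied eagerly to one frame or lazily, via map, to each chunk. A standalone sketch of the pattern (cast_fn, fake_read, and the dtypes are placeholders, not this module's helpers):

import functools
from typing import Iterator, Union

import pandas as pd

def cast_fn(df: pd.DataFrame, dtype: dict) -> pd.DataFrame:
    # Stand-in for _data_types.cast_pandas_with_athena_types.
    return df.astype(dtype)

def fake_read(chunked: bool) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
    frames = (pd.DataFrame({"x": ["1", "2"]}) for _ in range(3))
    return frames if chunked else pd.concat(frames, ignore_index=True)

partial_cast = functools.partial(cast_fn, dtype={"x": "int64"})

df = fake_read(chunked=False)
print(partial_cast(df).dtypes["x"])       # eager: one cast, one frame

for chunk in map(partial_cast, fake_read(chunked=True)):
    print(chunk.dtypes["x"])              # lazy: each chunk cast as it is consumed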
