Commit 0e068fe

Parquet chunksize now paginating on Pandas instead of PyArrow
1 parent ad22aea

File tree

1 file changed: awswrangler/s3.py (+23 -15 lines)
@@ -1132,7 +1132,7 @@ def _to_parquet_dataset(
     schema: pa.Schema = _data_types.pyarrow_schema_from_pandas(
         df=df, index=index, ignore_cols=partition_cols, dtype=dtype
     )
-    _logger.debug("schema: %s", schema)
+    _logger.debug("schema: \n%s", schema)
     if not partition_cols:
         file_path: str = f"{path}{uuid.uuid4().hex}{compression_ext}.parquet"
         _to_parquet_file(
@@ -1733,24 +1733,32 @@ def _read_parquet_chunked(
     use_threads: bool = True,
 ) -> Iterator[pd.DataFrame]:
     promote: bool = not validate_schema
-    next_slice: Optional[pa.Table] = None
+    next_slice: Optional[pd.DataFrame] = None
     for piece in data.pieces:
-        table: pa.Table = piece.read(
-            columns=columns, use_threads=use_threads, partitions=data.partitions, use_pandas_metadata=False
+        df: pd.DataFrame = _table2df(
+            table=piece.read(
+                columns=columns,
+                use_threads=use_threads,
+                partitions=data.partitions,
+                use_pandas_metadata=False
+            ),
+            categories=categories,
+            use_threads=use_threads
         )
         if chunked is True:
-            yield _table2df(table=table, categories=categories, use_threads=use_threads)
+            yield df
         else:
-            if next_slice:
-                table = pa.lib.concat_tables([next_slice, table], promote=promote)
-            while len(table) >= chunked:
-                yield _table2df(
-                    table=table.slice(offset=0, length=chunked), categories=categories, use_threads=use_threads
-                )
-                table = table.slice(offset=chunked, length=None)
-            next_slice = table
-    if next_slice:
-        yield _table2df(table=next_slice, categories=categories, use_threads=use_threads)
+            if next_slice is not None:
+                df = pd.concat(objs=[next_slice, df], ignore_index=True, sort=False)
+            while len(df.index) >= chunked:
+                yield df.iloc[:chunked]
+                df = df.iloc[chunked:]
+            if df.empty:
+                next_slice = None
+            else:
+                next_slice = df
+    if next_slice is not None:
+        yield next_slice


 def _table2df(table: pa.Table, categories: List[str] = None, use_threads: bool = True) -> pd.DataFrame:
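
The substantive change is the pagination pattern itself: leftover rows are now carried between Parquet pieces as a pandas DataFrame and sliced with iloc, instead of slicing a pa.Table and merging with the private pa.lib.concat_tables call. A minimal standalone sketch of the new pattern, with made-up frames standing in for the converted pieces:

from typing import Iterator, List, Optional

import pandas as pd


def paginate(frames: List[pd.DataFrame], chunked: int) -> Iterator[pd.DataFrame]:
    """Yield pages of exactly `chunked` rows, carrying leftovers across inputs."""
    next_slice: Optional[pd.DataFrame] = None
    for df in frames:
        if next_slice is not None:
            # Prepend the leftover rows from the previous piece.
            df = pd.concat(objs=[next_slice, df], ignore_index=True, sort=False)
        while len(df.index) >= chunked:
            yield df.iloc[:chunked]
            df = df.iloc[chunked:]
        next_slice = None if df.empty else df
    if next_slice is not None:
        # Flush whatever is left (fewer than `chunked` rows).
        yield next_slice


pieces = [pd.DataFrame({"x": range(5)}), pd.DataFrame({"x": range(5, 12)})]
print([len(c) for c in paginate(pieces, chunked=4)])  # [4, 4, 4] -> 12 rows total

Note that promote: bool = not validate_schema survives the hunk as context but is no longer referenced: pd.concat with sort=False unions mismatched columns on its own, so the promote flag of concat_tables is not needed.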
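
_table2df now runs once per piece, before pagination, rather than once per yielded chunk. Its body sits outside this hunk; a hedged sketch of what a converter with the signature shown above might do, assuming it simply delegates to pyarrow's Table.to_pandas (categories and use_threads are real to_pandas parameters):

from typing import List, Optional

import pandas as pd
import pyarrow as pa


def _table2df(
    table: pa.Table, categories: Optional[List[str]] = None, use_threads: bool = True
) -> pd.DataFrame:
    # Hypothetical body: convert the Arrow Table to pandas, turning the
    # requested columns into Categoricals and parallelizing the conversion.
    return table.to_pandas(categories=categories, use_threads=use_threads)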
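
For context, this private helper backs the chunked mode of the library's public Parquet reader. A usage sketch, assuming the awswrangler.s3.read_parquet API of this era, where an integer chunked requests fixed-size pages (the bucket path and process handler are placeholders):

import awswrangler as wr

# chunked=True yields one DataFrame per Parquet piece; an integer value
# yields pages of exactly that many rows (the final page may be smaller).
for df in wr.s3.read_parquet(path="s3://my-bucket/my-prefix/", chunked=1_000_000):
    process(df)  # placeholder per-chunk handler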
