
Commit 2fb23f6

Remove unnecessary schema inference. #524

1 parent 4e255d4 · commit 2fb23f6

File tree

2 files changed (+39, -12 lines changed)


awswrangler/s3/_read_parquet.py

Lines changed: 13 additions & 12 deletions
@@ -301,18 +301,19 @@ def _read_parquet_chunked(
             )
             if pq_file is None:
                 continue
-            schema: Dict[str, str] = _data_types.athena_types_from_pyarrow_schema(
-                schema=pq_file.schema.to_arrow_schema(), partitions=None
-            )[0]
-            if validate_schema is True and last_schema is not None:
-                if schema != last_schema:
-                    raise exceptions.InvalidSchemaConvergence(
-                        f"Was detect at least 2 different schemas:\n"
-                        f"    - {last_path} -> {last_schema}\n"
-                        f"    - {path} -> {schema}"
-                    )
-            last_schema = schema
-            last_path = path
+            if validate_schema is True:
+                schema: Dict[str, str] = _data_types.athena_types_from_pyarrow_schema(
+                    schema=pq_file.schema.to_arrow_schema(), partitions=None
+                )[0]
+                if last_schema is not None:
+                    if schema != last_schema:
+                        raise exceptions.InvalidSchemaConvergence(
+                            f"Was detect at least 2 different schemas:\n"
+                            f"    - {last_path} -> {last_schema}\n"
+                            f"    - {path} -> {schema}"
+                        )
+                last_schema = schema
+                last_path = path
             num_row_groups: int = pq_file.num_row_groups
             _logger.debug("num_row_groups: %s", num_row_groups)
             for i in range(num_row_groups):
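
In effect, chunked reads now run the Athena type inference only when the caller opts in with validate_schema=True; with the default of False, files stream through without the per-file schema comparison. A minimal usage sketch, assuming a hypothetical S3 prefix and using only the read_parquet parameters exercised by the tests below:

import awswrangler as wr

# Default (validate_schema=False): no per-file Athena type inference,
# so chunks stream even from files with un-inferable columns.
for chunk in wr.s3.read_parquet("s3://bucket/prefix/", dataset=True, chunked=True):
    print(chunk.shape)

# Opting in restores the per-file check: mismatched files raise
# InvalidSchemaConvergence, and columns with no inferable Athena type
# (e.g. all-NULL) raise UndetectedType.
for chunk in wr.s3.read_parquet(
    "s3://bucket/prefix/", dataset=True, chunked=True, validate_schema=True
):
    print(chunk.shape)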

tests/test_s3_parquet.py

Lines changed: 26 additions & 0 deletions
@@ -428,3 +428,29 @@ def test_empty_file(path, use_threads):
     df2 = wr.s3.read_parquet(path, dataset=True, use_threads=use_threads)
     df2["par"] = df2["par"].astype("string")
     assert df.equals(df2)
+
+
+def test_read_chunked(path):
+    path = f"{path}file.parquet"
+    df = pd.DataFrame({"c0": [0, 1, 2], "c1": [None, None, None]})
+    wr.s3.to_parquet(df, path)
+    df2 = next(wr.s3.read_parquet(path, chunked=True))
+    assert df.shape == df2.shape
+
+
+def test_read_chunked_validation_exception(path):
+    path = f"{path}file.parquet"
+    df = pd.DataFrame({"c0": [0, 1, 2], "c1": [None, None, None]})
+    wr.s3.to_parquet(df, path)
+    with pytest.raises(wr.exceptions.UndetectedType):
+        next(wr.s3.read_parquet(path, chunked=True, validate_schema=True))
+
+
+def test_read_chunked_validation_exception2(path):
+    df = pd.DataFrame({"c0": [0, 1, 2]})
+    wr.s3.to_parquet(df, f"{path}file0.parquet")
+    df = pd.DataFrame({"c1": [0, 1, 2]})
+    wr.s3.to_parquet(df, f"{path}file1.parquet")
+    with pytest.raises(wr.exceptions.InvalidSchemaConvergence):
+        for _ in wr.s3.read_parquet(path, dataset=True, chunked=True, validate_schema=True):
+            pass
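
A note on why the all-None column in the validation tests above raises UndetectedType: pandas hands it to pyarrow with no type information, so pyarrow infers the null type, which has no Athena equivalent. A standalone sketch of that inference, assuming the local pandas-to-pyarrow conversion mirrors the schema the reader sees in the Parquet footer:

import pandas as pd
import pyarrow as pa

# An all-None column carries no type information, so pyarrow infers "null".
df = pd.DataFrame({"c0": [0, 1, 2], "c1": [None, None, None]})
schema = pa.Table.from_pandas(df).schema
print(schema.field("c0").type)  # int64 -> has an Athena mapping (bigint)
print(schema.field("c1").type)  # null  -> no Athena mapping, hence UndetectedType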
