Commit 12d4ba1

Merge pull request #168 from jewelltp/fix_chunk_issue
added validate_schema parameter to read_parquet
2 parents 66a1be0 + 59fc0d3

2 files changed: +47 additions, -4 deletions

awswrangler/s3.py

Lines changed: 20 additions & 4 deletions
@@ -1196,6 +1196,7 @@ def _read_parquet_init(
     path: Union[str, List[str]],
     filters: Optional[Union[List[Tuple], List[List[Tuple]]]] = None,
     categories: List[str] = None,
+    validate_schema: bool = True,
     dataset: bool = False,
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
@@ -1212,7 +1213,12 @@ def _read_parquet_init(
     fs: s3fs.S3FileSystem = _utils.get_fs(session=boto3_session, s3_additional_kwargs=s3_additional_kwargs)
     cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
     data: pyarrow.parquet.ParquetDataset = pyarrow.parquet.ParquetDataset(
-        path_or_paths=path_or_paths, filesystem=fs, metadata_nthreads=cpus, filters=filters, read_dictionary=categories
+        path_or_paths=path_or_paths,
+        filesystem=fs,
+        metadata_nthreads=cpus,
+        filters=filters,
+        read_dictionary=categories,
+        validate_schema=validate_schema,
     )
     return data

@@ -1221,6 +1227,7 @@ def read_parquet(
     path: Union[str, List[str]],
     filters: Optional[Union[List[Tuple], List[List[Tuple]]]] = None,
     columns: Optional[List[str]] = None,
+    validate_schema: bool = True,
     chunked: bool = False,
     dataset: bool = False,
     categories: List[str] = None,
@@ -1244,7 +1251,11 @@ def read_parquet(
     filters: Union[List[Tuple], List[List[Tuple]]], optional
         List of filters to apply, like ``[[('x', '=', 0), ...], ...]``.
     columns : List[str], optional
-        Names of columns to read from the file(s)
+        Names of columns to read from the file(s).
+    validate_schema:
+        Check that individual file schemas are all the same / compatible. Schemas within a
+        folder prefix should all be the same. Disable if you have schemas that are different
+        and want to disable this check.
     chunked : bool
         If True will break the data in smaller DataFrames (Non deterministic number of lines).
         Otherwise return a single DataFrame with the whole data.
@@ -1306,9 +1317,12 @@ def read_parquet(
         use_threads=use_threads,
         boto3_session=boto3_session,
         s3_additional_kwargs=s3_additional_kwargs,
+        validate_schema=validate_schema,
     )
     if chunked is False:
-        return _read_parquet(data=data, columns=columns, categories=categories, use_threads=use_threads)
+        return _read_parquet(
+            data=data, columns=columns, categories=categories, use_threads=use_threads, validate_schema=validate_schema
+        )
     return _read_parquet_chunked(data=data, columns=columns, categories=categories, use_threads=use_threads)

@@ -1317,14 +1331,16 @@ def _read_parquet(
     columns: Optional[List[str]] = None,
     categories: List[str] = None,
     use_threads: bool = True,
+    validate_schema: bool = True,
 ) -> pd.DataFrame:
     tables: List[pa.Table] = []
     for piece in data.pieces:
         table: pa.Table = piece.read(
             columns=columns, use_threads=use_threads, partitions=data.partitions, use_pandas_metadata=False
         )
         tables.append(table)
-    table = pa.lib.concat_tables(tables)
+    promote: bool = not validate_schema
+    table = pa.lib.concat_tables(tables, promote=promote)
     return table.to_pandas(
         use_threads=use_threads,
         split_blocks=True,
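
The _read_parquet change above leans on pyarrow schema promotion: validate_schema=True keeps the strict ParquetDataset check (mismatched file schemas raise ValueError), while validate_schema=False skips that check and instead passes promote=True to concat_tables, which unifies the schemas and null-fills missing columns. A minimal local sketch of that concat behaviour (no S3 involved), assuming a pyarrow version where concat_tables still accepts the promote keyword:

import pyarrow as pa

# Two tables with different schemas, mimicking two Parquet files under one prefix.
t1 = pa.table({"id": [1, 2, 3]})
t2 = pa.table({"id2": [1, 2, 3], "val": ["foo", "boo", "bar"]})

# promote=False (the validate_schema=True path) would raise because the schemas differ;
# promote=True (the validate_schema=False path) unifies the schemas and fills nulls.
merged = pa.concat_tables([t1, t2], promote=True)
print(merged.num_rows, merged.num_columns)  # 6 rows, 3 columns, matching the test below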

testing/test_awswrangler/test_data_lake.py

Lines changed: 27 additions & 0 deletions
@@ -664,3 +664,30 @@ def test_category(bucket, database):
     ensure_data_types_category(df2)
     wr.s3.delete_objects(path=paths)
     assert wr.catalog.delete_table_if_exists(database=database, table="test_category") is True
+
+
+def test_parquet_validate_schema(bucket, database):
+    path = f"s3://{bucket}/test_parquet_file_validate/"
+    wr.s3.delete_objects(path=path)
+
+    df = pd.DataFrame({"id": [1, 2, 3]})
+    path_file = f"s3://{bucket}/test_parquet_file_validate/0.parquet"
+    wr.s3.to_parquet(df=df, path=path_file)
+    wr.s3.wait_objects_exist(paths=[path_file])
+
+    df2 = pd.DataFrame({"id2": [1, 2, 3], "val": ["foo", "boo", "bar"]})
+    path_file2 = f"s3://{bucket}/test_parquet_file_validate/1.parquet"
+    wr.s3.to_parquet(df=df2, path=path_file2)
+    wr.s3.wait_objects_exist(paths=[path_file2])
+
+    df3 = wr.s3.read_parquet(path=path, validate_schema=False)
+    assert len(df3.index) == 6
+    assert len(df3.columns) == 3
+
+    with pytest.raises(ValueError):
+        wr.s3.read_parquet(path=path, validate_schema=True)
+
+    with pytest.raises(ValueError):
+        wr.s3.store_parquet_metadata(path=path, database=database, table="test_parquet_validate_schema", dataset=True)
+
+    wr.s3.delete_objects(path=path)
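
A hedged usage sketch of the new parameter (the bucket name and prefix below are placeholders; the prefix is assumed to contain Parquet files with differing schemas, as in the test above):

import awswrangler as wr

path = "s3://my-bucket/mixed-schema-prefix/"

# Default: validate_schema=True keeps the strict check and raises ValueError
# when the file schemas under the prefix do not match.
# wr.s3.read_parquet(path=path)

# Opting out reads every file and null-fills columns missing from individual files.
df = wr.s3.read_parquet(path=path, validate_schema=False)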
