@@ -462,6 +462,7 @@ def _read_parquet_file(
     boto3_session: boto3.Session,
     s3_additional_kwargs: Optional[Dict[str, str]],
     use_threads: Union[bool, int],
+    validate_schema: Optional[bool],
     version_id: Optional[str] = None,
     pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> pa.Table:
@@ -481,6 +482,12 @@ def _read_parquet_file(
             read_dictionary=categories,
             coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"],
         )
+        if validate_schema and pq_file and columns:
+            pq_file_columns: List[str] = pq_file.schema.names
+            for column in columns:
+                if column not in pq_file_columns:
+                    raise exceptions.InvalidArgument(f"column: {column} does not exist")
+
         if pq_file is None:
             raise exceptions.InvalidFile(f"Invalid Parquet file: {path}")
         return pq_file.read(columns=columns, use_threads=False, use_pandas_metadata=False)
@@ -521,6 +528,7 @@ def _read_parquet(
     map_types: bool,
     boto3_session: Union[boto3.Session, _utils.Boto3PrimitivesType],
     dataset: bool,
+    validate_schema: Optional[bool],
     path_root: Optional[str],
     s3_additional_kwargs: Optional[Dict[str, str]],
     use_threads: Union[bool, int],
@@ -537,6 +545,7 @@ def _read_parquet(
537545 s3_additional_kwargs = s3_additional_kwargs ,
538546 use_threads = use_threads ,
539547 version_id = version_id ,
548+ validate_schema = validate_schema ,
540549 pyarrow_additional_kwargs = pyarrow_args ,
541550 ),
542551 categories = categories ,
@@ -750,6 +759,7 @@ def read_parquet(
         "boto3_session": session,
         "dataset": dataset,
         "path_root": path_root,
+        "validate_schema": validate_schema,
         "s3_additional_kwargs": s3_additional_kwargs,
         "use_threads": use_threads,
         "pyarrow_additional_kwargs": pyarrow_additional_kwargs,
@@ -759,14 +769,15 @@ def read_parquet(
         return _read_parquet_chunked(
             paths=paths,
             chunked=chunked,
-            validate_schema=validate_schema,
             ignore_index=ignore_index,
             version_ids=versions,
             **args,
         )
     if len(paths) == 1:
         return _read_parquet(
-            path=paths[0], version_id=versions[paths[0]] if isinstance(versions, dict) else None, **args
+            path=paths[0],
+            version_id=versions[paths[0]] if isinstance(versions, dict) else None,
+            **args,
         )
     if validate_schema is True:
         _validate_schemas_from_files(
@@ -779,7 +790,11 @@ def read_parquet(
         )
     return _union(
         dfs=_read_dfs_from_multiple_paths(
-            read_func=_read_parquet, paths=paths, version_ids=versions, use_threads=use_threads, kwargs=args
+            read_func=_read_parquet,
+            paths=paths,
+            version_ids=versions,
+            use_threads=use_threads,
+            kwargs=args,
         ),
         ignore_index=ignore_index,
     )
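For context, a minimal usage sketch of what this change enables (the S3 path and column names below are hypothetical): with `validate_schema=True`, the requested `columns` are now checked against each Parquet file's schema (`pq_file.schema.names`) before reading, so a missing column fails fast with a clear `exceptions.InvalidArgument` error.

```python
import awswrangler as wr
from awswrangler import exceptions

# Hypothetical S3 path and column names, for illustration only.
try:
    df = wr.s3.read_parquet(
        path="s3://my-bucket/my-dataset/",
        columns=["id", "value", "no_such_column"],  # last column absent from the files
        validate_schema=True,  # check requested columns against each file's schema
    )
except exceptions.InvalidArgument as err:
    print(err)  # -> "column: no_such_column does not exist"
```

Note the design choice in the last two hunks: instead of passing `validate_schema` explicitly to `_read_parquet_chunked`, the flag now lives in the shared `args` dict, which is how it also reaches the single-path and multi-path `_read_parquet` branches.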