 from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, Union, overload
 
 import pyspark
-from pyspark.errors import PySparkValueError
+from pyspark.errors import PySparkTypeError, PySparkValueError
 from pyspark.sql.pandas.types import (
-    _dedup_names,
-    _deduplicate_field_names,
+    _create_converter_from_pandas,
     _create_converter_to_pandas,
-    to_arrow_schema,
+    _deduplicate_field_names,
+    _dedup_names,
     from_arrow_schema,
+    to_arrow_schema,
+    to_arrow_type,
 )
 from pyspark.sql.pandas.utils import require_minimum_pyarrow_version
 from pyspark.sql.types import (
@@ -228,7 +230,6 @@ def convert(
         assign_cols_by_name: bool = False,
         int_to_decimal_coercion_enabled: bool = False,
         ignore_unexpected_complex_type_values: bool = False,
-        is_udtf: bool = False,
     ) -> "pa.RecordBatch":
         """
         Convert a pandas DataFrame or list of Series/DataFrames to an Arrow RecordBatch.
@@ -255,14 +256,6 @@ def convert(
             Whether to enable int to decimal coercion (default False)
         ignore_unexpected_complex_type_values : bool
             Whether to ignore unexpected complex type values in converter (default False)
-        is_udtf : bool
-            Whether this conversion is for a UDTF. UDTFs use broader Arrow exception
-            handling to allow more type coercions (e.g., struct field casting via
-            ArrowTypeError), and convert errors to UDTF_ARROW_TYPE_CAST_ERROR.
-            # TODO(SPARK-55502): Unify UDTF and regular UDF conversion paths to
-            # eliminate the is_udtf flag.
-            Regular UDFs only catch ArrowInvalid to preserve legacy behavior where
-            e.g. string→decimal must raise an error. (default False)
 
         Returns
         -------
@@ -271,9 +264,6 @@ def convert(
         import pyarrow as pa
         import pandas as pd
 
-        from pyspark.errors import PySparkTypeError, PySparkValueError, PySparkRuntimeError
-        from pyspark.sql.pandas.types import to_arrow_type, _create_converter_from_pandas
-
         # Handle empty schema (0 columns)
         # Use dummy column + select([]) to preserve row count (PyArrow limitation workaround)
         if len(schema.fields) == 0:
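
As an aside, the dummy-column trick the comment refers to can be reproduced in plain PyArrow. A minimal standalone sketch, not the PR's code, assuming a PyArrow recent enough that RecordBatch.select exists:

import pyarrow as pa

num_rows = 3  # row count we want to keep despite having zero columns

# from_arrays([], []) cannot carry a row count, so build a batch with one
# throwaway null column of the right length...
dummy = pa.RecordBatch.from_arrays([pa.nulls(num_rows)], ["_dummy"])

# ...then project away every column; num_rows survives select([]).
empty = dummy.select([])
assert empty.num_columns == 0 and empty.num_rows == num_rows
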
@@ -318,7 +308,6 @@ def convert_column(
                 assign_cols_by_name=assign_cols_by_name,
                 int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
                 ignore_unexpected_complex_type_values=ignore_unexpected_complex_type_values,
-                is_udtf=is_udtf,
             )
             # Wrap the nested RecordBatch as a single StructArray column
             return ArrowBatchTransformer.wrap_struct(nested_batch).column(0)
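
ArrowBatchTransformer.wrap_struct is internal to Spark; the sketch below only illustrates the effect the comment describes (collapsing a whole RecordBatch into a single struct column) in plain PyArrow, and the helper name here is hypothetical:

import pyarrow as pa

def wrap_struct_sketch(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Zip the batch's columns into one StructArray whose field names
    # mirror the batch schema, then rewrap it as a one-column batch.
    struct = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names)
    return pa.RecordBatch.from_arrays([struct], ["_struct"])

batch = pa.RecordBatch.from_arrays([pa.array([1, 2]), pa.array(["a", "b"])], ["x", "y"])
column = wrap_struct_sketch(batch).column(0)  # StructArray with fields x and y
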
@@ -343,60 +332,38 @@ def convert_column(
 
             mask = None if hasattr(series.array, "__arrow_array__") else series.isnull()
 
-            if is_udtf:
-                # UDTF path: broad ArrowException catch so that both ArrowInvalid
-                # AND ArrowTypeError (e.g. dict→struct) trigger the cast fallback.
+            # Unified conversion path: broad ArrowException catch so that both ArrowInvalid
+            # AND ArrowTypeError (e.g. dict→struct) trigger the cast fallback.
+            try:
                 try:
-                    try:
-                        return pa.Array.from_pandas(
-                            series, mask=mask, type=arrow_type, safe=safecheck
-                        )
-                    except pa.lib.ArrowException:  # broad: includes ArrowTypeError
-                        if arrow_cast:
-                            return pa.Array.from_pandas(series, mask=mask).cast(
-                                target_type=arrow_type, safe=safecheck
-                            )
-                        raise
-                except pa.lib.ArrowException:  # convert any Arrow error to user-friendly message
-                    raise PySparkRuntimeError(
-                        errorClass="UDTF_ARROW_TYPE_CAST_ERROR",
-                        messageParameters={
-                            "col_name": field_name,
-                            "col_type": str(series.dtype),
-                            "arrow_type": str(arrow_type),
-                        },
-                    ) from None
-            else:
-                # UDF path: only ArrowInvalid triggers the cast fallback.
-                # ArrowTypeError (e.g. string→decimal) must NOT be silently cast.
-                try:
-                    try:
-                        return pa.Array.from_pandas(
-                            series, mask=mask, type=arrow_type, safe=safecheck
-                        )
-                    except pa.lib.ArrowInvalid:  # narrow: skip ArrowTypeError
-                        if arrow_cast:
-                            return pa.Array.from_pandas(series, mask=mask).cast(
-                                target_type=arrow_type, safe=safecheck
-                            )
-                        raise
-                except TypeError as e:  # includes pa.lib.ArrowTypeError
-                    raise PySparkTypeError(
-                        f"Exception thrown when converting pandas.Series ({series.dtype}) "
-                        f"with name '{field_name}' to Arrow Array ({arrow_type})."
-                    ) from e
-                except ValueError as e:  # includes pa.lib.ArrowInvalid
-                    error_msg = (
-                        f"Exception thrown when converting pandas.Series ({series.dtype}) "
-                        f"with name '{field_name}' to Arrow Array ({arrow_type})."
+                    return pa.Array.from_pandas(
+                        series, mask=mask, type=arrow_type, safe=safecheck
                     )
-                    if safecheck:
-                        error_msg += (
-                            " It can be caused by overflows or other unsafe conversions "
-                            "warned by Arrow. Arrow safe type check can be disabled by using "
-                            "SQL config `spark.sql.execution.pandas.convertToArrowArraySafely`."
+                except pa.lib.ArrowException:  # broad: includes ArrowTypeError and ArrowInvalid
+                    if arrow_cast:
+                        return pa.Array.from_pandas(series, mask=mask).cast(
+                            target_type=arrow_type, safe=safecheck
                         )
-                    raise PySparkValueError(error_msg) from e
+                    raise
+            except (TypeError, pa.lib.ArrowTypeError) as e:
+                # ArrowTypeError is a subclass of TypeError
+                raise PySparkTypeError(
+                    f"Exception thrown when converting pandas.Series ({series.dtype}) "
+                    f"with name '{field_name}' to Arrow Array ({arrow_type})."
+                ) from e
+            except (ValueError, pa.lib.ArrowInvalid) as e:
+                # ArrowInvalid is a subclass of ValueError
+                error_msg = (
+                    f"Exception thrown when converting pandas.Series ({series.dtype}) "
+                    f"with name '{field_name}' to Arrow Array ({arrow_type})."
+                )
+                if safecheck:
+                    error_msg += (
+                        " It can be caused by overflows or other unsafe conversions "
+                        "warned by Arrow. Arrow safe type check can be disabled by using "
+                        "SQL config `spark.sql.execution.pandas.convertToArrowArraySafely`."
+                    )
+                raise PySparkValueError(error_msg) from e
 
         arrays = [convert_column(col, field) for col, field in zip(columns, schema.fields)]
         return pa.RecordBatch.from_arrays(arrays, schema.names)
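
Two facts make the unified path above work: the Arrow exceptions inherit from the builtins that the new except clauses name, and the arrow_cast fallback can succeed where a direct typed from_pandas fails. A standalone sketch follows; the string→decimal case is the one the removed docstring called out, and whether the first attempt raises can vary by PyArrow version, but the control flow is the same:

import pandas as pd
import pyarrow as pa

# The subclass relationships the new except clauses rely on.
assert issubclass(pa.lib.ArrowTypeError, TypeError)
assert issubclass(pa.lib.ArrowInvalid, ValueError)

series = pd.Series(["1.10", "2.20"])
target = pa.decimal128(10, 2)

try:
    # Direct conversion with an explicit target type, as in the happy path.
    arr = pa.Array.from_pandas(series, type=target, safe=True)
except pa.lib.ArrowException:
    # The arrow_cast fallback: infer the Arrow type first, then cast.
    arr = pa.Array.from_pandas(series).cast(target_type=target, safe=True)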