[SPARK-55349][PYTHON] Consolidate pandas-to-Arrow conversion utilities in serializers #54125
Conversation
JIRA Issue Information: Umbrella SPARK-55159. (This comment was automatically generated by GitHub Actions.)
timezone=self._timezone,
prefers_large_types=self._prefers_large_types,
)
if spark_type is not None
In what case will spark_type be None?

There is no case now! All inputs are valid Spark types. I initially did not want to make this change in this PR (just wanted to move the method out). I have now updated it.
coerced_array = self._create_array(original_array, field.type)
coerced_arrays.append(coerced_array)
coerced_arrays = [
    ArrowBatchTransformer.cast_array(
Should cast_array be in ArrowBatchTransformer? Is it for batch?

Good catch! It is NOT for batch. I had TODO comments. In the POC, this method will be replaced by the enforce_schema transformer. In this PR I wanted to move this method outside. Next I will make it a transformer.

I moved this out of the transformer.
int_to_decimal_coercion_enabled: bool = False,
prefers_large_types: bool = False,
ignore_unexpected_complex_type_values: bool = False,
error_class: Optional[str] = None,
Where is error_class from?

It is needed for UDTF, which requires a different error message. I added comments here. This can be cleaned up when we move the transform/convert logic out of the serializers; then UDTF will be able to set the error message locally.

I changed this to use an is_udtf flag. The logic for UDTF cast error handling is quite different from other UDFs. See the comments for details. We can align the logic in the future.
Also cc @gaogaotiantian, please help review.

I want to include more changes; turning it into a draft for now.
python/pyspark/sql/conversion.py (outdated)
errorClass=error_class,
messageParameters={
    "expected": str(target_type),
    "actual": str(arr.type),
There's a strong assumption here that the template has "expected" and "actual", which feels a bit weird to me.

That is unfortunately the current behavior on master. I would prefer to keep it the same for this PR; we can definitely improve it later.
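For context, a minimal sketch of how such a cast failure could be surfaced under that assumption. The error class name is a stand-in (it has to exist in PySpark's error registry), and PySparkValueError is borrowed from a later snippet in this thread rather than being the exact class used on master:

```python
import pyarrow as pa
from pyspark.errors import PySparkValueError

def _cast_or_raise(arr: pa.Array, target_type: pa.DataType, error_class: str) -> pa.Array:
    # Try the Arrow-level cast; on failure, surface the mismatch via the
    # "expected"/"actual" parameters assumed by the message template.
    try:
        return arr.cast(target_type)
    except pa.ArrowInvalid as e:
        raise PySparkValueError(
            errorClass=error_class,  # placeholder; must map to a real error entry
            messageParameters={
                "expected": str(target_type),
                "actual": str(arr.type),
            },
        ) from e
```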
    series_tuples: List[Tuple["pd.Series", DataType]] = [packed]
else:
    # multiple UDF results: already iterable of tuples
    series_tuples = list(packed)
We have a lot of random conversions to list - why is that preferred? I think tuple should be used when possible (or keep it as it is if conversion is unnecessary). Immutable objects are always better - including for the input data - we should at least accept either.

The call sites of this method (i.e., the eval type wrappers in worker.py) currently return a list. I agree we can refactor this, but the change would be too large to include in this PR. We can gradually change it when we move this logic out of the serializer.
(Force-pushed 7d30905 to 8549ab6, then 8549ab6 to a10f02f.)
def create_batch(
    packed: Union[
        Tuple["pd.Series", DataType],
        Tuple[Tuple["pd.Series", DataType], ...],
Can we clearly define the input type and avoid the usage of Union here?

The mixture of input types is also a source of confusion.

Addressed: extracted a _normalize_packed helper to normalize the input upfront, as sketched below. Now create_batch always receives a uniform tuple-of-tuples form. Changing all the call sites (the eval type wrappers in worker.py) to always produce uniform output is a larger change that would be out of scope for this PR. We'll extract that logic out to each eval type in the future.
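A rough sketch of what that normalization could look like; the helper name comes from this thread, but the signature and the type check are assumptions:

```python
from typing import Tuple, Union

import pandas as pd
from pyspark.sql.types import DataType

def _normalize_packed(
    packed: Union[
        Tuple["pd.Series", DataType],
        Tuple[Tuple["pd.Series", DataType], ...],
    ]
) -> Tuple[Tuple["pd.Series", DataType], ...]:
    # A single UDF result arrives as a bare (series, data_type) pair;
    # wrap it so create_batch always sees a tuple of pairs.
    if len(packed) == 2 and isinstance(packed[1], DataType):
        return (packed,)
    # Multiple UDF results: already an iterable of (series, data_type) pairs.
    return tuple(packed)
```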
Parameters
----------
data : pd.DataFrame or list of pd.Series/pd.DataFrame
In what case is the input a list of DataFrames?

A list of DataFrames is used in stateful processing (e.g., applyInPandasWithState), where the batch contains multiple DataFrames representing different parts of the output (count, data, and state), each wrapped as a StructArray column. Updated the docstring to clarify this.
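For illustration, a hedged sketch of packing a whole DataFrame into one Arrow struct column; the field names are made up and the real state schema lives elsewhere:

```python
import pandas as pd
import pyarrow as pa

def dataframe_to_struct_array(pdf: pd.DataFrame) -> pa.StructArray:
    # Convert each pandas column to an Arrow array, then pack them into a
    # single StructArray so the whole frame occupies one column of the batch.
    arrays = [pa.Array.from_pandas(pdf[name]) for name in pdf.columns]
    return pa.StructArray.from_arrays(arrays, names=[str(name) for name in pdf.columns])

# Hypothetical "state" part of an applyInPandasWithState output:
state_pdf = pd.DataFrame({"groupKey": ["a"], "timeoutTimestamp": [0]})
struct_col = dataframe_to_struct_array(state_pdf)  # one column holding the whole frame
```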
    Whether to enable int to decimal coercion (default False)
ignore_unexpected_complex_type_values : bool
    Whether to ignore unexpected complex type values in converter (default False)
is_udtf : bool
Please add a TODO with a JIRA to unify it in the future.
python/pyspark/sql/conversion.py (outdated)
# Handle empty schema (0 columns)
# Use dummy column + select([]) to preserve row count (PyArrow limitation workaround)
if not schema.fields:
What does not schema.fields mean? Is fields None?

schema.fields is a list, so not schema.fields checks for an empty list (0 columns). Changed it to len(schema.fields) == 0 for clarity.
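A small sketch of the workaround mentioned in that comment, assuming a PyArrow version where RecordBatch.select is available:

```python
import pyarrow as pa

def empty_schema_batch(num_rows: int) -> pa.RecordBatch:
    # PyArrow cannot build a zero-column RecordBatch with a nonzero row count
    # directly, so create a dummy column of the desired length, then drop every
    # column with select([]) while keeping the row count.
    dummy = pa.nulls(num_rows, type=pa.int64())
    batch = pa.RecordBatch.from_arrays([dummy], ["_dummy"])
    return batch.select([])  # 0 columns, num_rows preserved
```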
def create_batch(
    packed: Union[
        Tuple["pa.Array", "pa.DataType"],
        List[Tuple["pa.Array", "pa.DataType"]],
Can we unify the input type to always match the multiple-UDF case?

Same as above: added a local normalize helper so create_batch always receives a list of (arr, type) tuples. Unifying the call sites to always produce the multi-UDF form will be done when we extract the logic to each eval type.
# Basic DataFrame conversion
df = pd.DataFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
schema = StructType([StructField("a", IntegerType()), StructField("b", DoubleType())])
result = PandasToArrowConversion.convert(df, schema)
I see you added a batch of tests here; is the convert method the final version?

The convert API is stable. Future PRs will focus on internal improvements (e.g., SPARK-55502 to eliminate is_udtf, and elevating coerce_arrow_array to batch level). The tests cover the core paths and will remain valid through those changes.
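Building on the quoted test, a hedged usage sketch of the new entry point; the exact return type is not pinned down here, and the sequence-of-Series call follows the input shapes described later in this thread:

```python
import pandas as pd
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.sql.conversion import PandasToArrowConversion

schema = StructType([StructField("a", IntegerType()), StructField("b", DoubleType())])

# Single DataFrame, as in the quoted test; the result is an Arrow batch
# covering both columns of the schema.
batch = PandasToArrowConversion.convert(
    pd.DataFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]}), schema
)

# Sequence of Series, one per field of the schema (an input shape the author
# says is supported; see the discussion below).
batch_from_series = PandasToArrowConversion.convert(
    [pd.Series([1, 2, 3]), pd.Series([1.0, 2.0, 3.0])], schema
)
```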
# TODO: elevate to ArrowBatchTransformer and operate on full RecordBatch schema
# instead of per-column coercion.
def coerce_arrow_array(
We have an ArrowArrayConversion; should this function be in it?

There's already a TODO to elevate this to ArrowBatchTransformer so it operates on the full RecordBatch schema instead of doing per-column coercion. Moving it to ArrowArrayConversion first would just add an extra migration step. I'll address this when we do the batch-level coercion refactor.
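To make the TODO a little more concrete, a hedged sketch of the difference between per-column coercion and the batch-level version it points towards; this assumes a PyArrow version with RecordBatch.cast and leaves out the error handling the real code needs:

```python
import pyarrow as pa

# Per-column coercion, roughly how it works today: cast each array to the
# corresponding field type of the target schema, one column at a time.
def coerce_batch_per_column(batch: pa.RecordBatch, target_schema: pa.Schema) -> pa.RecordBatch:
    coerced = [
        col if col.type == field.type else col.cast(field.type)
        for col, field in zip(batch.columns, target_schema)
    ]
    return pa.RecordBatch.from_arrays(coerced, schema=target_schema)

# What the TODO points towards: let Arrow cast the whole batch against the
# target schema in one call instead of looping per column.
def coerce_batch_whole(batch: pa.RecordBatch, target_schema: pa.Schema) -> pa.RecordBatch:
    return batch.cast(target_schema)
```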
mask = None if hasattr(series.array, "__arrow_array__") else series.isnull()

if is_udtf:
Shouldn't the is_udtf handling be inside coerce_arrow_array?

The is_udtf special handling is in the from_pandas stage (catching the broader ArrowException instead of just ArrowInvalid), not in the .cast() stage that coerce_arrow_array handles, so it fits better in series_to_array. This flag will be eliminated via SPARK-55502 when we unify the UDTF and regular UDF conversion paths.
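A hedged sketch of those two stages with simplified names and error handling (the real code raises proper PySpark errors rather than a bare ValueError):

```python
import pandas as pd
import pyarrow as pa

def series_to_array_sketch(series: pd.Series, target_type: pa.DataType,
                           is_udtf: bool = False) -> pa.Array:
    # Stage 1: pandas -> Arrow. Extension arrays convert themselves, so no mask there.
    mask = None if hasattr(series.array, "__arrow_array__") else series.isnull()
    try:
        arr = pa.Array.from_pandas(series, mask=mask)
    except pa.ArrowException as e:
        if is_udtf:
            # The UDTF path catches the broader ArrowException and rewords the error.
            raise ValueError(f"UDTF output could not be converted to Arrow: {e}") from e
        raise
    # Stage 2: cast to the target type; this is the part coerce_arrow_array covers,
    # where only ArrowInvalid needs handling.
    return arr.cast(target_type)
```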
python/pyspark/sql/conversion.py (outdated)
)
raise PySparkValueError(error_msg) from e

def convert_column(
Can we consolidate convert_column and series_to_array?
python/pyspark/sql/conversion.py (outdated)
    ignore_unexpected_complex_type_values=ignore_unexpected_complex_type_values,
    is_udtf=is_udtf,
)
return ArrowBatchTransformer.wrap_struct(nested_batch).column(0)
This line really takes me a few seconds to figure out what it does.

Understood. Added a comment to clarify; a sketch of what it boils down to is below. We can revisit the wrap_struct transformer in the future.
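Roughly, what that line boils down to (a sketch, not the actual transformer):

```python
import pyarrow as pa

def wrap_struct_sketch(nested_batch: pa.RecordBatch) -> pa.StructArray:
    # Take a record batch whose columns are the struct's fields and pack them
    # into a single StructArray, i.e. one column holding the whole nested row.
    return pa.StructArray.from_arrays(
        nested_batch.columns, names=nested_batch.schema.names
    )

# ArrowBatchTransformer.wrap_struct(nested_batch).column(0) then just returns
# that single struct column.
```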
…tor/consolidate-pandas-to-arrow
# Conflicts:
#   python/pyspark/sql/pandas/serializers.py
gaogaotiantian left a comment:

I have two very minor comments but overall it's good to me.
@classmethod
def convert(
    cls,
    data: Union["pd.DataFrame", Sequence[Union["pd.Series", "pd.DataFrame"]]],
nit: Should the type be Sequence[Union["pd.Series", "pd.DataFrame"]] or Union[Sequence["pd.Series"], Sequence["pd.DataFrame"]]?
It actually can support:
- a single DataFrame;
- a sequence of Series;
- a sequence of DataFrames (each DataFrame will be wrapped into a single column).
We could further optimize this method in the future, but currently it needs to support all use cases.
Then type-hint-wise it's the latter one. Sequence[Union["pd.Series", "pd.DataFrame"]] means a sequence whose elements can each be a Series or a DataFrame, i.e. something like [series, dataframe]. The latter means either a sequence of Series or a sequence of DataFrames, but not a sequence of elements that can be either.
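Spelled out as plain type hints, the distinction looks like this:

```python
from typing import Sequence, Union

import pandas as pd

# A sequence whose elements may each be a Series or a DataFrame (mixing allowed):
MixedElements = Sequence[Union[pd.Series, pd.DataFrame]]
# e.g. [pd.Series([1]), pd.DataFrame({"a": [1]})] type-checks against this.

# Either a sequence of Series or a sequence of DataFrames, but never a mix:
HomogeneousSequences = Union[Sequence[pd.Series], Sequence[pd.DataFrame]]
# e.g. [pd.Series([1]), pd.Series([2])] or [pd.DataFrame(...), pd.DataFrame(...)].
```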
# then extract columns as a list for uniform iteration.
columns: List[Union["pd.Series", "pd.DataFrame"]]
if isinstance(data, pd.DataFrame):
    if assign_cols_by_name and any(isinstance(c, str) for c in data.columns):
If assign_cols_by_name is True but the columns do not have names, what happens? Is the fallback behavior of ignoring assign_cols_by_name expected?

It will fall back to position (index) based column references. This is intended, the same as on master.
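A small sketch of that fallback, mirroring the condition in the quoted diff; the helper is illustrative, not the actual code:

```python
import pandas as pd
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

def select_columns(pdf: pd.DataFrame, schema: StructType, assign_cols_by_name: bool):
    # Use names only when the flag is set and the DataFrame actually has string
    # column labels; otherwise select columns by position.
    if assign_cols_by_name and any(isinstance(c, str) for c in pdf.columns):
        return [pdf[field.name] for field in schema.fields]      # by name
    return [pdf.iloc[:, i] for i in range(len(schema.fields))]   # by position

schema = StructType([StructField("a", IntegerType()), StructField("b", DoubleType())])
unnamed = pd.DataFrame([[1, 1.0], [2, 2.0]])  # integer column labels 0, 1
cols = select_columns(unnamed, schema, assign_cols_by_name=True)  # falls back to position
```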
Merged to master.
What changes were proposed in this pull request?
Introduce PandasToArrowConversion.convert() in conversion.py to centralize the pandas-to-Arrow conversion logic previously duplicated across multiple serializers. Also extract cast_arrow_array() as a standalone utility for Arrow array type casting.

Serializers (ArrowStreamPandasSerializer, ArrowStreamPandasUDFSerializer, ArrowStreamPandasUDTFSerializer, etc.) now delegate to these shared utilities instead of maintaining their own _create_array, _create_batch, and _create_struct_array methods.

Why are the changes needed?
Part of SPARK-55159. The same conversion logic was duplicated across 5+ serializer classes, making it hard to maintain. This reduces ~450 lines of duplication.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New unit tests in test_conversion.py for PandasToArrowConversion, plus existing UDF/UDTF tests.

Was this patch authored or co-authored using generative AI tooling?
No.