[SPARK-44856][PYTHON] Improve Python UDTF arrow serializer performance#50099

Closed
HyukjinKwon wants to merge 2 commits intoapache:masterfrom
HyukjinKwon:SPARK-44856

Conversation

@HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented Feb 27, 2025

What changes were proposed in this pull request?

This PR removes pandas <> Arrow <> pandas conversion in Arrow-optimized Python UDTF by directly using PyArrow.

Why are the changes needed?

Currently, there is a lot of overhead in the Arrow serializer for Python UDTFs. The overhead largely comes from converting Arrow batches into pandas Series and converting the UDTF's results back into a pandas DataFrame.

We should directly convert Python objects into Arrow and vice versa to avoid the expensive pandas conversion.

Does this PR introduce any user-facing change?

Yes. Previously the conversion was

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.

Contributor


Why do we need to check both error classes?

Member Author


The problem is that, previously, we were able to create a pandas DataFrame with a different schema, but Arrow does not seem to allow it.

More specifically, the previous code path:

    yield verify_result(
        pd.DataFrame(check_return_value(res))
    ), arrow_return_type

did not throw an error at pd.DataFrame(...).

However, the new code path:

    # inside convert_to_arrow(func())
    ret = LocalDataToArrowConversion.convert(
        data, return_type, prefers_large_var_types
    ).to_batches()

throws an error at LocalDataToArrowConversion.convert(...).

We could further improve this, but I would prefer to keep that out of scope for this PR, considering that there are already behaviour differences with and without Arrow.

dongjoon-hyun
dongjoon-hyun previously approved these changes Mar 7, 2025
@HyukjinKwon
Member Author

Let me add a legacy conf ... to be safer ..

@dongjoon-hyun
Member

Could you fix the remaining failures?

pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_ARROW_TYPE_CAST_ERROR] Cannot convert the output value of the column 'x' with type 'object' to the specified return type of the column: 'list<element: int32>'. Please check if the data types match and try again.

dongjoon-hyun
dongjoon-hyun previously approved these changes Mar 14, 2025
Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. Thank you again for making it work, @HyukjinKwon .

Comment on lines 1525 to 1568
Member


Can we move this to before the above line?

Member Author


Actually no .. otherwise, the tests fail. There is a test that throws an exception when the batch is empty. LocalDataToArrowConversion.convert checks if the data is empty, and the exception will be wrapped by PySparkRuntimeError.

Member


Can this return multiple batches? What happens in that case?

Member Author


Yeah, if it grows over the default size (https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.to_batches) it can be multiple batches. It should work, though - I wrote the code so that it works via ArrowStreamUDFSerializer.dump_stream.

Comment on lines 1549 to 1594
Member


IIRC, it must return exactly one batch per input row. convert_to_arrow should always return one batch. cc @allisonwang-db

Member Author


From what I read of the code, it seems fine... but it would be great if we could confirm this.

Contributor


For direct UDTF usage like MyUDTF(lit(1), lit(2)), it's fine to return multiple batches, but for lateral joins like SELECT * FROM t, LATERAL MyUDTF(a, b), we must match each input row with all of the UDTF's output for that row. If we return multiple batches, then we can't distinguish which batch joins with which input row.
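The constraint can be sketched in plain Python (illustrative only; lateral_join and explode are made-up names, not Spark APIs): the join can line up input rows with UDTF output only if each input row's whole output stays together.

```python
def explode(a, b):
    # Toy UDTF: yields one output row per value in [a, b).
    for v in range(a, b):
        yield (v,)

def lateral_join(input_rows, udtf):
    # Keeping all output for one input row together makes the
    # row-to-output correspondence unambiguous.
    for row in input_rows:
        for out in udtf(*row):
            yield row + out

result = list(lateral_join([(0, 2), (5, 6)], explode))
```

If the UDTF's output for one row were split across several independent batches with no row boundary, this correspondence would be lost.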

Member Author


Just checked: https://github.com/apache/arrow/blob/d2ddee62329eb711572b4d71d6380673d7f7edd1/cpp/src/arrow/table.cc#L612-L638

The batch size will be the long max by default, which I believe is pretty safe. An Arrow batch cannot contain more rows than a long can hold anyway.

@HyukjinKwon HyukjinKwon force-pushed the SPARK-44856 branch 2 times, most recently from 2632c5d to d417e42 Compare April 3, 2025 01:32
Co-authored-by: Allison Wang <allison.wang@databricks.com>
@HyukjinKwon
Member Author

Merged to master.

HyukjinKwon added a commit that referenced this pull request May 12, 2025
…rk Connect compatibility test

### What changes were proposed in this pull request?

This PR proposes to skip ArrowUDTFParityTests in Spark Connect compatibility test for now.

### Why are the changes needed?

After #50099, the compatibility test fails https://github.com/apache/spark/actions/runs/14959668798/job/42019945629

In fact, UDTF with Arrow is still under development, so we can skip the tests for now.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Will monitor the build.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50856 from HyukjinKwon/SPARK-44856-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request May 12, 2025
…rk Connect compatibility test

(cherry picked from commit dfc8175)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request May 12, 2025
…rk Connect compatibility test

(cherry picked from commit dfc8175)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025

Closes apache#50099 from HyukjinKwon/SPARK-44856.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025
…rk Connect compatibility test

Closes apache#50856 from HyukjinKwon/SPARK-44856-followup.
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 14, 2025
…rk Connect compatibility test

Closes apache#50856 from HyukjinKwon/SPARK-44856-followup.
(cherry picked from commit 9bb0409)