
Commit 3080e61

[SPARK-53387][PYTHON] Add support for Arrow UDTFs with PARTITION BY
### What changes were proposed in this pull request?

This PR adds support for Arrow UDTFs with PARTITION BY functionality. It adds a new `ArrowUDTFWithPartition` wrapper in worker.py that handles PARTITION BY semantics. The wrapper implements the partitioning logic specifically for Arrow UDTFs that receive PyArrow RecordBatch objects (the TABLE argument) and efficiently detects partition boundaries.

### Why are the changes needed?

To support PARTITION BY for Arrow Python UDTFs.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests in test_arrow_udtfs.

### Was this patch authored or co-authored using generative AI tooling?

Yes.

Closes #52317 from allisonwang-db/spark-53387-partition-by-udtf.

Authored-by: Allison Wang <[email protected]>
Signed-off-by: Allison Wang <[email protected]>
1 parent 010d36f commit 3080e61
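The core idea behind a wrapper like `ArrowUDTFWithPartition` is to walk the incoming PyArrow record batches and split them wherever the partitioning key changes. The sketch below illustrates that boundary-detection idea in plain PyArrow; it is not the actual worker.py code, and the function name `iter_partitions` and its `key_column` argument are hypothetical.

```python
# Illustrative sketch only: detect partition boundaries in a stream of
# pyarrow.RecordBatch objects and yield one pyarrow.Table per partition.
# `iter_partitions` and `key_column` are hypothetical names, not part of
# the actual ArrowUDTFWithPartition implementation.
import pyarrow as pa


def iter_partitions(batches, key_column):
    """Group consecutive rows that share the same partition key value."""
    current_key = None
    seen_any = False
    buffered = []  # RecordBatch slices belonging to the current partition
    for batch in batches:
        if batch.num_rows == 0:
            continue
        idx = batch.schema.get_field_index(key_column)
        keys = batch.column(idx)
        start = 0
        for i in range(batch.num_rows):
            key = keys[i].as_py()
            if seen_any and key != current_key:
                # Partition boundary: flush everything buffered so far.
                buffered.append(batch.slice(start, i - start))
                yield current_key, pa.Table.from_batches(buffered)
                buffered = []
                start = i
            current_key = key
            seen_any = True
        # Keep the tail of this batch; the partition may continue in the next one.
        buffered.append(batch.slice(start, batch.num_rows - start))
    if buffered:
        yield current_key, pa.Table.from_batches(buffered)
```

A real implementation would additionally handle multiple partitioning columns and route each per-partition table through the UDTF's `eval`/`terminate` lifecycle, but the slice-on-key-change idea is the same.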

File tree: 3 files changed, +952 / -2 lines changed


python/pyspark/errors/error-conditions.json

Lines changed: 5 additions & 0 deletions
```diff
@@ -384,6 +384,11 @@
       "The return type of the arrow-optimized Python UDTF should be of type 'pandas.DataFrame', but the '<func>' method returned a value of type <return_type> with value: <value>."
     ]
   },
+  "INVALID_ARROW_UDTF_TABLE_ARGUMENT": {
+    "message": [
+      "Arrow UDTFs with PARTITION BY must have a TABLE argument that results in a PyArrow RecordBatch, but got <actual_type>."
+    ]
+  },
   "INVALID_ARROW_UDTF_WITH_ANALYZE": {
     "message": [
       "The arrow UDTF '<name>' is invalid. Arrow UDTFs do not support the 'analyze' method. Please remove the 'analyze' method from '<name>' and specify a returnType instead."
```

0 commit comments