Commit 8095c39
[SPARK-54869][PYTHON][TESTS] Apply the standard import of pyarrow compute in test_arrow_udf_scalar
### What changes were proposed in this pull request?

Apply the standard import of pyarrow compute in `test_arrow_udf_scalar`:

```python
import pyarrow.compute as pc
```

### Why are the changes needed?

There seems to be a bug in cloudpickle when dealing with nested lambdas plus submodule attribute access like `pa.compute`; see #53607, which fixed a failure in the Python-only macOS 26 CI job.

before: https://github.com/apache/spark/actions/runs/20495264978/job/58904792835
after: https://github.com/apache/spark/actions/runs/20560816127/job/59051168728

This PR applies the same change in more tests.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53640 from zhengruifeng/import_pc.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
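For reference, a minimal sketch of the convention being applied (illustrative only, not the exact test code): the change replaces attribute access through the `pa.compute` submodule inside lambdas with a lambda that closes over the directly imported submodule.

```python
import pyarrow as pa
import pyarrow.compute as pc

# Old pattern: the lambda resolves the `compute` submodule through the
# top-level module; this is the shape that reportedly trips cloudpickle
# when such lambdas are nested.
add_old = lambda x, y: pa.compute.add(x, y)

# Standardized pattern: bind the submodule once and close over it directly.
add_new = lambda x, y: pc.add(x, y)

assert add_old(pa.array([1]), pa.array([2])).to_pylist() == [3]
assert add_new(pa.array([1]), pa.array([2])).to_pylist() == [3]
```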
1 parent 678f314 commit 8095c39

File tree

1 file changed: +38 −31 lines changed

python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py

Lines changed: 38 additions & 31 deletions
```diff
@@ -84,12 +84,12 @@ def random_udf(it):
         return random_udf.asNondeterministic()
 
     def test_arrow_udf_tokenize(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])
 
         tokenize = arrow_udf(
-            lambda s: pa.compute.ascii_split_whitespace(s),
+            lambda s: pc.ascii_split_whitespace(s),
             ArrayType(StringType()),
         )
 
```
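For context, a standalone sketch (assuming a local pyarrow install, outside Spark) of what the tokenizer computes:

```python
import pyarrow as pa
import pyarrow.compute as pc

vals = pa.array(["hi boo", "bye boo"])
# ascii_split_whitespace returns a list<string> array of tokens per row.
tokens = pc.ascii_split_whitespace(vals)
assert tokens.to_pylist() == [["hi", "boo"], ["bye", "boo"]]
```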
```diff
@@ -99,11 +99,12 @@ def test_arrow_udf_tokenize(self):
 
     def test_arrow_udf_output_nested_arrays(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])
 
         tokenize = arrow_udf(
-            lambda s: pa.array([[v] for v in pa.compute.ascii_split_whitespace(s).to_pylist()]),
+            lambda s: pa.array([[v] for v in pc.ascii_split_whitespace(s).to_pylist()]),
             ArrayType(ArrayType(StringType())),
         )
 
@@ -499,14 +500,15 @@ def build_time(h, mi, s):
 
     def test_arrow_udf_input_variant(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def scalar_f(v: pa.Array) -> pa.Array:
             assert isinstance(v, pa.Array)
             assert isinstance(v, pa.StructArray)
             assert isinstance(v.field("metadata"), pa.BinaryArray)
             assert isinstance(v.field("value"), pa.BinaryArray)
-            return pa.compute.binary_length(v.field("value"))
+            return pc.binary_length(v.field("value"))
 
         @arrow_udf("int")
         def iter_f(it: Iterator[pa.Array]) -> Iterator[pa.Array]:
@@ -515,7 +517,7 @@ def iter_f(it: Iterator[pa.Array]) -> Iterator[pa.Array]:
                 assert isinstance(v, pa.StructArray)
                 assert isinstance(v.field("metadata"), pa.BinaryArray)
                 assert isinstance(v.field("value"), pa.BinaryArray)
-                yield pa.compute.binary_length(v.field("value"))
+                yield pc.binary_length(v.field("value"))
 
         df = self.spark.range(0, 10).selectExpr("parse_json(cast(id as string)) v")
         expected = [Row(l=2) for i in range(10)]
```
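The variant tests above return the byte length of each variant's `value` field; a standalone sketch of `pc.binary_length` on a plain binary array (the variant encoding itself is not reproduced here):

```python
import pyarrow as pa
import pyarrow.compute as pc

# binary_length counts the bytes in each element of a binary array.
raw = pa.array([b"\x01\x02", b"\x01\x02\x03"], type=pa.binary())
assert pc.binary_length(raw).to_pylist() == [2, 3]
```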
```diff
@@ -703,10 +705,9 @@ def f(x):
 
     def test_udf_register_arrow_udf_basic(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
-        scalar_original_add = arrow_udf(
-            lambda x, y: pa.compute.add(x, y).cast(pa.int32()), IntegerType()
-        )
+        scalar_original_add = arrow_udf(lambda x, y: pc.add(x, y).cast(pa.int32()), IntegerType())
         self.assertEqual(scalar_original_add.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
         self.assertEqual(scalar_original_add.deterministic, True)
 
@@ -730,7 +731,7 @@ def test_udf_register_arrow_udf_basic(self):
         @arrow_udf(LongType())
         def scalar_iter_add(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b in it:
-                yield pa.compute.add(a, b)
+                yield pc.add(a, b)
 
         with self.temp_func("add1"):
             new_add = self.spark.udf.register("add1", scalar_iter_add)
@@ -745,10 +746,9 @@ def scalar_iter_add(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
 
     def test_catalog_register_arrow_udf_basic(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
-        scalar_original_add = arrow_udf(
-            lambda x, y: pa.compute.add(x, y).cast(pa.int32()), IntegerType()
-        )
+        scalar_original_add = arrow_udf(lambda x, y: pc.add(x, y).cast(pa.int32()), IntegerType())
         self.assertEqual(scalar_original_add.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
         self.assertEqual(scalar_original_add.deterministic, True)
 
@@ -772,7 +772,7 @@ def test_catalog_register_arrow_udf_basic(self):
         @arrow_udf(LongType())
         def scalar_iter_add(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b in it:
-                yield pa.compute.add(a, b)
+                yield pc.add(a, b)
 
         with self.temp_func("add1"):
             new_add = self.spark.catalog.registerFunction("add1", scalar_iter_add)
@@ -786,10 +786,10 @@ def scalar_iter_add(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
         self.assertEqual(expected.collect(), res4.collect())
 
     def test_udf_register_nondeterministic_arrow_udf(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         random_arrow_udf = arrow_udf(
-            lambda x: pa.compute.add(x, random.randint(6, 6)), LongType()
+            lambda x: pc.add(x, random.randint(6, 6)), LongType()
         ).asNondeterministic()
         self.assertEqual(random_arrow_udf.deterministic, False)
         self.assertEqual(random_arrow_udf.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
@@ -805,10 +805,10 @@ def test_udf_register_nondeterministic_arrow_udf(self):
         self.assertEqual(row[0], 7)
 
     def test_catalog_register_nondeterministic_arrow_udf(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         random_arrow_udf = arrow_udf(
-            lambda x: pa.compute.add(x, random.randint(6, 6)), LongType()
+            lambda x: pc.add(x, random.randint(6, 6)), LongType()
         ).asNondeterministic()
         self.assertEqual(random_arrow_udf.deterministic, False)
         self.assertEqual(random_arrow_udf.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
```
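Note that `random.randint(6, 6)` always returns 6, so these tests can assert an exact output while the UDF is still flagged nondeterministic; a minimal sketch of the trick:

```python
import random

import pyarrow as pa
import pyarrow.compute as pc

# randint(6, 6) draws from the degenerate range [6, 6], i.e. always 6,
# which makes the "nondeterministic" UDF's output predictable in tests.
f = lambda x: pc.add(x, random.randint(6, 6))
assert f(pa.array([1])).to_pylist() == [7]
```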
```diff
@@ -827,17 +827,17 @@ def test_catalog_register_nondeterministic_arrow_udf(self):
 
     @unittest.skipIf(not have_numpy, numpy_requirement_message)
     def test_nondeterministic_arrow_udf(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
         @arrow_udf("double")
         def scalar_plus_ten(v):
-            return pa.compute.add(v, 10)
+            return pc.add(v, 10)
 
         @arrow_udf("double", ArrowUDFType.SCALAR_ITER)
         def iter_plus_ten(it):
             for v in it:
-                yield pa.compute.add(v, 10)
+                yield pc.add(v, 10)
 
         for plus_ten in [scalar_plus_ten, iter_plus_ten]:
             random_udf = self.nondeterministic_arrow_udf
@@ -984,10 +984,11 @@ def iter_f(it: Iterator[pa.Array]) -> Iterator[pa.Array]:
 
     def test_arrow_udf_named_arguments(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def test_udf(a, b):
-            return pa.compute.add(a, pa.compute.multiply(b, 10)).cast(pa.int32())
+            return pc.add(a, pc.multiply(b, 10)).cast(pa.int32())
 
         with self.temp_func("test_udf"):
             self.spark.udf.register("test_udf", test_udf)
@@ -1008,10 +1009,11 @@ def test_udf(a, b):
 
     def test_arrow_udf_named_arguments_negative(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def test_udf(a, b):
-            return pa.compute.add(a, b).cast(pa.int32())
+            return pc.add(a, b).cast(pa.int32())
 
         with self.temp_func("test_udf"):
             self.spark.udf.register("test_udf", test_udf)
@@ -1032,10 +1034,11 @@ def test_udf(a, b):
 
     def test_arrow_udf_named_arguments_and_defaults(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def test_udf(a, b=0):
-            return pa.compute.add(a, pa.compute.multiply(b, 10)).cast(pa.int32())
+            return pc.add(a, pc.multiply(b, 10)).cast(pa.int32())
 
         with self.temp_func("test_udf"):
             self.spark.udf.register("test_udf", test_udf)
@@ -1070,10 +1073,11 @@ def test_udf(a, b=0):
 
     def test_arrow_udf_kwargs(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def test_udf(a, **kwargs):
-            return pa.compute.add(a, pa.compute.multiply(kwargs["b"], 10)).cast(pa.int32())
+            return pc.add(a, pc.multiply(kwargs["b"], 10)).cast(pa.int32())
 
         with self.temp_func("test_udf"):
             self.spark.udf.register("test_udf", test_udf)
@@ -1092,11 +1096,12 @@ def test_udf(a, **kwargs):
 
     def test_arrow_iter_udf_single_column(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf(LongType())
         def add_one(it: Iterator[pa.Array]) -> Iterator[pa.Array]:
             for s in it:
-                yield pa.compute.add(s, 1)
+                yield pc.add(s, 1)
 
         df = self.spark.range(10)
         expected = df.select((F.col("id") + 1).alias("res")).collect()
@@ -1106,11 +1111,12 @@ def add_one(it: Iterator[pa.Array]) -> Iterator[pa.Array]:
 
     def test_arrow_iter_udf_two_columns(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf(LongType())
         def multiple(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b in it:
-                yield pa.compute.multiply(a, b)
+                yield pc.multiply(a, b)
 
         df = self.spark.range(10).select(
             F.col("id").alias("a"),
@@ -1124,11 +1130,12 @@ def multiple(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
 
     def test_arrow_iter_udf_three_columns(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf(LongType())
         def multiple(it: Iterator[Tuple[pa.Array, pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b, c in it:
-                yield pa.compute.multiply(pa.compute.multiply(a, b), c)
+                yield pc.multiply(pc.multiply(a, b), c)
 
         df = self.spark.range(10).select(
             F.col("id").alias("a"),
@@ -1142,21 +1149,21 @@ def multiple(it: Iterator[Tuple[pa.Array, pa.Array, pa.Array]]) -> Iterator[pa.Array]:
         self.assertEqual(expected, result.collect())
 
     def test_return_type_coercion(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         df = self.spark.range(10)
 
-        scalar_long = arrow_udf(lambda x: pa.compute.add(x, 1), LongType())
+        scalar_long = arrow_udf(lambda x: pc.add(x, 1), LongType())
         result1 = df.select(scalar_long("id").alias("res"))
         self.assertEqual(10, len(result1.collect()))
 
         # long -> int coercion
-        scalar_int1 = arrow_udf(lambda x: pa.compute.add(x, 1), IntegerType())
+        scalar_int1 = arrow_udf(lambda x: pc.add(x, 1), IntegerType())
         result2 = df.select(scalar_int1("id").alias("res"))
         self.assertEqual(10, len(result2.collect()))
 
         # long -> int coercion, overflow
-        scalar_int2 = arrow_udf(lambda x: pa.compute.add(x, 2147483647), IntegerType())
+        scalar_int2 = arrow_udf(lambda x: pc.add(x, 2147483647), IntegerType())
         result3 = df.select(scalar_int2("id").alias("res"))
         with self.assertRaises(Exception):
             # pyarrow.lib.ArrowInvalid:
```
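The overflow case leans on pyarrow's cast being safe by default; a sketch of the same failure outside Spark (assuming default cast options):

```python
import pyarrow as pa
import pyarrow.compute as pc

# 1 + 2147483647 = 2147483648, one past the int32 maximum.
big = pc.add(pa.array([1], type=pa.int64()), 2147483647)
try:
    big.cast(pa.int32())  # safe cast by default: raises on overflow
except pa.ArrowInvalid:
    print("overflow detected")
```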
