Commit 1aa4b15

gaogaotiantian authored and HyukjinKwon committed
[SPARK-54763][TEST] Accelerate test_udf_return_types with multi-threading
### What changes were proposed in this pull request?

Use multi-threading to accelerate `test_udf_return_types`. Locally this gives a more than 2x speedup (113s -> 50s).

### Why are the changes needed?

`test_udf_return_types` is one of the slowest tests we have. It takes 300s on normal CI and even longer on coverage runs. This simple and straightforward fix can save us >50% of the time spent on this test.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Local speedup measurement.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #53533 from gaogaotiantian/udfreturntype-accelerate.

Authored-by: Tian Gao <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
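The change described above follows a common pattern: a loop that appends one result per item becomes a worker function mapped over the items by a thread pool. The following is a minimal sketch of that pattern, not the test code itself. `slow_check` is a hypothetical stand-in for one per-type UDF run, and the `time.sleep` call assumes the real work is dominated by waiting (for example on Spark executing the query) rather than by Python computation, which is the case where threads help despite the GIL.

```python
import concurrent.futures
import time


def slow_check(item):
    # Hypothetical stand-in for one per-type UDF run: mostly waiting, not computing.
    time.sleep(0.05)
    return [item, item.upper()]


def run_sequential(items):
    # Shape of the code before the change: one item at a time.
    results = []
    for item in items:
        results.append(slow_check(item))
    return results


def run_threaded(items, max_workers=4):
    # Shape of the code after the change: up to max_workers items in flight.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(slow_check, items))


items = ["int", "string", "double", "boolean"]
sequential = run_sequential(items)
threaded = run_threaded(items)
```

Both functions return the same list, so callers do not need to change. How much time is actually saved depends on how much of each item's work is spent waiting, so the numbers in the commit message should be read as measurements of this particular test, not as a general guarantee.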
1 parent 80b4486 commit 1aa4b15

1 file changed, +9 -10 lines changed

python/pyspark/sql/tests/udf_type_tests/test_udf_return_types.py

Lines changed: 9 additions & 10 deletions
@@ -16,6 +16,7 @@
 #
 
 import array
+import concurrent.futures
 import datetime
 import os
 import platform
@@ -217,9 +218,7 @@ def _compare_or_create_golden_file(self, actual_output, golden_file, test_name):
             self.fail(f"Golden file created for {test_name}. Please review and re-run the test.")
 
     def _generate_udf_return_type_coercion_results(self, use_arrow):
-        results = []
-
-        for spark_type in self.test_types:
+        def work(spark_type):
             result = [spark_type.simpleString()]
             for value in self.test_data:
                 try:
@@ -233,9 +232,10 @@ def _generate_udf_return_type_coercion_results(self, use_arrow):
                 except Exception:
                     result_value = "X"
                 result.append(result_value)
-            results.append(result)
+            return result
 
-        return results
+        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
+            return list(executor.map(work, self.test_types))
 
     def test_pandas_udf_return_type_coercion(self):
         golden_file = os.path.join(
@@ -252,9 +252,7 @@ def test_pandas_udf_return_type_coercion(self):
         self._compare_or_create_golden_file(actual_output, golden_file, test_name)
 
     def _generate_pandas_udf_type_coercion_results(self):
-        results = []
-
-        for spark_type in self.test_types:
+        def work(spark_type):
             result = [spark_type.simpleString()]
             for value in self.pandas_test_data:
                 try:
@@ -276,9 +274,10 @@ def pandas_udf_func(series: pd.Series) -> pd.Series:
                 except Exception:
                     ret_str = "X"
                 result.append(ret_str)
-            results.append(result)
+            return result
 
-        return results
+        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
+            return list(executor.map(work, self.test_types))
 
 
 if __name__ == "__main__":
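
Because the results are compared against a golden file, row order matters. `Executor.map` yields results in the order of its input iterable, not in completion order, so the threaded version produces rows in the same order as the original loop. The sketch below illustrates this with made-up data: the `DELAYS` table and the integer-division "work" are hypothetical, chosen so that the first item finishes last and one attempt per item raises. As in the diff, each worker catches its own exceptions and records `"X"`, so a failure in one cell does not abort the whole map.

```python
import concurrent.futures
import time

# Hypothetical per-item delays: "a" is submitted first but finishes last.
DELAYS = {"a": 0.03, "b": 0.02, "c": 0.01, "d": 0.0}


def work(name):
    # One output row per item: the item name, then one cell per attempted value.
    result = [name]
    for divisor in (1, 0):
        try:
            time.sleep(DELAYS[name])
            value = str(10 // divisor)  # raises ZeroDivisionError when divisor is 0
        except Exception:
            value = "X"  # same failure marker the test uses
        result.append(value)
    return result


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # map() returns rows in input order, regardless of which thread finishes first.
    rows = list(executor.map(work, DELAYS))
```

Here `rows` starts with the row for `"a"` even though that worker is the slowest. If an exception did escape a worker, `map` would re-raise it when that result is retrieved, which is one reason to keep the `try`/`except` inside the worker.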
