
Commit 4b77986

[SPARK-54868][PYTHON][INFRA] Fail hanging tests and log the tracebacks
### What changes were proposed in this pull request?

Fail hanging tests and log the tracebacks. The timeout is set via the `PYSPARK_TEST_TIMEOUT` environment variable.

### Why are the changes needed?

When a test gets stuck, there is no useful information in the logs.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

1. PR builder with

```
PYSPARK_TEST_TIMEOUT: 100
```

https://github.com/zhengruifeng/spark/actions/runs/20522703690/job/58962106131

2. Manual check:

```
(spark_dev_313) ➜  spark git:(py_test_timeout) PYSPARK_TEST_TIMEOUT=15 python/run-tests -k --python-executables python3 --testnames 'pyspark.ml.tests.connect.test_parity_clustering'
Running PySpark tests. Output is in /Users/ruifeng.zheng/spark/python/unit-tests.log
Will test against the following Python executables: ['python3']
Will test the following Python tests: ['pyspark.ml.tests.connect.test_parity_clustering']
python3 python_implementation is CPython
python3 version is: Python 3.13.5
Starting test(python3): pyspark.ml.tests.connect.test_parity_clustering (temp output: /Users/ruifeng.zheng/spark/python/target/c014880c-80d2-49db-8fb1-a26ab4e5246d/python3__pyspark.ml.tests.connect.test_parity_clustering__u8n7t6zc.log)
Got TimeoutExpired while running pyspark.ml.tests.connect.test_parity_clustering with python3
Traceback (most recent call last):
  File "/Users/ruifeng.zheng/spark/./python/run-tests.py", line 157, in run_individual_python_test
    retcode = proc.wait(timeout=timeout)
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/subprocess.py", line 1280, in wait
    return self._wait(timeout=timeout)
           ~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/subprocess.py", line 2058, in _wait
    raise TimeoutExpired(self.args, timeout)
subprocess.TimeoutExpired: Command '['/Users/ruifeng.zheng/spark/bin/pyspark', 'pyspark.ml.tests.connect.test_parity_clustering']' timed out after 15 seconds
Running tests...
----------------------------------------------------------------------
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.connect.execute.reattachable.senderMaxStreamDuration to Some(1s) due to [CANNOT_MODIFY_STATIC_CONFIG] Cannot modify the value of the static Spark config: "spark.connect.execute.reattachable.senderMaxStreamDuration". SQLSTATE: 46110
  warnings.warn(warn)
/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.connect.execute.reattachable.senderMaxStreamSize to Some(123) due to [CANNOT_MODIFY_STATIC_CONFIG] Cannot modify the value of the static Spark config: "spark.connect.execute.reattachable.senderMaxStreamSize". SQLSTATE: 46110
  warnings.warn(warn)
/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.connect.authenticate.token to Some(deadbeef) due to [CANNOT_MODIFY_STATIC_CONFIG] Cannot modify the value of the static Spark config: "spark.connect.authenticate.token". SQLSTATE: 46110
  warnings.warn(warn)
test_assert_remote_mode (pyspark.ml.tests.connect.test_parity_clustering.ClusteringParityTests.test_assert_remote_mode) ... ok (0.450s)
/Users/ruifeng.zheng/spark/python/pyspark/ml/clustering.py:1016: FutureWarning: Deprecated in 3.0.0. It will be removed in future versions. Use ClusteringEvaluator instead. You can also get the cost on the training dataset in the summary.
  warnings.warn(
ok (6.541s)
test_distributed_lda (pyspark.ml.tests.connect.test_parity_clustering.ClusteringParityTests.test_distributed_lda) ...
Thread 0x0000000173083000 (most recent call first):
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/site-packages/grpc/_channel.py", line 1727 in channel_spin
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 994 in run
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1043 in _bootstrap_inner
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1014 in _bootstrap

Thread 0x000000017509b000 (most recent call first):
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/concurrent/futures/thread.py", line 90 in _worker
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 994 in run
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1043 in _bootstrap_inner
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1014 in _bootstrap

Thread 0x000000017408f000 (most recent call first):
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/concurrent/futures/thread.py", line 90 in _worker
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 994 in run
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1043 in _bootstrap_inner
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1014 in _bootstrap

Thread 0x00000001719e7000 (most recent call first):
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/selectors.py", line 398 in select
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/socketserver.py", line 235 in serve_forever
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 994 in run
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1043 in _bootstrap_inner
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1014 in _bootstrap

Thread 0x00000001709db000 (most recent call first):
  File "/Users/ruifeng.zheng/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 58 in run
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1043 in _bootstrap_inner
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 1014 in _bootstrap

Current thread 0x00000001f372e200 (most recent call first):
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/threading.py", line 363 in wait
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/site-packages/grpc/_common.py", line 114 in _wait_once
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/site-packages/grpc/_common.py", line 154 in wait
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/site-packages/grpc/_channel.py", line 953 in _next
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/site-packages/grpc/_channel.py", line 538 in __next__
  File "/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/client/reattach.py", line 164 in <lambda>
  File "/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/client/reattach.py", line 266 in _call_iter
  File "/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/client/reattach.py", line 163 in _has_next
  File "/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/client/reattach.py", line 139 in send
  File "<frozen _collections_abc>", line 360 in __next__
  File "/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/client/core.py", line 1625 in _execute_and_fetch_as_iterator
  File "/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/client/core.py", line 1664 in _execute_and_fetch
  File "/Users/ruifeng.zheng/spark/python/pyspark/sql/connect/client/core.py", line 1162 in execute_command
  File "/Users/ruifeng.zheng/spark/python/pyspark/ml/util.py", line 308 in remote_call
  File "/Users/ruifeng.zheng/spark/python/pyspark/ml/util.py", line 322 in wrapped
  File "/Users/ruifeng.zheng/spark/python/pyspark/ml/clustering.py", line 1548 in toLocal
  File "/Users/ruifeng.zheng/spark/python/pyspark/ml/tests/test_clustering.py", line 449 in test_distributed_lda
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/unittest/case.py", line 606 in _callTestMethod
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/unittest/case.py", line 651 in run
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/unittest/case.py", line 707 in __call__
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/unittest/suite.py", line 122 in run
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/unittest/suite.py", line 84 in __call__
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/unittest/suite.py", line 122 in run
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/unittest/suite.py", line 84 in __call__
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/site-packages/xmlrunner/runner.py", line 67 in run
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/unittest/main.py", line 270 in runTests
  File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/unittest/main.py", line 104 in __init__
  File "/Users/ruifeng.zheng/spark/python/pyspark/testing/__init__.py", line 30 in unittest_main
  File "/Users/ruifeng.zheng/spark/python/pyspark/ml/tests/connect/test_parity_clustering.py", line 37 in <module>
  File "<frozen runpy>", line 88 in _run_code
  File "<frozen runpy>", line 198 in _run_module_as_main
Had test failures in pyspark.ml.tests.connect.test_parity_clustering with python3; see logs.
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53528 from zhengruifeng/py_test_timeout.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
1 parent 1bc5b29 commit 4b77986

3 files changed (+33, -5 lines)


.github/workflows/build_and_test.yml

Lines changed: 1 addition & 0 deletions

@@ -566,6 +566,7 @@ jobs:
       SKIP_PACKAGING: true
       METASPACE_SIZE: 1g
       BRANCH: ${{ inputs.branch }}
+      PYSPARK_TEST_TIMEOUT: 300
     steps:
       - name: Checkout Spark repository
         uses: actions/checkout@v4
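Note that the value is interpreted in seconds: it is what `proc.wait(timeout=...)` in `python/run-tests.py` below receives, so `300` gives each test module a five-minute budget in the PR builder, while the manual check in the commit message uses `15` to force a quick timeout.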

python/pyspark/testing/connectutils.py

Lines changed: 9 additions & 0 deletions

@@ -17,6 +17,9 @@
 import shutil
 import tempfile
 import os
+import sys
+import signal
+import faulthandler
 import functools
 import unittest
 import uuid
@@ -177,6 +180,9 @@ def master(cls):
 
     @classmethod
     def setUpClass(cls):
+        if os.environ.get("PYSPARK_TEST_TIMEOUT"):
+            faulthandler.register(signal.SIGTERM, file=sys.__stderr__, all_threads=True)
+
         # This environment variable is for interrupting hanging ML-handler and making the
         # tests fail fast.
         os.environ["SPARK_CONNECT_ML_HANDLER_INTERRUPTION_TIMEOUT_MINUTES"] = "5"
@@ -197,6 +203,9 @@ def setUpClass(cls):
 
     @classmethod
     def tearDownClass(cls):
+        if os.environ.get("PYSPARK_TEST_TIMEOUT"):
+            faulthandler.unregister(signal.SIGTERM)
+
         shutil.rmtree(cls.tempdir.name, ignore_errors=True)
         cls.spark.stop()
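To see what the registration above buys, here is a self-contained sketch (POSIX-only, since `faulthandler.register` is unavailable on Windows): once the handler is registered, delivering SIGTERM prints every thread's stack to stderr instead of killing the process silently. The sleeping worker thread is illustrative only.

```python
import faulthandler
import os
import signal
import sys
import threading
import time

# Mirror the setUpClass hook: on SIGTERM, dump all thread stacks to stderr.
faulthandler.register(signal.SIGTERM, file=sys.__stderr__, all_threads=True)

# An illustrative "stuck" worker, standing in for a hanging test.
threading.Thread(target=time.sleep, args=(3600,), daemon=True).start()

# Simulate the parent's proc.terminate(). Because chain=False is the
# default, the handler only writes the dump; the process keeps running.
os.kill(os.getpid(), signal.SIGTERM)
print("still alive after the dump", file=sys.stderr)
```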

python/run-tests.py

Lines changed: 23 additions & 5 deletions

@@ -234,27 +234,45 @@ def run_individual_python_test(target_dir, test_name, pyspark_python, keep_test_
 
     env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)
 
+    timeout = os.environ.get("PYSPARK_TEST_TIMEOUT")
+    if timeout is not None:
+        env["PYSPARK_TEST_TIMEOUT"] = timeout
+        timeout = int(timeout)
+
     output_prefix = get_valid_filename(pyspark_python + "__" + test_name + "__").lstrip("_")
     # Delete is always set to False since the cleanup will be either done by removing the
     # whole test dir, or the test output is retained.
     per_test_output = tempfile.NamedTemporaryFile(prefix=output_prefix, dir=tmp_dir,
                                                   suffix=".log", delete=False)
     LOGGER.info(
         "Starting test(%s): %s (temp output: %s)", pyspark_python, test_name, per_test_output.name)
+    cmd = [os.path.join(SPARK_HOME, "bin/pyspark")] + test_name.split()
     start_time = time.time()
+
+    retcode = None
+    proc = None
     try:
-        retcode = TestRunner(
-            [os.path.join(SPARK_HOME, "bin/pyspark")] + test_name.split(),
-            env,
-            per_test_output
-        ).run()
+        if timeout:
+            proc = subprocess.Popen(cmd, stderr=per_test_output, stdout=per_test_output, env=env)
+            retcode = proc.wait(timeout=timeout)
+        else:
+            retcode = TestRunner(cmd, env, per_test_output).run()
         if not keep_test_output:
             # There exists a race condition in Python and it causes flakiness in MacOS
             # https://github.com/python/cpython/issues/73885
             if platform.system() == "Darwin":
                 os.system("rm -rf " + tmp_dir)
             else:
                 shutil.rmtree(tmp_dir, ignore_errors=True)
+    except subprocess.TimeoutExpired:
+        if timeout and proc:
+            LOGGER.exception(
+                "Got TimeoutExpired while running %s with %s", test_name, pyspark_python
+            )
+            proc.terminate()
+            proc.communicate(timeout=60)
+        else:
+            raise
     except BaseException:
         LOGGER.exception("Got exception while running %s with %s", test_name, pyspark_python)
         # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
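Two details in the hunk above are easy to miss: the raw string value of `PYSPARK_TEST_TIMEOUT` is copied into the child's environment (which is what makes the `setUpClass` hook in `connectutils.py` register the SIGTERM handler), while the `int` form bounds `proc.wait()`; and since the child's stdout and stderr are redirected to `per_test_output`, the thread dump produced on `proc.terminate()` lands in the same per-test log that run-tests.py reports on failure.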
