Skip to content

Commit dcac3f5

Browse files
committed
[SPARK-52076][CONNECT] Explicitly closes ExecutePlanResponseReattachableIterator after usage
### What changes were proposed in this pull request? This PR proposes to explicitly close `ExecutePlanResponseReattachableIterator` after usage. ### Why are the changes needed? There might be a corner case deadlock as below. The main reason is that the point of calling `__del__` can be arbitrary and it could depend on each other. ``` Dumping Threads.... File "/.../versions/3.9.21/lib/python3.9/threading.py", line 937, in _bootstrap self._bootstrap_inner() File "/.../versions/3.9.21/lib/python3.9/threading.py", line 980, in _bootstrap_inner self.run() File "/.../versions/3.9.21/lib/python3.9/threading.py", line 917, in run self._target(*self._args, **self._kwargs) File "/.../versions/3.9.21/lib/python3.9/concurrent/futures/thread.py", line 85, in _worker del work_item File "/.../python/pyspark/sql/connect/client/reattach.py", line 347, in __del__ return self.close() File "/.../python/pyspark/sql/connect/client/reattach.py", line 343, in close self._release_all() File "/.../python/pyspark/sql/connect/client/reattach.py", line 241, in _release_all with self._lock: --------------- File "/.../versions/3.9.21/lib/python3.9/threading.py", line 937, in _bootstrap self._bootstrap_inner() File "/.../versions/3.9.21/lib/python3.9/threading.py", line 980, in _bootstrap_inner self.run() File "/.../versions/3.9.21/lib/python3.9/threading.py", line 917, in run self._target(*self._args, **self._kwargs) File "/.../versions/pyspark-dev-3.9/lib/python3.9/site-packages/grpc/_channel.py", line 1751, in channel_spin event = state.channel.next_call_event() --------------- File "<string>", line 44, in <module> File "/.../python/pyspark/sql/connect/session.py", line 890, in stop self.client.close() File "/.../python/pyspark/sql/connect/client/core.py", line 1234, in close ExecutePlanResponseReattachableIterator.shutdown() File "/.../python/pyspark/sql/connect/client/reattach.py", line 82, in shutdown cls._get_or_create_release_thread_pool().shutdown() File "/.../versions/3.9.21/lib/python3.9/concurrent/futures/thread.py", line 235, in shutdown t.join() File "/.../versions/3.9.21/lib/python3.9/threading.py", line 1060, in join self._wait_for_tstate_lock() File "/.../versions/3.9.21/lib/python3.9/threading.py", line 1080, in _wait_for_tstate_lock if lock.acquire(block, timeout): File "<string>", line 1, in <module> File "<string>", line 1, in <module> ``` ### Does this PR introduce _any_ user-facing change? For corner cases, yes. There might be a deadlock, and this PR fixes it. ### How was this patch tested? Difficult to test. It was more to just remove the cause itself. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50855 from HyukjinKwon/explicitly-close-generator. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent b50690a commit dcac3f5

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

python/pyspark/sql/connect/client/core.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,8 +1374,11 @@ def handle_response(b: pb2.ExecutePlanResponse) -> None:
13741374
generator = ExecutePlanResponseReattachableIterator(
13751375
req, self._stub, self._retrying, self._builder.metadata()
13761376
)
1377-
for b in generator:
1378-
handle_response(b)
1377+
try:
1378+
for b in generator:
1379+
handle_response(b)
1380+
finally:
1381+
generator.close()
13791382
else:
13801383
for attempt in self._retrying():
13811384
with attempt:
@@ -1531,8 +1534,11 @@ def handle_response(
15311534
generator = ExecutePlanResponseReattachableIterator(
15321535
req, self._stub, self._retrying, self._builder.metadata()
15331536
)
1534-
for b in generator:
1535-
yield from handle_response(b)
1537+
try:
1538+
for b in generator:
1539+
yield from handle_response(b)
1540+
finally:
1541+
generator.close()
15361542
else:
15371543
for attempt in self._retrying():
15381544
with attempt:

python/pyspark/sql/connect/client/reattach.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -342,9 +342,3 @@ def throw(self, type: Any = None, value: Any = None, traceback: Any = None) -> A
342342
def close(self) -> None:
343343
self._release_all()
344344
return super().close()
345-
346-
def __del__(self) -> None:
347-
try:
348-
return self.close()
349-
except Exception:
350-
pass

0 commit comments

Comments
 (0)