Skip to content

Commit 8e6f03e

Browse files
authored
Fix Python 3.12+ fork warning in async connection tests (apache#56019)
1 parent 8168358 commit 8e6f03e

File tree

1 file changed

+42
-0
lines changed

1 file changed

+42
-0
lines changed

task-sdk/tests/conftest.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,48 @@ def _disable_ol_plugin():
175175
airflow.plugins_manager.plugins = None
176176

177177

178+
@pytest.fixture(autouse=True)
179+
def _cleanup_async_resources(request):
180+
"""
181+
Clean up async resources that can cause Python 3.12 fork warnings.
182+
183+
Problem: asgiref.sync.sync_to_async (used in _async_get_connection) creates
184+
ThreadPoolExecutors that persist between tests. When supervisor.py calls
185+
os.fork() in subsequent tests, Python 3.12+ warns about forking a
186+
multi-threaded process.
187+
188+
Solution: Clean up asgiref's ThreadPoolExecutors after async tests to ensure
189+
subsequent tests start with a clean thread environment.
190+
"""
191+
yield
192+
193+
# Only clean up after async tests to avoid unnecessary overhead
194+
if "asyncio" in request.keywords:
195+
# Clean up asgiref ThreadPoolExecutors that persist between tests
196+
# These are created by sync_to_async() calls in async connection retrieval
197+
try:
198+
from asgiref.sync import SyncToAsync
199+
200+
# SyncToAsync maintains a class-level executor for performance
201+
# We need to shut it down to prevent multi-threading warnings on fork()
202+
if hasattr(SyncToAsync, "single_thread_executor") and SyncToAsync.single_thread_executor:
203+
if not SyncToAsync.single_thread_executor._shutdown:
204+
SyncToAsync.single_thread_executor.shutdown(wait=True)
205+
SyncToAsync.single_thread_executor = None
206+
207+
# SyncToAsync also maintains a WeakKeyDictionary of context-specific executors
208+
# Clean these up too to ensure complete thread cleanup
209+
if hasattr(SyncToAsync, "context_to_thread_executor"):
210+
for executor in list(SyncToAsync.context_to_thread_executor.values()):
211+
if hasattr(executor, "shutdown") and not getattr(executor, "_shutdown", True):
212+
executor.shutdown(wait=True)
213+
SyncToAsync.context_to_thread_executor.clear()
214+
215+
except (ImportError, AttributeError):
216+
# If asgiref structure changes, fail gracefully
217+
pass
218+
219+
178220
class MakeTIContextCallable(Protocol):
179221
def __call__(
180222
self,

0 commit comments

Comments
 (0)