From 0ffdc9cb94f13f6c7ae26da26bb97aacee89d655 Mon Sep 17 00:00:00 2001 From: Daniel Hahler Date: Mon, 6 May 2019 04:18:09 +0200 Subject: [PATCH 1/4] SyncToAsync: use executor attribute This allows for overriding it easily in derived classes, and it seems to be better than changing the loop's default executor (in the case of ASGI_THREADS being set). Ref: https://github.com/django/channels/issues/1091 --- asgiref/sync.py | 23 +++++++++++++++-------- tests/test_sync.py | 21 +++++++++------------ 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/asgiref/sync.py b/asgiref/sync.py index 53f1900b..0b018249 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -179,13 +179,7 @@ class SyncToAsync: with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent, rather than just blocking. """ - - # If they've set ASGI_THREADS, update the default asyncio executor for now - if "ASGI_THREADS" in os.environ: - loop = asyncio.get_event_loop() - loop.set_default_executor( - ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"])) - ) + executor = None # Maps launched threads to the coroutines that spawned them launch_map = {} @@ -205,6 +199,19 @@ def __init__(self, func, thread_sensitive=False): except AttributeError: pass + if self.__class__.executor is None: + if "ASGI_THREADS" in os.environ: + executor_kwargs = { + "max_workers": int(os.environ["ASGI_THREADS"]), + } + else: + executor_kwargs = {} + + if sys.version_info >= (3, 6): + executor_kwargs["thread_name_prefix"] = "sync_to_async" + + self.__class__.executor = ThreadPoolExecutor(**executor_kwargs) + async def __call__(self, *args, **kwargs): loop = asyncio.get_event_loop() @@ -217,7 +224,7 @@ async def __call__(self, *args, **kwargs): # Otherwise, we run it in a fixed single thread executor = self.single_thread_executor else: - executor = None # Use default + executor = self.__class__.executor # Use default if contextvars is not None: context = contextvars.copy_context() diff --git a/tests/test_sync.py b/tests/test_sync.py index 5a6e3547..83ec8026 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -10,7 +10,7 @@ @pytest.mark.asyncio -async def test_sync_to_async(): +async def test_sync_to_async(monkeypatch): """ Tests we can call sync functions from an async thread (even if the number of thread workers is less than the number of calls) @@ -29,18 +29,15 @@ def sync_function(): end = time.monotonic() assert result == 42 assert end - start >= 1 + # Set workers to 1, call it twice and make sure that works right - loop = asyncio.get_event_loop() - old_executor = loop._default_executor - loop.set_default_executor(ThreadPoolExecutor(max_workers=1)) - try: - start = time.monotonic() - await asyncio.wait([async_function(), async_function()]) - end = time.monotonic() - # It should take at least 2 seconds as there's only one worker. - assert end - start >= 2 - finally: - loop.set_default_executor(old_executor) + monkeypatch.setattr(sync_to_async, "executor", ThreadPoolExecutor(max_workers=1)) + async_function = sync_to_async(sync_function) + start = time.monotonic() + await asyncio.wait([async_function(), async_function()]) + end = time.monotonic() + # It should take at least 2 seconds as there's only one worker. + assert end - start >= 2 @pytest.mark.asyncio From b43856f7f29f4999de6a04937aed8cdcf218e279 Mon Sep 17 00:00:00 2001 From: Daniel Hahler Date: Wed, 11 Sep 2019 13:55:36 +0200 Subject: [PATCH 2/4] lint --- asgiref/sync.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/asgiref/sync.py b/asgiref/sync.py index 0b018249..9a43c4a4 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -179,6 +179,7 @@ class SyncToAsync: with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent, rather than just blocking. """ + executor = None # Maps launched threads to the coroutines that spawned them @@ -200,16 +201,11 @@ def __init__(self, func, thread_sensitive=False): pass if self.__class__.executor is None: + executor_kwargs = {} if "ASGI_THREADS" in os.environ: - executor_kwargs = { - "max_workers": int(os.environ["ASGI_THREADS"]), - } - else: - executor_kwargs = {} - + executor_kwargs["max_workers"] = int(os.environ["ASGI_THREADS"]) if sys.version_info >= (3, 6): executor_kwargs["thread_name_prefix"] = "sync_to_async" - self.__class__.executor = ThreadPoolExecutor(**executor_kwargs) async def __call__(self, *args, **kwargs): From 8723c0fccc9f2a07e22fc6f06d75b1e34935359d Mon Sep 17 00:00:00 2001 From: Daniel Hahler Date: Wed, 11 Sep 2019 14:04:10 +0200 Subject: [PATCH 3/4] test_sync_to_async_ASGI_THREADS --- tests/test_sync.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/test_sync.py b/tests/test_sync.py index 83ec8026..40d05b55 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -40,6 +40,29 @@ def sync_function(): assert end - start >= 2 +@pytest.mark.asyncio +async def test_sync_to_async_ASGI_THREADS(monkeypatch): + def sync_function(): + time.sleep(0.5) + return 42 + + # Set workers to 1 via env, call it twice and make sure that works right + monkeypatch.setenv("ASGI_THREADS", 1) + monkeypatch.setattr(sync_to_async, "executor", None) + + async_function = sync_to_async(sync_function) + assert async_function.executor._max_workers == 1 + start = time.monotonic() + await asyncio.wait([async_function(), async_function()]) + end = time.monotonic() + # It should take at least 1 second as there's only one worker. + assert end - start >= 1 + + monkeypatch.setenv("ASGI_THREADS", 99) + async_function = sync_to_async(sync_function) + assert async_function.executor._max_workers == 1 + + @pytest.mark.asyncio async def test_sync_to_async_decorator(): """ From 2e57f40362744e94e4d9b96f0ef475e3a4d81ee9 Mon Sep 17 00:00:00 2001 From: Daniel Hahler Date: Wed, 11 Sep 2019 14:07:57 +0200 Subject: [PATCH 4/4] improve test --- tests/test_sync.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/test_sync.py b/tests/test_sync.py index 40d05b55..e1f665ba 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -58,9 +58,15 @@ def sync_function(): # It should take at least 1 second as there's only one worker. assert end - start >= 1 + # Uses existing executor instance on class. monkeypatch.setenv("ASGI_THREADS", 99) + orig_executor = async_function.executor async_function = sync_to_async(sync_function) - assert async_function.executor._max_workers == 1 + assert async_function.executor is orig_executor + + monkeypatch.setattr(sync_to_async, "executor", None) + async_function = sync_to_async(sync_function) + assert async_function.executor is not orig_executor @pytest.mark.asyncio