11# curio/workers.py
22#
33# Functions for performing work outside of curio. This includes
4- # running functions in threads, processes, and executors from the
5- # concurrent.futures module.
4+ # running functions in threads and processes.
65
7- __all__ = ['run_in_thread' , 'run_in_process' , 'block_in_thread' ]
6+ __all__ = ['run_in_thread' , 'run_in_process' ]
87
98# -- Standard Library
109
1312import threading
1413import traceback
1514import signal
16- from collections import Counter , defaultdict
1715
1816# -- Curio
1917
2826# as originating from curio as opposed to multiprocessing.pool).
2927
3028class RemoteTraceback (Exception ):
31-
3229 def __init__ (self , tb ):
3330 self .tb = tb
3431
@@ -37,7 +34,6 @@ def __str__(self):
3734
3835
3936class ExceptionWithTraceback :
40-
4137 def __init__ (self , exc , tb ):
4238 tb = traceback .format_exception (type (exc ), exc , tb )
4339 tb = '' .join (tb )
@@ -72,6 +68,7 @@ async def run_in_thread(callable, *args, call_on_cancel=None):
7268 executed. If it start running, it will run fully to completion
7369 as a kind of zombie.
7470 '''
71+ assert call_on_cancel is None , call_on_cancel
7572 worker = None
7673 try :
7774 worker = await reserve_thread_worker ()
@@ -80,85 +77,6 @@ async def run_in_thread(callable, *args, call_on_cancel=None):
8077 if worker :
8178 await worker .release ()
8279
83- # Support for blocking in threads.
84- #
85- # Discussion:
86- #
87- # The run_in_thread() function can be used to run any synchronous function
88- # in a separate thread. However, certain kinds of operations are
89- # inherently unsafe. For example, consider a worker task that wants
90- # to wait on a threading Event like this:
91- #
92- # evt = threading.Event() # Foreign Event...
93- #
94- # async def worker():
95- # await run_in_thread(evt.wait)
96- # print('Alive!')
97- #
98- # Now suppose Curio spins up a huge number of workers:
99- #
100- # for n in range(1000):
101- # await spawn(worker())
102- #
103- # At this point, you're in a bad situation. The worker tasks have all
104- # called run_in_thread() and are blocked indefinitely. Because the
105- # pool of worker threads is limited, you've exhausted all available
106- # resources. Nobody can now call run_in_thread() without blocking.
107- # There's a pretty good chance that your code is permanently
108- # deadlocked. There are dark clouds.
109- #
110- # This problem can be solved by wrapping run_in_thread() with a
111- # semaphore. Like this:
112- #
113- # _barrier = curio.Semaphore()
114- #
115- # async def worker():
116- # async with _barrier:
117- # await run_in_thread(evt.wait)
118- #
119- # However, to make it much more convenient, we can take care of
120- # a lot of fiddly details. We can cache the requested callable,
121- # build a set of semaphores and synchronize things in the background.
122- # That's what the block_in_thread() function is doing. For example:
123- #
124- # async def worker():
125- # await block_in_thread(evt.wait)
126- # print('Alive!')
127- #
128- # Unlike run_in_thread(), spawning up 1000 workers creates a
129- # situation where only 1 worker is actually blocked in a thread.
130- # The other 999 workers are blocked on a semaphore waiting for service.
131-
132- _pending = Counter ()
133- _barrier = defaultdict (sync .Semaphore )
134-
135- async def block_in_thread (callable , * args , call_on_cancel = None ):
136- '''
137- Run callable(*args) in a thread with the expectation that the
138- operation is going to block for an indeterminate amount of time.
139- Guarantees that at most only one background thread is used
140- regardless of how many curio tasks are actually waiting on the
141- same callable (e.g., if 1000 Curio tasks all decide to call
142- block_on_thread on the same callable, they'll all be handled by a
143- single thread). Primary use of this function is on foreign locks,
144- queues, and other synchronization primitives where you have to use
145- a thread, but you just don't have any idea when the operation will
146- complete.
147- '''
148- if hasattr (callable , '__self__' ):
149- call_key = (callable .__name__ , id (callable .__self__ ))
150- else :
151- call_key = id (callable )
152- _pending [call_key ] += 1
153- async with _barrier [call_key ]:
154- try :
155- return await run_in_thread (callable , * args , call_on_cancel = call_on_cancel )
156- finally :
157- _pending [call_key ] -= 1
158- if not _pending [call_key ]:
159- del _pending [call_key ]
160- del _barrier [call_key ]
161-
16280
16381MAX_WORKER_PROCESSES = multiprocessing .cpu_count ()
16482
@@ -436,7 +354,6 @@ async def apply(self, func, args=()):
436354# control over the whole process of how workers get managed.
437355
438356class WorkerPool (object ):
439-
440357 def __init__ (self , workercls , nworkers ):
441358 self .nworkers = sync .Semaphore (nworkers )
442359 self .workercls = workercls
@@ -459,7 +376,3 @@ async def release(self, worker):
459376 self .workers .append (worker )
460377 await self .nworkers .release ()
461378
462-
463- # Pool definitions should anyone want to use them directly
464- ProcessPool = lambda nworkers : WorkerPool (ProcessWorker , nworkers )
465- ThreadPool = lambda nworkers : WorkerPool (ThreadWorker , nworkers )
0 commit comments