Commit 42e6228

First commit
1 parent 9b3ed8d commit 42e6228

7 files changed: +444 −1 lines changed


README.md

Lines changed: 87 additions & 1 deletion

# ThreadPoolExecutorPlus

A fully replaceable executor that makes it possible to reuse idle threads and shrink the thread list when there's no heavy load.

[![fury](https://badge.fury.io/py/ThreadPoolExecutorPlus.svg)](https://badge.fury.io/py/ThreadPoolExecutorPlus)
[![licence](https://img.shields.io/github/license/GoodManWEN/ThreadPoolExecutorPlus)](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/blob/master/LICENSE)
[![pyversions](https://img.shields.io/pypi/pyversions/ThreadPoolExecutorPlus.svg)](https://pypi.org/project/ThreadPoolExecutorPlus/)
[![Publish](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/workflows/Publish/badge.svg)](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/actions?query=workflow:Publish)
[![Build](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/workflows/Build/badge.svg)](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/actions?query=workflow:Build)

This package provides a duck-typed version of concurrent.futures.ThreadPoolExecutor, which has a very similar API and can fully replace ThreadPoolExecutor in your code.

This package exists to address several specific pain points around memory control in the native Python library.

## Feature

- Fully interchangeable with concurrent.futures.ThreadPoolExecutor, for example in asyncio.
- Whenever a new task is submitted, the executor prefers to reuse an existing idle thread rather than create a new one.
- The executor automatically shrinks itself during idle time in order to use less memory and achieve higher efficiency.
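The reuse behavior described above can be sketched with the standard-library executor, whose API this package mirrors (a minimal stdlib illustration, not this package's internals):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# With max_workers=1, every submitted task is served by the same worker
# thread, illustrating reuse of an idle thread instead of spawning new ones.
with ThreadPoolExecutor(max_workers=1) as pool:
    names = [pool.submit(lambda: threading.current_thread().name).result()
             for _ in range(5)]

print(len(set(names)))  # → 1: one worker handled all five tasks
```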

## Install

    pip install ThreadPoolExecutorPlus

## Usage

Same API as concurrent.futures.ThreadPoolExecutor, with some additional control methods.

##### set_daemon_opts(min_workers=None, max_workers=None, keep_alive_time=None)

In order to keep the interface identical, the new features are configured after the object is created.
You can change the minimum/maximum number of active workers, and set how many seconds an idle thread waits before being terminated.
By default, min_workers = 4, max_workers = 256 on Windows and 512 on Linux, and keep_alive_time = 100s.

## Example

The very same code as in the official docs ([https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example)), with the executor replaced:
```Python3
# requests_test.py
import concurrent.futures
import ThreadPoolExecutorPlus
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with ThreadPoolExecutorPlus.ThreadPoolExecutor(max_workers=5) as executor:
    # Try modifying daemon options
    executor.set_daemon_opts(min_workers=2, max_workers=10, keep_alive_time=60)
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
```
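The loop above relies on concurrent.futures.as_completed, which yields futures in the order they finish rather than the order they were submitted; a small stdlib sketch of that behavior:

```python
import concurrent.futures
import time

# Two tasks finish in reverse submission order; as_completed reflects that.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    slow = pool.submit(lambda: (time.sleep(0.2), 'slow')[1])
    fast = pool.submit(lambda: 'fast')
    order = [f.result() for f in concurrent.futures.as_completed([slow, fast])]

print(order)  # → ['fast', 'slow']
```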

The same code as in [https://docs.python.org/3/library/asyncio-eventloop.html?highlight=asyncio%20run_in_executor#executing-code-in-thread-or-process-pools](https://docs.python.org/3/library/asyncio-eventloop.html?highlight=asyncio%20run_in_executor#executing-code-in-thread-or-process-pools), with the executor replaced:

```Python3
# Runs on Python 3.7 and above
import asyncio
import concurrent.futures
import ThreadPoolExecutorPlus

def blocking_io():
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutorPlus.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

asyncio.run(main())
```

ThreadPoolExecutorPlus/__init__.py

Lines changed: 8 additions & 0 deletions

__version__ = ''

from .thread import *

__all__ = (
    'BrokenThreadPool',
    'ThreadPoolExecutor',
)

ThreadPoolExecutorPlus/thread.py

Lines changed: 266 additions & 0 deletions

# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

__author__ = 'WEN (github.com/GoodManWEN)'
__original_author__ = 'Brian Quinlan ([email protected])'

import atexit
from concurrent.futures import _base
import itertools
import queue
import threading
import weakref
import os

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False


def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()


atexit.register(_python_exit)


class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


class _CustomThread(threading.Thread):

    def __init__(self, name, executor_reference, work_queue, initializer, initargs, keep_alive_time):
        super().__init__(name=name)
        self._executor_reference = executor_reference
        self._work_queue = work_queue
        self._initializer = initializer
        self._initargs = initargs
        self._executor = executor_reference()
        self._keep_alive_time = keep_alive_time

    def run(self):
        if self._initializer is not None:
            try:
                self._initializer(*self._initargs)
            except BaseException:
                _base.LOGGER.critical('Exception in initializer:', exc_info=True)
                executor = self._executor_reference()
                if executor is not None:
                    executor._initializer_failed()
                return

        try:
            self._executor._adjust_free_thread_count(1)
            while True:
                try:
                    work_item = self._work_queue.get(block=True, timeout=self._keep_alive_time)
                except queue.Empty:
                    if self._executor._free_thread_count > self._executor._min_workers:
                        # Known locking problem here: there is a slight chance
                        # of deadlock, with no fix yet.
                        self._executor._adjust_free_thread_count(-1)
                        self._executor._remove_thread(self)
                        break
                    else:
                        continue

                if work_item is not None:
                    self._executor._adjust_free_thread_count(-1)
                    work_item.run()
                    # Delete references to object. See issue16284
                    del work_item
                    self._executor._adjust_free_thread_count(1)
                    continue
                executor = self._executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                if _shutdown or executor is None or executor._shutdown:
                    # Flag the executor as shutting down as early as possible if it
                    # is not gc-ed yet.
                    if executor is not None:
                        executor._shutdown = True
                    # Notify other workers
                    self._work_queue.put(None)
                    return
                del executor
        except BaseException:
            _base.LOGGER.critical('Exception in worker', exc_info=True)


class _CustomWeakSet(weakref.WeakSet):

    def __repr__(self):
        return repr(self.data)


class BrokenThreadPool(_base.BrokenExecutor):
    """
    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    """


class ThreadPoolExecutor(_base.Executor):

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        self._threads = _CustomWeakSet()
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._free_thread_count = 0
        self._free_thread_count_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs
        self._min_workers = 4
        self._keep_alive_time = 100

    def set_daemon_opts(self, min_workers=None, max_workers=None, keep_alive_time=None):
        if min_workers is not None and min_workers < 1:
            raise ValueError('min_workers is not allowed to be set below 1')
        # Compare against the current minimum when min_workers is not given.
        effective_min = min_workers if min_workers is not None else self._min_workers
        if max_workers is not None and max_workers < effective_min:
            raise ValueError('max_workers is not allowed to be set below min_workers')
        if min_workers is not None:
            self._min_workers = min_workers
        if max_workers is not None:
            self._max_workers = max_workers
        if keep_alive_time is not None:
            self._keep_alive_time = keep_alive_time

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if self._free_thread_count <= self._min_workers and num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = _CustomThread(name=thread_name,
                              executor_reference=weakref.ref(self, weakref_cb),
                              work_queue=self._work_queue,
                              initializer=self._initializer,
                              initargs=self._initargs,
                              keep_alive_time=self._keep_alive_time,
                              )
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def _adjust_free_thread_count(self, num):
        with self._free_thread_count_lock:
            self._free_thread_count += num

    def _initializer_failed(self):
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    def _remove_thread(self, target):
        self._threads.remove(target)
        _threads_queues.pop(target)
        del target

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
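The shrink mechanism above hinges on queue.SimpleQueue.get(timeout=keep_alive_time): when the wait times out and enough idle workers remain, the thread exits on its own. A standalone sketch of that idle-timeout pattern (simplified, not the executor's actual bookkeeping):

```python
import queue
import threading

def worker(q, results, keep_alive):
    # Serve items until the queue stays empty longer than keep_alive seconds,
    # then exit so the pool can shrink.
    while True:
        try:
            item = q.get(timeout=keep_alive)
        except queue.Empty:
            return  # idle past the keep-alive window: terminate this worker
        if item is None:
            return
        results.append(item * 2)

q = queue.SimpleQueue()
results = []
t = threading.Thread(target=worker, args=(q, results, 0.2), daemon=True)
t.start()
q.put(1)
q.put(2)
t.join()  # returns once the worker times out and exits on its own

print(results, t.is_alive())  # → [2, 4] False
```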

docs/doc.rwt

Whitespace-only changes.

requirements.txt

Whitespace-only changes.
