Skip to content

Commit abb84c8

Browse files
author
=
committed
Bug fixed
1 parent 570a490 commit abb84c8

File tree

1 file changed

+43
-14
lines changed

1 file changed

+43
-14
lines changed

ThreadPoolExecutorPlus/thread.py

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,14 @@ def run(self):
7777

7878
class _CustomThread(threading.Thread):
7979

80-
def __init__(self , name , executor_reference, work_queue, initializer, initargs , keep_alive_time):
80+
def __init__(self , name , executor_reference, work_queue, initializer, initargs , thread_counter):
8181
super().__init__(name = name)
8282
self._executor_reference = executor_reference
8383
self._work_queue = work_queue
8484
self._initializer = initializer
8585
self._initargs = initargs
8686
self._executor = executor_reference()
87-
self._keep_alive_time = keep_alive_time
87+
self._thread_count_id = thread_counter
8888

8989
def run(self):
9090
if self._initializer is not None:
@@ -101,11 +101,37 @@ def run(self):
101101
self._executor._adjust_free_thread_count(1)
102102
while True:
103103
try:
104-
work_item = self._work_queue.get(block=True , timeout = self._keep_alive_time)
104+
work_item = self._work_queue.get(block=True , timeout = self._executor._keep_alive_time)
105105
except queue.Empty:
106-
# Got lock problem here , may cause dead lock slightly chance , don't know how to fix it.
106+
'''
107+
A bug may cause potential risk here.
108+
109+
Simply put, cause unrigister queue listening (adjust free thread count -= 1) is not atomic operation after
110+
exception thrown, thus there's slightly chance thread swtiching (which is controled by os and not going to be
111+
interfered by user) happens just right the time after listening stopped but before free thread count is adjusted.
112+
113+
At that precisely time point , if there's a new task going to be added in main thread , determine statement
114+
in ThreadPoolExecutor._adjust_thread_count would consider mistakenly there's still enough worker listening ,
115+
and decide not to generate a new thread. That may eventually cause a final result of task added in queue
116+
but no worker takes it out in the meantime. The last task will never run if there's no new tasks trigger ThreadPoolExecutor._adjust_thread_count afterwards.
117+
118+
To fix this problem , the most reasonable , yes high cost , solution would be hacking into cpython's queue
119+
module (which was a .pyd file projected to 'Modules/_queuemodule.c' in source code) , makes
120+
self._executor._free_thread_count adjusted before listening suspended in interrupt of timeout's callback
121+
functions.
122+
123+
If that sounds a little bit hard to implement , the alternatively simplified approach would be preserving
124+
serveral 'core thread' which would still looping in timeout but never halt , with the same quantity as
125+
ThreadPoolExecutor.min_workers.
126+
127+
Based on this implementation , the status refresh not in time bug will still occur whereas there's always
128+
sub-threads working to trigger task out of task queue. This may cause sub-threads block , which was slightly
129+
out of line with expectations, at some paticular situation if existing threads occasionally full loaded.
130+
With another potential symptom is , depends on which task was last triggered by system, size of thread
131+
pool may not be precisely the same as expect just right after thread shrink happened.
132+
'''
107133
with self._executor._free_thread_count_lock:
108-
if self._executor._free_thread_count > self._executor._min_workers:
134+
if self._executor._free_thread_count > self._executor._min_workers and self._thread_count_id >= self._executor._min_workers:
109135
self._executor._free_thread_count -= 1
110136
break
111137
else:
@@ -188,18 +214,20 @@ def __init__(self, max_workers=None, thread_name_prefix='',
188214
("ThreadPoolExecutor-%d" % self._counter()))
189215
self._initializer = initializer
190216
self._initargs = initargs
217+
self._thread_counter = 0
191218

192219
def set_daemon_opts(self , min_workers = None, max_workers = None, keep_alive_time = None):
193-
if min_workers is not None and min_workers < 2:
194-
raise ValueError('min_workers is not allowed to set below 2')
220+
if min_workers is not None and min_workers < 1:
221+
raise ValueError('min_workers is not allowed to set below 1')
195222
if max_workers is not None and max_workers < min_workers:
196223
raise ValueError('max_workers is not allowed to set below min_workers')
197-
if min_workers is not None:
198-
self._min_workers = min_workers
199-
if max_workers is not None:
200-
self._max_workers = max_workers
201-
if keep_alive_time is not None:
202-
self._keep_alive_time = keep_alive_time
224+
with self._free_thread_count_lock:
225+
if min_workers is not None:
226+
self._min_workers = min_workers
227+
if max_workers is not None:
228+
self._max_workers = max_workers
229+
if keep_alive_time is not None:
230+
self._keep_alive_time = keep_alive_time
203231

204232
def submit(self, fn, *args, **kwargs):
205233
with self._shutdown_lock:
@@ -240,12 +268,13 @@ def weakref_cb(_, q=self._work_queue):
240268
work_queue = self._work_queue,
241269
initializer = self._initializer,
242270
initargs = self._initargs,
243-
keep_alive_time = self._keep_alive_time,
271+
thread_counter = self._thread_counter
244272
)
245273
t.daemon = True
246274
t.start()
247275
self._threads.add(t)
248276
_threads_queues[t] = self._work_queue
277+
self._thread_counter += 1
249278

250279
def _adjust_free_thread_count(self , num):
251280
with self._free_thread_count_lock:

0 commit comments

Comments
 (0)