Skip to content

Commit 71f5945

Browse files
authored
Merge pull request #4 from cherrypy/transition-graph
Give each wait()ing thread its own state transition pipe
2 parents 9897714 + 53f07f4 commit 71f5945

File tree

5 files changed

+63
-29
lines changed

5 files changed

+63
-29
lines changed

.gitattributes

Lines changed: 0 additions & 2 deletions
This file was deleted.

CHANGES.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
v4.1.0
2+
======
3+
4+
* #4: Give each waiting thread its own state transition pipe.
5+
16
v4.0.0
27
======
38

magicbus/base.py

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,7 @@ def __init__(self, transitions=None, errors=None,
169169
id = hex(random.randint(0, sys.maxint))[-8:]
170170
self.id = id
171171
self._priorities = {}
172-
173-
if select:
174-
(self._state_transition_pipe_read,
175-
self._state_transition_pipe_write) = os.pipe()
176-
else:
177-
(self._state_transition_pipe_read,
178-
self._state_transition_pipe_write) = (None, None)
172+
self._state_transition_pipes = set()
179173

180174
@property
181175
def states(self):
@@ -204,8 +198,11 @@ def _transition(self, newstate, *args, **kwargs):
204198
"""
205199
try:
206200
self.state = newstate
207-
if self._state_transition_pipe_write is not None:
208-
os.write(self._state_transition_pipe_write, '1')
201+
202+
# Write to any pipes created by threads calling self.wait().
203+
# Use list() to avoid "Set changed size during iteration" errors.
204+
for read_fd, write_fd in list(self._state_transition_pipes):
205+
os.write(write_fd, "1")
209206

210207
# Note: logging here means 1) the initial transition
211208
# will not be logged if loggers are set up in the initial
@@ -294,28 +291,52 @@ def publish(self, channel, *args, **kwargs):
294291
raise exc
295292
return output
296293

297-
def wait(self, state, interval=0.1, channel=None):
298-
"""Poll for the given state(s) at intervals; publish to channel."""
294+
def wait(self, state, interval=0.1, channel=None, sleep=False):
295+
"""Poll for the given state(s) at intervals; publish to channel.
296+
297+
If sleep is True, the calling thread loops, sleeping for the given
298+
interval each time, then returning only when the bus state is
299+
one of the given states to wait for.
300+
301+
If sleep is False (the default) and the operating system supports
302+
I/O multiplexing via the 'select' module, then an anonymous pipe
303+
will be used to signal the waiting thread to wake up whenever
304+
the state transitions. This allows the waiting thread to return
305+
when the bus shuts down, for example, rather than waiting for
306+
the sleep interval to elapse first. Each thread that calls wait()
307+
creates a new pipe, so if file descriptors are in short supply
308+
on your system you might need to use sleep instead.
309+
"""
299310
if isinstance(state, (tuple, list)):
300311
_states_to_wait_for = state
301312
else:
302313
_states_to_wait_for = [state]
303314

315+
if select:
316+
pipe = os.pipe()
317+
read_fd, write_fd = pipe
318+
self._state_transition_pipes.add(pipe)
319+
304320
def _wait():
305-
while self.state not in _states_to_wait_for:
306-
if self._state_transition_pipe_read is not None:
307-
try:
308-
r, w, x = select.select([self._state_transition_pipe_read], [], [], interval)
309-
if r:
310-
os.read(self._state_transition_pipe_read, 1)
311-
except (select.error, OSError):
312-
# Interrupted due to a signal (being handled by some
313-
# other thread). No need to panic, here, just check
314-
# the new state and proceed/return.
315-
pass
316-
else:
317-
time.sleep(interval)
318-
self.publish(channel)
321+
try:
322+
while self.state not in _states_to_wait_for:
323+
if select:
324+
try:
325+
r, w, x = select.select([read_fd], [], [], interval)
326+
if r:
327+
os.read(read_fd, 1)
328+
except (select.error, OSError):
329+
# Interrupted due to a signal (being handled by some
330+
# other thread). No need to panic, here, just check
331+
# the new state and proceed/return.
332+
pass
333+
else:
334+
time.sleep(interval)
335+
self.publish(channel)
336+
finally:
337+
self._state_transition_pipes.discard(pipe)
338+
os.close(read_fd)
339+
os.close(write_fd)
319340

320341
# From http://psyco.sourceforge.net/psycoguide/bugs.html:
321342
# "The compiled machine code does not include the regular polling

magicbus/process.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def graceful(self):
132132
self.transition('IDLE')
133133
self.transition('RUN')
134134

135-
def block(self, interval=0.1):
135+
def block(self, interval=0.1, sleep=False):
136136
"""Wait for the EXITED state, KeyboardInterrupt or SystemExit.
137137
138138
This function is intended to be called only by the main thread.
@@ -142,7 +142,7 @@ def block(self, interval=0.1):
142142
the actual execv call (required on some platforms).
143143
"""
144144
try:
145-
self.wait('EXITED', interval=interval, channel='main')
145+
self.wait('EXITED', interval=interval, channel='main', sleep=sleep)
146146
except (KeyboardInterrupt, IOError):
147147
# The time.sleep call might raise
148148
# "IOError: [Errno 4] Interrupted function call" on KBInt.

magicbus/test/test_processbus.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ def test_idle_to_exit(self):
192192

193193
def test_wait(self):
194194
b = ProcessBus()
195+
self.log(b)
195196

196197
def f(desired_state):
197198
time.sleep(0.2)
@@ -221,6 +222,11 @@ def f():
221222
def g():
222223
time.sleep(0.4)
223224

225+
def main_listener():
226+
main_calls.append(1)
227+
main_calls = []
228+
b.subscribe("main", main_listener)
229+
224230
f_thread = threading.Thread(target=f, name='f')
225231
f_thread.start()
226232
threading.Thread(target=g, name='g').start()
@@ -250,6 +256,10 @@ def g():
250256
]
251257
)
252258

259+
# While the bus was blocked, it should have published periodically
260+
# to the "main" channel.
261+
self.assertGreater(len(main_calls), 0)
262+
253263
@unittest.skip("Fails intermittently; https://tinyurl.com/ybwwu4gz")
254264
def test_start_with_callback(self):
255265
b = ProcessBus()

0 commit comments

Comments
 (0)