Skip to content

Commit 0df4442

Browse files
committed
EngineClient: improve write handling
- generate ev_written in the right places - handle broken pipe errors Closes #196. Change-Id: Ib2d742bce31b6004ca904368eb7037a57eac30b4
1 parent 9699152 commit 0df4442

File tree

4 files changed

+138
-16
lines changed

4 files changed

+138
-16
lines changed

ChangeLog

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
2016-05-22 S. Thiell <[email protected]>
2+
3+
* EngineClient.py: handle broken pipe on write() (ticket #196).
4+
15
2016-04-24 S. Thiell <[email protected]>
26

37
* NodeSet.py: allow empty string as valid argument for empty NodeSet

lib/ClusterShell/Engine/Engine.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#
2-
# Copyright CEA/DAM/DIF (2007-2015)
3-
# Contributor: Stephane THIELL <[email protected]>
2+
# Copyright CEA/DAM/DIF (2007-2016)
3+
# Contributor: Stephane THIELL <[email protected]>
44
#
55
# This file is part of the ClusterShell library.
66
#
@@ -44,13 +44,17 @@
4444
import time
4545
import traceback
4646

47+
48+
LOGGER = logging.getLogger(__name__)
49+
4750
# Engine client fd I/O event interest bits
4851
E_READ = 0x1
4952
E_WRITE = 0x2
5053

5154
# Define epsilon value for time float arithmetic operations
5255
EPSILON = 1.0e-3
5356

57+
5458
class EngineException(Exception):
5559
"""
5660
Base engine exception.
@@ -219,9 +223,8 @@ def arm(self, client):
219223
# Just print a debug message that could help detect issues
220224
# coming from a long-running timer handler.
221225
if self.fire_date < time_current:
222-
logging.getLogger(__name__).debug(
223-
"Warning: passed interval time for %r (long running "
224-
"event handler?)", self.client)
226+
LOGGER.debug("Warning: passed interval time for %r "
227+
"(long running event handler?)", self.client)
225228

226229
def disarm(self):
227230
client = self.client
@@ -421,8 +424,8 @@ def _fd2client(self, fd):
421424
if client._reg_epoch < self._current_loopcnt:
422425
return client, stream
423426
else:
424-
self._debug("ENGINE _fd2client: ignoring just re-used FD %d" \
425-
% stream.fd)
427+
LOGGER.debug("_fd2client: ignoring just re-used FD %d",
428+
stream.fd)
426429
return (None, None)
427430

428431
def add(self, client):
@@ -472,10 +475,18 @@ def remove_stream(self, client, stream):
472475
needed read flush as needed. If no more retainable stream
473476
remains for this client, this method automatically removes the
474477
entire client from engine.
478+
479+
This function does nothing if the stream is not registered.
475480
"""
481+
if stream.fd not in self.reg_clifds:
482+
LOGGER.debug("remove_stream: %s not registered", stream)
483+
return
484+
476485
self.unregister_stream(client, stream)
486+
477487
# _close_stream() will flush pending read buffers so may generate events
478488
client._close_stream(stream.name)
489+
479490
# client may have been removed by previous events, if not check whether
480491
# some retained streams still remain
481492
if client in self._clients and not client.streams.retained():
@@ -596,8 +607,7 @@ def set_events(self, client, stream):
596607
(stream.new_events, stream.events, client, stream.name))
597608

598609
if not client.registered:
599-
logging.getLogger(__name__).debug( \
600-
"set_events: client %s not registered" % self)
610+
LOGGER.debug("set_events: client %s not registered", self)
601611
return
602612

603613
chgbits = stream.new_events ^ stream.events
@@ -705,7 +715,7 @@ def run(self, timeout):
705715
# BaseException. For now, print a backtrace in debug to
706716
# help detect the problem.
707717
tbexc = traceback.format_exception(exc_t, exc_val, exc_tb)
708-
logging.getLogger(__name__).debug(''.join(tbexc))
718+
LOGGER.debug(''.join(tbexc))
709719
raise
710720
raise
711721
finally:
@@ -747,6 +757,6 @@ def exited(self):
747757
return not self.running and self._exited
748758

749759
def _debug(self, s):
750-
"""library engine debugging hook"""
751-
#logging.getLogger(__name__).debug(s)
760+
"""library engine verbose debugging hook"""
761+
#LOGGER.debug(s)
752762
pass

lib/ClusterShell/Worker/EngineClient.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#
2-
# Copyright CEA/DAM/DIF (2009-2014)
3-
# Contributor: Stephane THIELL <[email protected]>
2+
# Copyright CEA/DAM/DIF (2009-2016)
3+
# Contributor: Stephane THIELL <[email protected]>
44
#
55
# This file is part of the ClusterShell library.
66
#
@@ -41,6 +41,7 @@
4141
"""
4242

4343
import errno
44+
import logging
4445
import os
4546
import Queue
4647
import thread
@@ -51,6 +52,9 @@
5152
from ClusterShell.Engine.Engine import EngineBaseTimer, E_READ, E_WRITE
5253

5354

55+
LOGGER = logging.getLogger(__name__)
56+
57+
5458
class EngineClientException(Exception):
5559
"""Generic EngineClient exception."""
5660

@@ -319,20 +323,27 @@ def _handle_write(self, sname):
319323
try:
320324
wcnt = os.write(wfile.fd, wfile.wbuf)
321325
except OSError, exc:
322-
if (exc.errno == errno.EAGAIN):
326+
if exc.errno == errno.EAGAIN:
327+
# _handle_write() is not only called by the engine but also
328+
# by _write(), so this is legit: we just try again later
323329
self._set_writing(sname)
324330
return
331+
if exc.errno == errno.EPIPE:
332+
# broken pipe: log warning message and do NOT retry
333+
LOGGER.warning('%s: %s', self, exc)
334+
return
325335
raise
326336
if wcnt > 0:
327-
self.worker._on_written(self.key, wcnt, sname)
328337
# dequeue written buffer
329338
wfile.wbuf = wfile.wbuf[wcnt:]
330339
# check for possible ending
331340
if wfile.eof and not wfile.wbuf:
341+
self.worker._on_written(self.key, wcnt, sname)
332342
# remove stream from engine (not directly)
333343
self._engine.remove_stream(self, wfile)
334344
else:
335345
self._set_writing(sname)
346+
self.worker._on_written(self.key, wcnt, sname)
336347

337348
def _exec_nonblock(self, commandlist, shell=False, env=None):
338349
"""

tests/StreamWorkerTest.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,100 @@ def ev_timeout(self, worker):
200200
self.assertRaises(OSError, os.close, rfd1)
201201
os.close(wfd1)
202202

203+
def test_006_worker_abort_on_written(self):
204+
"""test StreamWorker abort on ev_written"""
205+
206+
# This test creates a writable StreamWorker that will abort after the
207+
# first write, to check whether ev_written is generated in the right
208+
# place.
209+
210+
class TestH(EventHandler):
211+
def __init__(self, testcase, rfd):
212+
self.testcase = testcase
213+
self.rfd = rfd
214+
self.check_written = 0
215+
216+
def ev_written(self, worker, node, sname, size):
217+
self.check_written += 1
218+
self.testcase.assertEqual(os.read(self.rfd, 1024), "initial")
219+
worker.abort()
220+
221+
rfd, wfd = os.pipe()
222+
223+
hdlr = TestH(self, rfd)
224+
worker = StreamWorker(handler=hdlr)
225+
226+
worker.set_writer("test", wfd) # closefd=True
227+
worker.write("initial", "test")
228+
229+
self.run_worker(worker)
230+
self.assertEqual(hdlr.check_written, 1)
231+
os.close(rfd)
232+
233+
def test_007_worker_abort_on_written_eof(self):
234+
"""test StreamWorker abort on ev_written (with EOF)"""
235+
236+
# This test is similar to previous test test_006 but does
237+
# write() + set_write_eof().
238+
239+
class TestH(EventHandler):
240+
def __init__(self, testcase, rfd):
241+
self.testcase = testcase
242+
self.rfd = rfd
243+
self.check_written = 0
244+
245+
def ev_written(self, worker, node, sname, size):
246+
self.check_written += 1
247+
self.testcase.assertEqual(os.read(self.rfd, 1024), "initial")
248+
worker.abort()
249+
250+
rfd, wfd = os.pipe()
251+
252+
hdlr = TestH(self, rfd)
253+
worker = StreamWorker(handler=hdlr)
254+
255+
worker.set_writer("test", wfd) # closefd=True
256+
worker.write("initial", "test")
257+
worker.set_write_eof()
258+
259+
self.run_worker(worker)
260+
self.assertEqual(hdlr.check_written, 1)
261+
os.close(rfd)
262+
263+
def test_008_broken_pipe_on_write(self):
264+
"""test StreamWorker with broken pipe on write()"""
265+
266+
# This test creates a writable StreamWorker that will close the read
267+
# side of the pipe just after the first write to generate a broken
268+
# pipe error.
269+
270+
class TestH(EventHandler):
271+
def __init__(self, testcase, rfd):
272+
self.testcase = testcase
273+
self.rfd = rfd
274+
self.check_hup = 0
275+
self.check_written = 0
276+
277+
def ev_hup(self, worker):
278+
self.check_hup += 1
279+
280+
def ev_written(self, worker, node, sname, size):
281+
self.check_written += 1
282+
self.testcase.assertEqual(os.read(self.rfd, 1024), "initial")
283+
# close reader, that will stop the StreamWorker
284+
os.close(self.rfd)
285+
# The following write call used to raise broken pipe before
286+
# version 1.7.2.
287+
worker.write("final")
288+
289+
rfd, wfd = os.pipe()
290+
291+
hdlr = TestH(self, rfd)
292+
worker = StreamWorker(handler=hdlr)
293+
294+
worker.set_writer("test", wfd) # closefd=True
295+
worker.write("initial", "test")
296+
297+
self.run_worker(worker)
298+
self.assertEqual(hdlr.check_hup, 1)
299+
self.assertEqual(hdlr.check_written, 1)

0 commit comments

Comments
 (0)