Skip to content

Commit 97ba696

Browse files
committed
Event: implement ev_written event
Generate ev_written after some write buffer data have been written. The amount of data written is available as an additional argument of the ev_written event handler and can be used to track writing. Closes #194. Change-Id: I41db8ac3783143d7e9dc6b32f0daad4d7362b667
1 parent 636ceb7 commit 97ba696

File tree

6 files changed

+81
-15
lines changed

6 files changed

+81
-15
lines changed

lib/ClusterShell/Event.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,25 @@ def ev_error(self, worker):
9090
* :attr:`.Worker.current_errmsg` - read error message
9191
"""
9292

93-
def ev_written(self, worker):
93+
def ev_written(self, worker, node, sname, size):
9494
"""
95-
Called to indicate that writing has been done.
95+
Called to indicate that some writing has been done by the worker to a
96+
node on a given stream. This event is only generated when ``write()``
97+
is previously called on the worker.
98+
99+
This handler may be called very often depending on the number of target
100+
nodes, the amount of data to write and the block size used by the
101+
worker.
102+
103+
Note: up to ClusterShell 1.6, this event handler wasn't implemented. To
104+
properly handle ev_written after 1.6, the method signature must consist
105+
of the following parameters:
96106
97107
:param worker: :class:`.Worker` object
108+
:param node: node (or) key
109+
:param sname: stream name
110+
:param size: amount of bytes that has just been written to node/stream
111+
associated with this event
98112
"""
99113

100114
def ev_hup(self, worker):

lib/ClusterShell/Propagation.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ def __init__(self, task, gateway):
231231
self.task = task
232232
self.gateway = gateway
233233
self.workers = {}
234-
self._history = {} # track informations about previous states
234+
self._cfg_write_hist = deque() # track write requests
235235
self._sendq = deque()
236236
self._rc = None
237237
self.logger = logging.getLogger(__name__)
@@ -259,7 +259,6 @@ def start(self):
259259
# Immediately send CFG
260260
cfg = ConfigurationMessage(self.gateway)
261261
cfg.data_encode(self.task.topology)
262-
self._history['cfg_id'] = cfg.msgid
263262
self.send(cfg)
264263

265264
def recv(self, msg):
@@ -305,8 +304,6 @@ def shell(self, nodes, command, worker, timeout, stderr, gw_invoke_cmd,
305304
'remote': remote,
306305
}
307306
ctl.data_encode(ctl_data)
308-
309-
self._history['ctl_id'] = ctl.msgid
310307
self.send_queued(ctl)
311308

312309
def write(self, nodes, buf, worker):
@@ -322,7 +319,7 @@ def write(self, nodes, buf, worker):
322319
'buf': buf,
323320
}
324321
ctl.data_encode(ctl_data)
325-
self._history['ctl_id'] = ctl.msgid
322+
self._cfg_write_hist.appendleft((ctl.msgid, nodes, len(buf), worker))
326323
self.send_queued(ctl)
327324

328325
def set_write_eof(self, nodes, worker):
@@ -333,14 +330,12 @@ def set_write_eof(self, nodes, worker):
333330
ctl = ControlMessage(id(worker))
334331
ctl.action = 'eof'
335332
ctl.target = nodes
336-
337-
self._history['ctl_id'] = ctl.msgid
338333
self.send_queued(ctl)
339334

340335
def recv_cfg(self, msg):
341336
"""handle incoming messages for state 'propagate configuration'"""
342337
self.logger.debug("recv_cfg")
343-
if msg.type == 'ACK': # and msg.ack == self._history['cfg_id']:
338+
if msg.type == 'ACK':
344339
self.logger.debug("CTL - connection with gateway fully established")
345340
self.setup = True
346341
self.send_dequeue()
@@ -350,8 +345,15 @@ def recv_cfg(self, msg):
350345
def recv_ctl(self, msg):
351346
"""handle incoming messages for state 'control'"""
352347
self.logger.debug("recv_ctl")
353-
if msg.type == 'ACK': # and msg.ack == self._history['ctl_id']:
348+
if msg.type == 'ACK':
354349
self.logger.debug("got ack (%s)", msg.type)
350+
# check if ack matches write history msgid to generate ev_written
351+
if self._cfg_write_hist and msg.ack == self._cfg_write_hist[-1][0]:
352+
_, nodes, bytes_count, metaworker = self._cfg_write_hist.pop()
353+
for node in nodes:
354+
# we are losing track of the gateway here, we could override
355+
# on_written in WorkerTree if needed (eg. for stats)
356+
metaworker._on_written(node, bytes_count, 'stdin')
355357
self.send_dequeue()
356358
elif isinstance(msg, RoutedMessageBase):
357359
metaworker = self.workers[msg.srcid]

lib/ClusterShell/Worker/EngineClient.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@ def _handle_write(self, sname):
324324
return
325325
raise
326326
if wcnt > 0:
327+
self.worker._on_written(self.key, wcnt, sname)
327328
# dequeue written buffer
328329
wfile.wbuf = wfile.wbuf[wcnt:]
329330
# check for possible ending

lib/ClusterShell/Worker/Tree.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,14 @@ def ev_error(self, worker):
8181
worker.current_errmsg,
8282
'stderr')
8383

84-
def ev_written(self, worker):
84+
def ev_written(self, worker, node, sname, size):
8585
"""
8686
Called to indicate that writing has been done.
8787
"""
8888
metaworker = self.metaworker
89-
metaworker.current_node = worker.current_node
90-
metaworker.eh.ev_written(metaworker)
89+
metaworker.current_node = node
90+
metaworker.current_sname = sname
91+
metaworker.eh.ev_written(metaworker, node, sname, size)
9192

9293
def ev_hup(self, worker):
9394
"""

lib/ClusterShell/Worker/Worker.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
A worker is a generic object which provides "grouped" work in a specific task.
3737
"""
3838

39+
import inspect
3940
import warnings
4041

4142
from ClusterShell.Worker.EngineClient import EngineClient
@@ -152,6 +153,17 @@ def _on_rc(self, key, rc):
152153
if self.eh:
153154
self.eh.ev_hup(self)
154155

156+
def _on_written(self, key, bytes_count, sname):
157+
"""Notification of bytes written."""
158+
# set node and stream name (compat only)
159+
self.current_node = key
160+
self.current_sname = sname
161+
162+
# generate event - for ev_written, also check for new signature (1.7)
163+
# NOTE: add DeprecationWarning in 1.8 for old ev_written signature
164+
if self.eh and len(inspect.getargspec(self.eh.ev_written)[0]) == 5:
165+
self.eh.ev_written(self, key, sname, bytes_count)
166+
155167
# Base getters
156168

157169
def last_read(self):

tests/TaskEventTest.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def do_asserts_read_notimeout(self):
3131
assert self.cnt_pickup > 0, "ev_pickup not called"
3232
assert self.did_read, "ev_read not called"
3333
assert not self.did_readerr, "ev_error called"
34+
assert self.cnt_written == 0, "ev_written called"
3435
assert self.cnt_hup > 0, "ev_hup not called"
3536
assert self.did_close, "ev_close not called"
3637
assert not self.did_timeout, "ev_timeout called"
@@ -40,15 +41,27 @@ def do_asserts_timeout(self):
4041
assert self.cnt_pickup > 0, "ev_pickup not called"
4142
assert not self.did_read, "ev_read called"
4243
assert not self.did_readerr, "ev_error called"
44+
assert self.cnt_written == 0, "ev_written called"
4345
assert self.cnt_hup == 0, "ev_hup called"
4446
assert self.did_close, "ev_close not called"
4547
assert self.did_timeout, "ev_timeout not called"
4648

4749
def do_asserts_noread_notimeout(self):
4850
assert self.did_start, "ev_start not called"
4951
assert self.cnt_pickup > 0, "ev_pickup not called"
50-
assert not self.did_read, "ev_read not called"
52+
assert not self.did_read, "ev_read called"
53+
assert not self.did_readerr, "ev_error called"
54+
assert self.cnt_written == 0, "ev_written called"
55+
assert self.cnt_hup > 0, "ev_hup not called"
56+
assert self.did_close, "ev_close not called"
57+
assert not self.did_timeout, "ev_timeout called"
58+
59+
def do_asserts_read_write_notimeout(self):
60+
assert self.did_start, "ev_start not called"
61+
assert self.cnt_pickup > 0, "ev_pickup not called"
62+
assert self.did_read, "ev_read not called"
5163
assert not self.did_readerr, "ev_error called"
64+
assert self.cnt_written > 0, "ev_written not called"
5265
assert self.cnt_hup > 0, "ev_hup not called"
5366
assert self.did_close, "ev_close not called"
5467
assert not self.did_timeout, "ev_timeout called"
@@ -58,6 +71,8 @@ def reset_asserts(self):
5871
self.cnt_pickup = 0
5972
self.did_read = False
6073
self.did_readerr = False
74+
self.cnt_written = 0
75+
self.bytes_written = 0
6176
self.cnt_hup = 0
6277
self.did_close = False
6378
self.did_timeout = False
@@ -78,6 +93,10 @@ def ev_error(self, worker):
7893
assert worker.current_errmsg == "errerrerrerrerrerrerrerr"
7994
assert worker.current_msg != "errerrerrerrerrerrerrerr"
8095

96+
def ev_written(self, worker, node, sname, size):
97+
self.cnt_written += 1
98+
self.bytes_written += size
99+
81100
def ev_hup(self, worker):
82101
self.cnt_hup += 1
83102

@@ -214,3 +233,20 @@ def test_ev_pickup_fanout(self):
214233
self.assertEqual(eh.cnt_hup, 3)
215234
finally:
216235
task.set_info("fanout", fanout)
236+
237+
def test_ev_written(self):
238+
"""test ev_written event"""
239+
task = task_self()
240+
241+
eh = TestHandler()
242+
243+
worker = task.shell("cat", handler=eh)
244+
content = "abcdefghijklmnopqrstuvwxyz\n"
245+
worker.write(content)
246+
worker.set_write_eof()
247+
248+
task.resume()
249+
250+
eh.do_asserts_read_write_notimeout()
251+
self.assertEqual(eh.cnt_written, 1)
252+
self.assertEqual(eh.bytes_written, len(content))

0 commit comments

Comments
 (0)