Skip to content

Commit 9699152

Browse files
committed
Port: port improvements
- avoid exception when sending message on closed port (closes #307) - port.msg()/msg_send() now return False if message cannot be sent - publicize task.remove_port() for coherency - add test Change-Id: I6431cb3869acf9fa39b611593f79ef5ff28c6a8e
1 parent 48d4546 commit 9699152

File tree

3 files changed

+45
-11
lines changed

3 files changed

+45
-11
lines changed

lib/ClusterShell/Task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -655,8 +655,8 @@ def _add_port(self, port):
655655
self._engine.add(port)
656656

657657
@tasksyncmethod()
658-
def _remove_port(self, port):
659-
"""Remove a port from Engine (private method)."""
658+
def remove_port(self, port):
659+
"""Close and remove a port from task previously created with port()."""
660660
self._engine.remove(port)
661661

662662
def port(self, handler=None, autoclose=False):

lib/ClusterShell/Worker/EngineClient.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -478,12 +478,11 @@ def __repr__(self):
478478
id(self), fd_in, fd_out)
479479

480480
def _start(self):
481+
"""Start port."""
481482
return self
482483

483484
def _close(self, abort, timeout):
484-
"""
485-
Close port pipes.
486-
"""
485+
"""Close port."""
487486
if not self._msgq.empty():
488487
# purge msgq
489488
try:
@@ -513,7 +512,15 @@ def msg(self, send_msg, send_once=False):
513512
"""
514513
Port message send method that will wait for acknowledgement
515514
unless the send_once parameter if set.
515+
516+
May be called from another thread. Will generate ev_msg() on
517+
Port event handler (in Port task/thread).
518+
519+
Return False if the message cannot be sent (eg. port closed).
516520
"""
521+
if self._msgq is None: # called after port closed?
522+
return False
523+
517524
pmsg = EnginePort._Msg(send_msg, not send_once)
518525
self._msgq.put(pmsg, block=True, timeout=None)
519526
try:
@@ -525,6 +532,8 @@ def msg(self, send_msg, send_once=False):
525532

526533
def msg_send(self, send_msg):
527534
"""
528-
Port message send-once method (no acknowledgement).
535+
Port message send-once method (no acknowledgement). See msg().
536+
537+
Return False if the message cannot be sent (eg. port closed).
529538
"""
530-
self.msg(send_msg, send_once=True)
539+
return self.msg(send_msg, send_once=True)

tests/TaskPortTest.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import pickle
99
import sys
1010
import threading
11+
import time
1112
import unittest
1213

1314
sys.path.insert(0, '../lib')
@@ -46,15 +47,39 @@ def ev_msg(self, port, msg):
4647
self.assert_(TaskPortTest.got_msg)
4748

4849
def testPortRemove(self):
49-
"""test port remove [private as of 1.2]"""
50-
51-
task = Task()
50+
"""test remove_port()"""
5251

5352
class PortHandler(EventHandler):
5453
def ev_msg(self, port, msg):
5554
pass
5655

56+
task = Task() # new thread
5757
port = task.port(handler=PortHandler(), autoclose=True)
5858
task.resume()
59-
task._remove_port(port)
59+
task.remove_port(port)
6060
task_wait()
61+
62+
def testPortClosed(self):
63+
"""test port msg on closed port"""
64+
# test sending message to "stillborn" port
65+
self.port_msg_result = None
66+
67+
# thread will wait a bit and send a port message
68+
def test_thread_start(port, test):
69+
time.sleep(0.5)
70+
test.port_msg_result = port.msg('foobar')
71+
72+
class TestHandler(EventHandler):
73+
pass
74+
75+
task = task_self()
76+
test_handler = TestHandler()
77+
task.timer(0.2, handler=test_handler, autoclose=False)
78+
port = task.port(handler=test_handler, autoclose=True)
79+
thread = threading.Thread(None, test_thread_start, args=(port, self))
80+
thread.setDaemon(True)
81+
thread.start()
82+
task.resume()
83+
task.abort(kill=True) # will remove_port()
84+
thread.join()
85+
self.assertEqual(self.port_msg_result, False) # test vs. None and True

0 commit comments

Comments
 (0)