Skip to content

Commit dfc412c

Browse files
authored
Add timeout option to call_service messages (#994)
* Add timeout option to call_service messages * Don't set call_service timeout by default * Skip test that gets stuck * Add warning about possible service calls blocking indefinitely
1 parent f3a9a7b commit dfc412c

File tree

9 files changed

+242
-47
lines changed

9 files changed

+242
-47
lines changed

ROSBRIDGE_PROTOCOL.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,8 @@ Calls a ROS service.
436436
"service": <string>,
437437
(optional) "args": <list<json>>,
438438
(optional) "fragment_size": <int>,
439-
(optional) "compression": <string>
439+
(optional) "compression": <string>,
440+
(optional) "timeout": <float>
440441
}
441442
```
442443

@@ -449,6 +450,7 @@ Calls a ROS service.
449450
before it is fragmented
450451
* **compression** – an optional string to specify the compression scheme to be
451452
used on messages. Valid values are "none" and "png"
453+
* **timeout** – the time, in seconds, to wait for a response from the server
452454

453455

454456
Stops advertising an external ROS service server

rosbridge_library/src/rosbridge_library/capabilities/call_service.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ def __init__(self, protocol):
5252
# Call superclass constructor
5353
Capability.__init__(self, protocol)
5454

55+
self.default_timeout = (
56+
protocol.node_handle.get_parameter("default_call_service_timeout")
57+
.get_parameter_value()
58+
.double_value
59+
)
60+
5561
# Register the operations that this capability provides
5662
call_services_in_new_thread = (
5763
protocol.node_handle.get_parameter("call_services_in_new_thread")
@@ -81,6 +87,7 @@ def call_service(self, message):
8187
fragment_size = message.get("fragment_size", None)
8288
compression = message.get("compression", "none")
8389
args = message.get("args", [])
90+
timeout = message.get("timeout", self.default_timeout)
8491

8592
if CallService.services_glob is not None and CallService.services_glob:
8693
self.protocol.log(
@@ -112,7 +119,14 @@ def call_service(self, message):
112119
e_cb = partial(self._failure, cid, service)
113120

114121
# Run service caller in the same thread.
115-
ServiceCaller(trim_servicename(service), args, s_cb, e_cb, self.protocol.node_handle).run()
122+
ServiceCaller(
123+
trim_servicename(service),
124+
args,
125+
timeout,
126+
s_cb,
127+
e_cb,
128+
self.protocol.node_handle,
129+
).run()
116130

117131
def _success(self, cid, service, fragment_size, compression, message):
118132
outgoing_message = {

rosbridge_library/src/rosbridge_library/internal/services.py

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@
3030
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
3131
# POSSIBILITY OF SUCH DAMAGE.
3232

33-
import time
34-
from threading import Thread
33+
from threading import Event, Thread
3534
from typing import Any, Callable, Optional
3635

37-
import rclpy
3836
from rclpy.callback_groups import ReentrantCallbackGroup
3937
from rclpy.expand_topic_name import expand_topic_name
4038
from rclpy.node import Node
@@ -58,6 +56,7 @@ def __init__(
5856
self,
5957
service: str,
6058
args: dict,
59+
timeout: float,
6160
success_callback: Callable[[str, str, int, bool, Any], None],
6261
error_callback: Callable[[str, str, Exception], None],
6362
node_handle: Node,
@@ -71,6 +70,8 @@ def __init__(
7170
ordered list, or a dict of name-value pairs. Anything else will be
7271
treated as though no arguments were provided (which is still valid for
7372
some kinds of service)
73+
timeout -- the time, in seconds, to wait for a response from the server.
74+
A non-positive value means no timeout.
7475
success_callback -- a callback to call with the JSON result of the
7576
service call
7677
error_callback -- a callback to call if an error occurs. The
@@ -81,14 +82,22 @@ def __init__(
8182
self.daemon = True
8283
self.service = service
8384
self.args = args
85+
self.timeout = timeout
8486
self.success = success_callback
8587
self.error = error_callback
8688
self.node_handle = node_handle
8789

8890
def run(self) -> None:
8991
try:
9092
# Call the service and pass the result to the success handler
91-
self.success(call_service(self.node_handle, self.service, args=self.args))
93+
self.success(
94+
call_service(
95+
self.node_handle,
96+
self.service,
97+
args=self.args,
98+
server_response_timeout=self.timeout,
99+
)
100+
)
92101
except Exception as e:
93102
# On error, just pass the exception to the error handler
94103
self.error(e)
@@ -114,8 +123,8 @@ def call_service(
114123
node_handle: Node,
115124
service: str,
116125
args: Optional[dict] = None,
117-
server_timeout_time: float = 1.0,
118-
sleep_time: float = 0.001,
126+
server_ready_timeout: float = 1.0,
127+
server_response_timeout: float = 5.0,
119128
) -> dict:
120129
# Given the service name, fetch the type and class of the service,
121130
# and a request instance
@@ -141,20 +150,32 @@ def call_service(
141150
service_class, service, callback_group=ReentrantCallbackGroup()
142151
)
143152

144-
if not client.wait_for_service(server_timeout_time):
153+
if not client.wait_for_service(server_ready_timeout):
145154
node_handle.destroy_client(client)
146155
raise InvalidServiceException(service)
147156

148157
future = client.call_async(inst)
149-
while rclpy.ok() and not future.done():
150-
time.sleep(sleep_time)
151-
result = future.result()
158+
event = Event()
159+
160+
def future_done_callback():
161+
event.set()
162+
163+
future.add_done_callback(lambda _: future_done_callback())
164+
165+
if not event.wait(timeout=(server_response_timeout if server_response_timeout > 0 else None)):
166+
future.cancel()
167+
node_handle.destroy_client(client)
168+
raise Exception("Timeout exceeded while waiting for service response")
152169

153170
node_handle.destroy_client(client)
171+
172+
result = future.result()
173+
154174
if result is not None:
155175
# Turn the response into JSON and pass to the callback
156176
json_response = extract_values(result)
157177
else:
158-
raise Exception(result)
178+
exception = future.exception()
179+
raise Exception("Service call exception: " + str(exception))
159180

160181
return json_response

rosbridge_library/test/capabilities/test_call_service.py

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ def trigger_cb(self, request, response):
2424
response.message = "called trigger service successfully"
2525
return response
2626

27+
def trigger_long_cb(self, request, response):
28+
"""Helper callback function for a long running test service with no arguments."""
29+
time.sleep(0.5)
30+
response.success = True
31+
response.message = "called trigger service successfully"
32+
return response
33+
2734
def set_bool_cb(self, request, response):
2835
"""Helper callback function for a test service with arguments."""
2936
response.success = request.data
@@ -40,6 +47,7 @@ def setUp(self):
4047
self.executor.add_node(self.node)
4148

4249
self.node.declare_parameter("call_services_in_new_thread", False)
50+
self.node.declare_parameter("default_call_service_timeout", 5.0)
4351
self.node.declare_parameter("send_action_goals_in_new_thread", False)
4452

4553
# Create service servers with a separate callback group
@@ -50,6 +58,12 @@ def setUp(self):
5058
self.trigger_cb,
5159
callback_group=self.cb_group,
5260
)
61+
self.trigger_long_srv = self.node.create_service(
62+
Trigger,
63+
self.node.get_name() + "/trigger_long",
64+
self.trigger_long_cb,
65+
callback_group=self.cb_group,
66+
)
5367
self.set_bool_srv = self.node.create_service(
5468
SetBool,
5569
self.node.get_name() + "/set_bool",
@@ -62,8 +76,9 @@ def setUp(self):
6276

6377
def tearDown(self):
6478
self.executor.remove_node(self.node)
65-
self.node.destroy_node()
6679
self.executor.shutdown()
80+
self.exec_thread.join()
81+
self.node.destroy_node()
6782
rclpy.shutdown()
6883

6984
def test_missing_arguments(self):
@@ -98,12 +113,6 @@ def cb(msg, cid=None, compression="none"):
98113

99114
s.call_service(send_msg)
100115

101-
timeout = 1.0
102-
start = time.time()
103-
while time.time() - start < timeout:
104-
if received["arrived"]:
105-
break
106-
107116
self.assertTrue(received["arrived"])
108117
values = received["msg"]["values"]
109118
self.assertEqual(values["success"], True)
@@ -135,12 +144,6 @@ def cb(msg, cid=None, compression="none"):
135144

136145
s.call_service(send_msg)
137146

138-
timeout = 1.0
139-
start = time.time()
140-
while time.time() - start < timeout:
141-
if received["arrived"]:
142-
break
143-
144147
self.assertTrue(received["arrived"])
145148
values = received["msg"]["values"]
146149
self.assertEqual(values["success"], True)
@@ -174,11 +177,45 @@ def cb(msg, cid=None, compression="none"):
174177

175178
s.call_service(send_msg)
176179

177-
timeout = 1.0
178-
start = time.time()
179-
while time.time() - start < timeout:
180-
if received["arrived"]:
181-
break
180+
self.assertTrue(received["arrived"])
181+
self.assertFalse(received["msg"]["result"])
182+
183+
def test_call_service_timeout(self):
184+
185+
client = self.node.create_client(Trigger, self.trigger_long_srv.srv_name)
186+
assert client.wait_for_service(1.0)
187+
188+
proto = Protocol("test_call_service_timeout", self.node)
189+
s = CallService(proto)
190+
send_msg = loads(
191+
dumps({"op": "call_service", "service": self.trigger_long_srv.srv_name, "timeout": 2.0})
192+
)
193+
194+
received = {"msg": None, "arrived": False}
195+
196+
def cb(msg, cid=None, compression="none"):
197+
print("Received message")
198+
received["msg"] = msg
199+
received["arrived"] = True
200+
201+
proto.send = cb
202+
203+
s.call_service(send_msg)
204+
205+
self.assertTrue(received["arrived"])
206+
self.assertTrue(received["msg"]["result"])
207+
values = received["msg"]["values"]
208+
self.assertEqual(values["success"], True)
209+
self.assertEqual(values["message"], "called trigger service successfully")
210+
211+
send_msg = loads(
212+
dumps({"op": "call_service", "service": self.trigger_long_srv.srv_name, "timeout": 0.1})
213+
)
214+
received = {"msg": None, "arrived": False}
215+
216+
s.call_service(send_msg)
182217

183218
self.assertTrue(received["arrived"])
184219
self.assertFalse(received["msg"]["result"])
220+
values = received["msg"]["values"]
221+
self.assertEqual(values, "Timeout exceeded while waiting for service response")

0 commit comments

Comments
 (0)