Skip to content

Commit 786dd9c

Browse files
committed
Use async tasks to invoke callbacks in main thread
Adds helper python classes to store received messages/service requests and notify the main julia thread to invoke the user-specified callback. Fixes #15.
1 parent c2e25ed commit 786dd9c

File tree

11 files changed

+160
-95
lines changed

11 files changed

+160
-95
lines changed

src/RobotOS.jl

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,19 @@ module RobotOS
33
using PyCall
44

55
#Empty imported modules for valid precompilation
6-
const __sys__ = PyCall.PyNULL()
6+
const _py_sys = PyCall.PyNULL()
7+
const _py_ros_callbacks = PyCall.PyNULL()
78
const __rospy__ = PyCall.PyNULL()
89

910
function __init__()
1011
#Put julia's ARGS into python's so remappings will work
11-
copy!(__sys__, pyimport("sys"))
12-
__sys__["argv"] = ARGS
12+
copy!(_py_sys, pyimport("sys"))
13+
_py_sys["argv"] = ARGS
14+
15+
if ! (dirname(@__FILE__) in _py_sys["path"])
16+
unshift!(_py_sys["path"], dirname(@__FILE__))
17+
end
18+
copy!(_py_ros_callbacks, pyimport("ros_callbacks"))
1319

1420
try
1521
copy!(__rospy__, pyimport("rospy"))
@@ -23,13 +29,12 @@ function __init__()
2329
end
2430
end
2531

26-
_threads_enabled() = ccall(:jl_threading_enabled, Cint, ()) != 0
27-
2832
include("debug.jl")
2933
include("time.jl")
3034
include("gentypes.jl")
3135
include("rospy.jl")
3236
include("pubsub.jl")
3337
include("services.jl")
38+
include("callbacks.jl")
3439

3540
end

src/callbacks.jl

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#This function will run in a new python thread created by rospy.
2+
#No julia allocation allowed.
3+
function _callback_notify(handle::Ptr{Void})
4+
ccall(:uv_async_send, Cint, (Ptr{Void},), handle)
5+
end
6+
7+
const CB_NOTIFY_PTR = cfunction(_callback_notify, Cint, (Ptr{Void},))
8+
9+
function _callback_async_loop(rosobj, cond)
10+
@debug("Spinning up callback loop...")
11+
while ! is_shutdown()
12+
wait(cond)
13+
_run_callbacks(rosobj)
14+
end
15+
@debug("Exiting callback loop")
16+
end
17+
18+
function _run_callbacks{M}(sub::Subscriber{M})
19+
while pycall(sub.queue["size"], PyAny) > 0
20+
msg = pycall(sub.queue["get"], PyObject)
21+
sub.callback(convert(M, msg), sub.callback_args...)
22+
end
23+
end
24+
25+
function _run_callbacks{T}(srv::Service{T})
26+
ReqType = _srv_reqtype(T)
27+
28+
req = pycall(srv.cb_interface["get_request"], PyObject)
29+
response = srv.handler(convert(ReqType, req))
30+
31+
#Python callback is blocking until the response is ready
32+
pycall(srv.cb_interface["set_response"], PyAny, convert(PyObject, response))
33+
end

src/debug.jl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ _debug_output = false
33
_debug_indent = 0
44
debug(d::Bool) = global _debug_output = d
55
macro debug(expr, other...)
6-
:(if _debug_output
6+
esc(:(if _debug_output
77
print(repeat("\t", _debug_indent))
8-
println($expr,$(other...))
9-
end)
8+
println($expr,$(other...))
9+
end))
1010
end
1111
macro debug_addindent()
12-
:(global _debug_indent += 1)
12+
esc(:(global _debug_indent += 1))
1313
end
1414
macro debug_subindent()
15-
:(global _debug_indent -= 1)
15+
esc(:(global _debug_indent -= 1))
1616
end
1717

src/pubsub.jl

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,37 +17,31 @@ function publish{MsgType<:MsgT}(p::Publisher{MsgType}, msg::MsgType)
1717
pycall(p.o["publish"], PyAny, convert(PyObject, msg))
1818
end
1919

20-
if _threads_enabled() #callbacks are broken
21-
22-
type Subscriber{T}
23-
end
24-
Subscriber(args...) = error(
25-
"""Subscribing to a topic is currently broken on julia v0.5 and above. See
26-
https://github.com/jdlangs/RobotOS.jl/issues/15 for ongoing efforts to fix this.""")
27-
28-
else #callbacks not broken
29-
3020
type Subscriber{MsgType<:MsgT}
31-
o::PyObject
3221
callback
22+
callback_args::Tuple
23+
sub_obj::PyObject
24+
queue::PyObject
25+
async_loop::Task
3326

3427
function Subscriber(topic::AbstractString, cb, cb_args::Tuple=(); kwargs...)
3528
@debug("Creating <$(string(MsgType))> subscriber on topic: '$topic'")
3629
rospycls = _get_rospy_class(MsgType)
37-
jl_cb(msg::PyObject) = cb(convert(MsgType, msg), cb_args...)
38-
return new(
39-
__rospy__[:Subscriber](ascii(topic), rospycls, jl_cb; kwargs...),
40-
jl_cb
41-
)
30+
31+
cond = Base.AsyncCondition()
32+
mqueue = _py_ros_callbacks["MessageQueue"](CB_NOTIFY_PTR, cond.handle)
33+
subobj = __rospy__[:Subscriber](ascii(topic), rospycls, mqueue["storemsg"]; kwargs...)
34+
35+
rosobj = new(cb, cb_args, subobj, mqueue)
36+
cbloop = Task(() -> _callback_async_loop(rosobj, cond))
37+
schedule(cbloop)
38+
39+
rosobj.async_loop = cbloop
40+
return rosobj
4241
end
4342
end
4443

45-
Subscriber{MsgType<:MsgT}(
46-
topic::AbstractString,
47-
::Type{MsgType},
48-
cb,
49-
cb_args::Tuple=();
50-
kwargs...
51-
) = Subscriber{MsgType}(ascii(topic), cb, cb_args; kwargs...)
44+
function Subscriber{MsgType<:MsgT}(topic, ::Type{MsgType}, cb, cb_args::Tuple=(); kwargs...)
45+
Subscriber{MsgType}(topic, cb, cb_args; kwargs...)
46+
end
5247

53-
end #check

src/ros_callbacks.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#Python 2/3 compatibility with 3 style code
2+
from __future__ import absolute_import, division, print_function, unicode_literals
3+
__metaclass__ = type
4+
5+
import sys
6+
import ctypes
7+
import threading
8+
try:
9+
import queue
10+
except ImportError:
11+
import Queue as queue
12+
13+
class MessageQueue:
14+
"Queue up received messages and invoke notification to run callback"
15+
def __init__(self, cbptr, notify_handle):
16+
CBType = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p)
17+
self._cb_notify = CBType(cbptr.value)
18+
self._notify_handle = notify_handle
19+
20+
self._queue = queue.Queue()
21+
22+
def storemsg(self, msg):
23+
self._queue.put(msg)
24+
self._cb_notify(self._notify_handle)
25+
26+
def size(self):
27+
return self._queue.qsize()
28+
29+
def get(self):
30+
return self._queue.get()
31+
32+
class ServiceCallback:
33+
def __init__(self, cbptr, notify_handle):
34+
CBType = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p)
35+
self._cb_notify = CBType(cbptr.value)
36+
self._notify_handle = notify_handle
37+
38+
self._response = None
39+
self._hasresponse = threading.Condition()
40+
41+
def srv_cb(self, srvreq):
42+
"Store received service request and block until Julia callback completes"
43+
self._hasresponse.acquire()
44+
self._request = srvreq
45+
self._cb_notify(self._notify_handle)
46+
47+
#wait for the julia callback
48+
self._hasresponse.wait()
49+
self._hasresponse.release()
50+
return self._response
51+
52+
def get_request(self):
53+
return self._request
54+
55+
def set_response(self, resp):
56+
self._response = resp
57+
self._hasresponse.acquire()
58+
self._hasresponse.notify()
59+
self._hasresponse.release()

src/rospy.jl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,18 @@ export init_node, is_shutdown, spin,
66
#General rospy functions
77
init_node(name::AbstractString; args...) =
88
__rospy__[:init_node](ascii(name); args...)
9-
spin() = __rospy__[:spin]()
109
is_shutdown() = __rospy__[:is_shutdown]()
1110
get_published_topics() = __rospy__[:get_published_topics]()
1211
get_ros_root() = __rospy__[:get_ros_root]()
1312

13+
function spin()
14+
#Have to make sure both Julia tasks and python threads can wake up so
15+
#can't just call rospy's spin
16+
while ! is_shutdown()
17+
rossleep(Duration(0.001))
18+
end
19+
end
20+
1421
#Parameter server API
1522
function get_param(param_name::AbstractString, def=nothing)
1623
try

src/services.jl

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,38 +32,36 @@ end
3232
resp
3333
end
3434

35-
if _threads_enabled() #callbacks are broken
36-
37-
type Service{T}
38-
end
39-
40-
Service(args...) = error(
41-
"""Providing a service is currently broken on julia v0.5 and above. See
42-
https://github.com/jdlangs/RobotOS.jl/issues/15 for ongoing efforts to fix this.""")
43-
44-
else #callbacks not broken
45-
4635
type Service{SrvType <: ServiceDefinition}
47-
o::PyObject
48-
jl_handler
36+
handler
37+
srv_obj::PyObject
38+
cb_interface::PyObject
39+
async_loop::Task
4940

5041
function Service(name::AbstractString, handler; kwargs...)
5142
@debug("Providing <$SrvType> service at '$name'")
5243
rospycls = _get_rospy_class(SrvType)
53-
ReqType = _srv_reqtype(SrvType)
54-
jl_hndl(req::PyObject) =
55-
convert(PyObject, handler(convert(ReqType,req)))
56-
try
57-
new(__rospy__[:Service](ascii(name), rospycls, jl_hndl; kwargs...),
58-
jl_hndl
59-
)
44+
45+
cond = Base.AsyncCondition()
46+
pysrv = _py_ros_callbacks["ServiceCallback"](CB_NOTIFY_PTR, cond.handle)
47+
48+
srvobj = try
49+
__rospy__[:Service](ascii(name), rospycls, pysrv["srv_cb"]; kwargs...)
6050
catch err
6151
if isa(err, PyCall.PyError)
6252
error("Problem during service creation: $(err.val[:args][1])")
6353
else
6454
rethrow(err)
6555
end
6656
end
57+
58+
rosobj = new(handler, srvobj, pysrv)
59+
60+
cbloop = Task(() -> _callback_async_loop(rosobj, cond))
61+
schedule(cbloop)
62+
63+
rosobj.async_loop = cbloop
64+
return rosobj
6765
end
6866
end
6967
function Service{SrvType<:ServiceDefinition}(
@@ -75,8 +73,6 @@ function Service{SrvType<:ServiceDefinition}(
7573
Service{SrvType}(ascii(name), handler; kwargs...)
7674
end
7775

78-
end #check
79-
8076
function wait_for_service(service::AbstractString; kwargs...)
8177
try
8278
__rospy__[:wait_for_service](ascii(service); kwargs...)

src/time.jl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,12 @@ end
7979
now() = get_rostime()
8080

8181
function rossleep(td::Duration)
82+
#Busy sleep loop needed to allow both julia and python async activity
8283
tnsecs = to_nsec(td)
8384
t0 = time_ns()
8485
while time_ns()-t0 < tnsecs
85-
yield()
86-
__rospy__[:sleep](0.001)
86+
yield() #Allow julia callback loops to run
87+
__rospy__[:sleep](0.001) #Allow rospy comm threads to run
8788
end
8889
end
8990
rossleep(t::Real) = rossleep(Duration(t))

test/pubsub.jl

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,13 @@ function pose_cb(msg::PoseStamped, msgs::Vector{PoseStamped})
3636
end
3737
pose_cb(PoseStamped(), msgs) #warm up run
3838

39-
if RobotOS._threads_enabled() #callbacks are broken
40-
41-
warn("Not testing subscriber!")
42-
43-
publish_messages(ros_pub, refs, 20.0)
44-
rossleep(Duration(5.0))
45-
Nreceived = get_param("/num_received_messages")
46-
@test Nreceived == length(refs)
47-
set_param("/num_received_messages", 0)
48-
49-
else #callbacks not broken
50-
5139
const ros_sub = Subscriber("poses", PoseStamped, pose_cb, (msgs,), queue_size = 10)
5240

5341
#First message doesn't go out for some reason
5442
publish(ros_pub, Vector3(1.1,2.2,3.3))
5543
rossleep(Duration(1.0))
44+
45+
#Test messages
5646
publish_messages(ros_pub, refs, 20.0)
5747
rossleep(Duration(1.0))
5848

@@ -65,5 +55,3 @@ for i=1:Nmsgs
6555
@test_approx_eq msgs[i].pose.position.z refs[i].z
6656
end
6757
empty!(msgs)
68-
69-
end #check

test/services.jl

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ const srvcall = ServiceProxy("callme", SetBool)
88
println("Waiting for 'callme' service...")
99
wait_for_service("callme")
1010

11+
const flag = Bool[false]
12+
const Nposes = 5
13+
1114
function srv_cb(req::GetPlanRequest)
1215
println("GetPlan call received")
1316
@test_approx_eq(req.start.pose.position.x, 1.0)
@@ -24,23 +27,6 @@ function srv_cb(req::GetPlanRequest)
2427
return resp
2528
end
2629

27-
if RobotOS._threads_enabled() #callbacks are broken
28-
29-
warn("Not testing service provider!")
30-
31-
println("Calling service...")
32-
srvcall(SetBoolRequest(false))
33-
rossleep(Duration(5.0))
34-
35-
didcall = get_param("/received_service_call")
36-
@test didcall
37-
set_param("/received_service_call", false)
38-
39-
else #callbacks not broken
40-
41-
const flag = Bool[false]
42-
const Nposes = 5
43-
4430
const srvlisten = Service("getplan", GetPlan, srv_cb)
4531

4632
println("Calling service...")
@@ -64,8 +50,6 @@ if flag[1]
6450
end
6551
empty!(msgs)
6652

67-
end #check
68-
6953
#Test error handling
7054
@test_throws ErrorException wait_for_service("fake_srv", timeout=1.0)
7155
@test_throws ArgumentError srvcall(SetBoolResponse())

0 commit comments

Comments
 (0)