Skip to content

Commit 9491451

Browse files
authored
Merge pull request #9 from JuliaComputing/tan/misc
example and test scenario for RPC client-server
2 parents ec68c4b + 8869921 commit 9491451

File tree

5 files changed

+235
-27
lines changed

5 files changed

+235
-27
lines changed

src/show.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ end
2222

2323
function show(io::IO, f::TAMQPFieldValuePair)
2424
indent = isa(io, IOContext) ? get(io, :indent, "") : ""
25-
print(indent)
25+
print(io, indent)
2626
show(io, f.name)
2727
print(io, " => ")
2828
show(io, f.val)
@@ -52,7 +52,7 @@ function show(io::IO, fields::Vector{Pair{Symbol,TAMQPField}})
5252
idx = 1
5353
for fld in fields
5454
(idx > 1) && print(ioc, '\n')
55-
print(indent)
55+
print(ioc, indent)
5656
show(ioc, fld.first)
5757
print(ioc, " => ")
5858
show(ioc, fld.second)

test/runtests.jl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
include("test_coverage.jl")
22
include("test_throughput.jl")
3+
include("test_rpc.jl")
34

4-
AMPQTestCoverage.runtests()
5-
AMPQTestThroughput.runtests()
5+
AMQPTestCoverage.runtests()
6+
AMQPTestThroughput.runtests()
7+
AMQPTestRPC.runtests()

test/test_coverage.jl

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module AMPQTestCoverage
1+
module AMQPTestCoverage
22

33
using AMQPClient
44
using Base.Test
@@ -12,6 +12,7 @@ testlog(msg) = println(msg)
1212

1313
function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS)
1414
verify_spec()
15+
test_types()
1516
@test default_exchange_name("direct") == "amq.direct"
1617
@test default_exchange_name() == ""
1718
@test AMQPClient.method_name(AMQPClient.TAMQPMethodPayload(:Basic, :Ack, (1, false))) == "Basic.Ack"
@@ -73,21 +74,21 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
7374
@test isnull(basic_get(chan1, QUEUE1, false))
7475

7576
## test reject and requeue
76-
#basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1)
77+
basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1)
7778

78-
#result = basic_get(chan1, QUEUE1, false)
79-
#@test !isnull(result)
80-
#rcvd_msg = get(result)
81-
#@test rcvd_msg.redelivered == false
79+
result = basic_get(chan1, QUEUE1, false)
80+
@test !isnull(result)
81+
rcvd_msg = get(result)
82+
@test rcvd_msg.redelivered == false
8283

83-
#basic_reject(chan1, rcvd_msg.delivery_tag; requeue=true)
84+
basic_reject(chan1, rcvd_msg.delivery_tag; requeue=true)
8485

85-
#result = basic_get(chan1, QUEUE1, false)
86-
#@test !isnull(result)
87-
#rcvd_msg = get(result)
88-
#@test rcvd_msg.redelivered == true
86+
result = basic_get(chan1, QUEUE1, false)
87+
@test !isnull(result)
88+
rcvd_msg = get(result)
89+
@test rcvd_msg.redelivered == true
8990

90-
#basic_ack(chan1, rcvd_msg.delivery_tag)
91+
basic_ack(chan1, rcvd_msg.delivery_tag)
9192

9293
testlog("testing basic consumer...")
9394
# start a consumer task
@@ -126,6 +127,7 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
126127
@test tx_commit(chan1)
127128
@test tx_rollback(chan1)
128129

130+
# test heartbeats
129131
if 120 >= conn.conn.heartbeat > 0
130132
c = conn.conn
131133
testlog("testing heartbeats (waiting $(3*c.heartbeat) secs)...")
@@ -179,4 +181,25 @@ function verify_spec()
179181
end
180182
end
181183

182-
end # module AMPQTestCoverage
184+
function test_types()
185+
d = Dict{String,Any}(
186+
"bool" => 0x1,
187+
"int" => 10,
188+
"uint" => 0x1,
189+
"float" => rand(),
190+
"shortstr" => convert(AMQPClient.TAMQPShortStr, randstring(10)),
191+
"longstr" => convert(AMQPClient.TAMQPLongStr, randstring(1024)))
192+
ft = convert(AMQPClient.TAMQPFieldTable, d)
193+
iob = IOBuffer()
194+
show(iob, ft)
195+
@test length(take!(iob)) > 0
196+
197+
fields = [Pair{Symbol,AMQPClient.TAMQPField}(:bit, AMQPClient.TAMQPBit(0x1)),
198+
Pair{Symbol,AMQPClient.TAMQPField}(:shortstr, convert(AMQPClient.TAMQPShortStr, randstring(10))),
199+
Pair{Symbol,AMQPClient.TAMQPField}(:longstr, convert(AMQPClient.TAMQPLongStr, randstring(1024))),
200+
Pair{Symbol,AMQPClient.TAMQPField}(:fieldtable, ft)]
201+
show(iob, fields)
202+
@test length(take!(iob)) > 0
203+
end
204+
205+
end # module AMQPTestCoverage

test/test_rpc.jl

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
module AMQPTestRPC
2+
3+
using AMQPClient
4+
using Base.Test
5+
6+
const QUEUE_RPC = "queue_rpc"
7+
8+
testlog(msg) = println(msg)
9+
reply_queue_id = 0
10+
server_id = 0
11+
12+
const NRPC_MSGS = 100
13+
const NRPC_CLNTS = 4
14+
const NRPC_SRVRS = 4
15+
16+
function test_rpc_client(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS)
17+
# open a connection
18+
testlog("client opening connection...")
19+
conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params)
20+
21+
# open a channel
22+
testlog("client opening channel...")
23+
chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
24+
25+
# create a reply queue for a client
26+
global reply_queue_id
27+
reply_queue_id += 1
28+
queue_name = QUEUE_RPC * "_" * string(reply_queue_id) * "_" * string(getpid())
29+
testlog("client creating queue " * queue_name * "...")
30+
success, message_count, consumer_count = queue_declare(chan1, queue_name; exclusive=true)
31+
32+
testlog("client testing rpc...")
33+
rpc_reply_count = 0
34+
rpc_fn = (rcvd_msg) -> begin
35+
rpc_reply_count += 1
36+
37+
msg_str = String(rcvd_msg.data)
38+
println("client ", msg_str)
39+
40+
basic_ack(chan1, rcvd_msg.delivery_tag)
41+
end
42+
43+
# start a consumer task
44+
success, consumer_tag = basic_consume(chan1, queue_name, rpc_fn)
45+
@test success
46+
47+
correlation_id = 0
48+
49+
# publish NRPC_MSGS messages to the queue
50+
while correlation_id < NRPC_MSGS
51+
correlation_id += 1
52+
M = Message(convert(Vector{UInt8}, "hello from " * queue_name), content_type="text/plain", delivery_mode=PERSISTENT, reply_to=queue_name, correlation_id=string(correlation_id))
53+
basic_publish(chan1, M; exchange=default_exchange_name(), routing_key=QUEUE_RPC)
54+
# sleep a random time between 1 and 5 seconds between requests
55+
sleep(rand())
56+
end
57+
58+
while (rpc_reply_count < NRPC_MSGS)
59+
sleep(1)
60+
end
61+
62+
testlog("client closing down...")
63+
success, message_count = queue_purge(chan1, queue_name)
64+
@test success
65+
@test message_count == 0
66+
67+
@test basic_cancel(chan1, consumer_tag)
68+
69+
success, message_count = queue_delete(chan1, queue_name)
70+
@test success
71+
@test message_count == 0
72+
73+
# close channels and connection
74+
close(chan1)
75+
AMQPClient.wait_for_state(chan1, AMQPClient.CONN_STATE_CLOSED)
76+
@test !isopen(chan1)
77+
78+
close(conn)
79+
AMQPClient.wait_for_state(conn, AMQPClient.CONN_STATE_CLOSED)
80+
@test !isopen(conn)
81+
82+
testlog("client done.")
83+
end
84+
85+
function test_rpc_server(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS)
86+
global server_id
87+
server_id += 1
88+
my_server_id = server_id
89+
90+
# open a connection
91+
testlog("server $my_server_id opening connection...")
92+
conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params)
93+
94+
# open a channel
95+
testlog("server $my_server_id opening channel...")
96+
chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
97+
98+
# create queues (no need to bind if we are using the default exchange)
99+
testlog("server $my_server_id creating queues...")
100+
# this is the callback queue
101+
success, message_count, consumer_count = queue_declare(chan1, QUEUE_RPC)
102+
@test success
103+
104+
# test RPC
105+
testlog("server $my_server_id testing rpc...")
106+
global rpc_count = 0
107+
rpc_fn = (rcvd_msg) -> begin
108+
global rpc_count
109+
rpc_count += 1
110+
111+
@test :reply_to in keys(rcvd_msg.properties)
112+
reply_to = convert(String, rcvd_msg.properties[:reply_to])
113+
correlation_id = convert(String, rcvd_msg.properties[:correlation_id])
114+
115+
resp_str = "$(my_server_id) received msg $(rpc_count) - $(reply_to): $(String(rcvd_msg.data))"
116+
println("server ", resp_str)
117+
118+
M = Message(convert(Vector{UInt8}, resp_str), content_type="text/plain", delivery_mode=PERSISTENT, correlation_id=correlation_id)
119+
basic_publish(chan1, M; exchange=default_exchange_name(), routing_key=reply_to)
120+
121+
basic_ack(chan1, rcvd_msg.delivery_tag)
122+
end
123+
124+
# start a consumer task
125+
success, consumer_tag = basic_consume(chan1, QUEUE_RPC, rpc_fn)
126+
@test success
127+
128+
while (rpc_count < NRPC_MSGS*NRPC_CLNTS)
129+
sleep(1)
130+
end
131+
132+
testlog("server $my_server_id closing down...")
133+
success, message_count = queue_purge(chan1, QUEUE_RPC)
134+
@test success
135+
@test message_count == 0
136+
137+
@test basic_cancel(chan1, consumer_tag)
138+
139+
success, message_count = queue_delete(chan1, QUEUE_RPC)
140+
@test success
141+
@test message_count == 0
142+
testlog("server $my_server_id deleted rpc queue")
143+
144+
# close channels and connection
145+
close(chan1)
146+
AMQPClient.wait_for_state(chan1, AMQPClient.CONN_STATE_CLOSED)
147+
@test !isopen(chan1)
148+
149+
close(conn)
150+
AMQPClient.wait_for_state(conn, AMQPClient.CONN_STATE_CLOSED)
151+
@test !isopen(conn)
152+
153+
testlog("server $my_server_id done.")
154+
nothing
155+
end
156+
157+
function runtests()
158+
testlog("testing multiple client server rpc")
159+
clients = Vector{Task}()
160+
servers = Vector{Task}()
161+
162+
for idx in 1:NRPC_SRVRS
163+
push!(servers, @async test_rpc_server())
164+
end
165+
166+
for idx in 1:NRPC_CLNTS
167+
push!(clients, @async test_rpc_client())
168+
end
169+
170+
tasks_active = NRPC_CLNTS + NRPC_SRVRS
171+
while tasks_active > 0
172+
tasks_active = 0
173+
for idx in 1:NRPC_SRVRS
174+
istaskdone(servers[idx]) || (tasks_active += 1)
175+
end
176+
for idx in 1:NRPC_CLNTS
177+
istaskdone(clients[idx]) || (tasks_active += 1)
178+
end
179+
sleep(5)
180+
end
181+
testlog("done")
182+
end
183+
end # module AMQPTestRPC

test/test_throughput.jl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module AMPQTestThroughput
1+
module AMQPTestThroughput
22

33
using AMQPClient
44
using Base.Test
@@ -107,18 +107,18 @@ function consume(conn, chan1)
107107
end
108108

109109
function run_publisher()
110-
conn, chan1 = AMPQTestThroughput.setup()
111-
AMPQTestThroughput.publish(conn, chan1)
112-
AMPQTestThroughput.teardown(conn, chan1, false) # exit without destroying queue
110+
conn, chan1 = AMQPTestThroughput.setup()
111+
AMQPTestThroughput.publish(conn, chan1)
112+
AMQPTestThroughput.teardown(conn, chan1, false) # exit without destroying queue
113113
nothing
114114
end
115115

116116
function run_consumer()
117-
conn, chan1 = AMPQTestThroughput.setup()
118-
AMPQTestThroughput.consume(conn, chan1)
117+
conn, chan1 = AMQPTestThroughput.setup()
118+
AMQPTestThroughput.consume(conn, chan1)
119119
println("waiting for publisher to exit gracefully...")
120120
sleep(10) # wait for publisher to exit gracefully
121-
AMPQTestThroughput.teardown(conn, chan1, true)
121+
AMQPTestThroughput.teardown(conn, chan1, true)
122122
nothing
123123
end
124124

@@ -147,7 +147,7 @@ function runtests()
147147
nothing
148148
end
149149

150-
end # module AMPQTestThroughput
150+
end # module AMQPTestThroughput
151151

152-
!isempty(ARGS) && (ARGS[1] == "--runpublisher") && AMPQTestThroughput.run_publisher()
153-
!isempty(ARGS) && (ARGS[1] == "--runconsumer") && AMPQTestThroughput.run_consumer()
152+
!isempty(ARGS) && (ARGS[1] == "--runpublisher") && AMQPTestThroughput.run_publisher()
153+
!isempty(ARGS) && (ARGS[1] == "--runconsumer") && AMQPTestThroughput.run_consumer()

0 commit comments

Comments
 (0)