Skip to content

Commit 203098c

Browse files
authored
Merge pull request #33 from JuliaComputing/tan/misc
check for errors while waiting for channel state
2 parents 737741e + 3a0023f commit 203098c

File tree

2 files changed

+20
-7
lines changed

2 files changed

+20
-7
lines changed

src/protocol.jl

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,13 @@ end
309309
# Async message handler framework begin
310310
# ----------------------------------------
311311
function wait_for_state(c, states; interval=1, timeout=typemax(Int))
312-
t1 = time()
313-
while !(c.state in states)
314-
((time() - t1) > timeout) && (return false)
315-
sleep(interval)
312+
timedwait(Float64(timeout); pollint=Float64(interval)) do
313+
# if we are looking for open states, and connection gets closed in the meantime, it's an error, break out
314+
conn_error = !(CONN_STATE_CLOSED in states) && (c.state == CONN_STATE_CLOSED)
315+
state_found = (c.state in states)
316+
conn_error || state_found
316317
end
317-
true
318+
c.state in states
318319
end
319320

320321
function connection_processor(c, name, fn)
@@ -512,7 +513,11 @@ function channel(c::Connection, id::Integer, create::Bool; connect_timeout=DEFAU
512513
send_channel_open(chan)
513514

514515
if !wait_for_state(chan, CONN_STATE_OPEN; timeout=connect_timeout)
515-
throw(AMQPClientException("Channel handshake failed"))
516+
error_message = "Channel handshake failed"
517+
if nothing !== chan.closereason
518+
error_message = string(error_message, " - ", string(chan.closereason.code), " (", convert(String, chan.closereason.msg), ")")
519+
end
520+
throw(AMQPClientException(error_message))
516521
end
517522
end
518523
else
@@ -562,7 +567,11 @@ function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DE
562567
flush(AMQPClient.sock(chan))
563568

564569
if !AMQPClient.wait_for_state(conn, AMQPClient.CONN_STATE_OPEN; timeout=connect_timeout) || !AMQPClient.wait_for_state(chan, AMQPClient.CONN_STATE_OPEN; timeout=connect_timeout)
565-
throw(AMQPClientException("Connection handshake failed"))
570+
error_message = "Connection handshake failed"
571+
if nothing !== chan.closereason
572+
error_message = string(error_message, " - ", string(chan.closereason.code), " (", convert(String, chan.closereason.msg), ")")
573+
end
574+
throw(AMQPClientException(error_message))
566575
end
567576
chan
568577
end

test/test_coverage.jl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const EXCG_DIRECT = "ExcgDirect"
88
const EXCG_FANOUT = "ExcgFanout"
99
const QUEUE1 = "queue1"
1010
const ROUTE1 = "key1"
11+
const invalid_auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>randstring(10), "PASSWORD"=>randstring(10))
1112

1213
function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing)
1314
verify_spec()
@@ -16,6 +17,9 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
1617
@test default_exchange_name() == ""
1718
@test AMQPClient.method_name(AMQPClient.TAMQPMethodPayload(:Basic, :Ack, (1, false))) == "Basic.Ack"
1819

20+
# test failure on invalid auth_params
21+
@test_throws AMQPClient.AMQPClientException connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=invalid_auth_params)
22+
1923
conn_ref = nothing
2024

2125
# open a connection

0 commit comments

Comments
 (0)