Skip to content

Commit d8aa6ea

Browse files
authored
Merge pull request #29 from JuliaComputing/tan/misc
throw when channel is closed due to server error
2 parents 5a3805d + 0a5fbfe commit d8aa6ea

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

src/protocol.jl

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -631,13 +631,6 @@ function _wait_resp(sendmethod, chan::MessageChannel, default_result::T,
631631
result = default_result
632632
if !nowait
633633
reply = Channel{T}(1)
634-
# timer to time the request out, in case of an error
635-
t = Timer(timeout) do t
636-
try
637-
put!(reply, timeout_result)
638-
catch
639-
end
640-
end
641634
# register a callback
642635
handle(chan, resp_class, resp_meth, resp_handler, reply)
643636
end
@@ -646,7 +639,18 @@ function _wait_resp(sendmethod, chan::MessageChannel, default_result::T,
646639

647640
if !nowait
648641
# wait for response
649-
result = take!(reply)
642+
result = timeout_result
643+
if :ok === timedwait(()->(isready(reply) || !isopen(chan)), Float64(timeout); pollint=0.01)
644+
if isready(reply)
645+
result = take!(reply)
646+
else
647+
error_message = "Connection closed"
648+
if nothing !== chan.closereason
649+
error_message = string(error_message, " - ", string(chan.closereason.code), " (", convert(String, chan.closereason.msg), ")")
650+
end
651+
throw(AMQPClientException(error_message))
652+
end
653+
end
650654
close(reply)
651655
end
652656
result

test/test_coverage.jl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,15 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
3535
testlog("creating exchanges...")
3636
@test exchange_declare(chan1, EXCG_DIRECT, EXCHANGE_TYPE_DIRECT; arguments=Dict{String,Any}("Hello"=>"World", "Foo"=>"bar"))
3737
@test exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_FANOUT)
38+
# redeclaring the exchange with same attributes should be fine
39+
@test exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_FANOUT)
40+
# redeclaring an existing exchange with different attributes should fail
41+
@test_throws AMQPClient.AMQPClientException exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_DIRECT)
42+
43+
# must reconnect as channel gets closed after a channel exception
44+
close(chan1) # closing an already closed channel should be fine
45+
chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
46+
@test chan1.id == 1
3847

3948
# create and bind queues
4049
testlog("creating queues...")

0 commit comments

Comments
 (0)