Skip to content

Commit 587dd12

Browse files
authored
fix network byte ordering of field table (#58)
Integer fields in a field table should be transmitted with network byte ordering. This fixes read and write of field tables to enforce that. fixes #57
1 parent 5f3abd5 commit 587dd12

File tree

2 files changed

+62
-1
lines changed

2 files changed

+62
-1
lines changed

src/protocol.jl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,16 @@ function read(io::IO, ::Type{TAMQPFieldValue})
5151
c = read(io, Char)
5252
v = read(io, FieldValueIndicatorMap[c])
5353
T = FieldValueIndicatorMap[c]
54+
if T <: Integer
55+
v = ntoh(v)
56+
end
5457
TAMQPFieldValue{T}(c, v)
5558
end
5659

57-
write(io::IO, fv::TAMQPFieldValue) = write(io, fv.typ, fv.fld)
60+
function write(io::IO, fv::TAMQPFieldValue)
61+
v = isa(fv.fld, Integer) ? hton(fv.fld) : fv.fld
62+
write(io, fv.typ, v)
63+
end
5864

5965
read(io::IO, ::Type{TAMQPFieldValuePair}) = TAMQPFieldValuePair(read(io, TAMQPFieldName), read(io, TAMQPFieldValue))
6066

test/test_coverage.jl

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@ const invalid_auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>r
1313
function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing, keepalive=true, heartbeat=true)
1414
verify_spec()
1515
test_types()
16+
test_queue_expire(;
17+
virtualhost=virtualhost,
18+
host=host,
19+
port=port,
20+
auth_params=auth_params,
21+
amqps=amqps,
22+
keepalive=keepalive,
23+
heartbeat=heartbeat)
24+
1625
@test default_exchange_name("direct") == "amq.direct"
1726
@test default_exchange_name() == ""
1827
@test AMQPClient.method_name(AMQPClient.TAMQPMethodPayload(:Basic, :Ack, (1, false))) == "Basic.Ack"
@@ -290,5 +299,51 @@ function test_types()
290299
@test barr.data == UInt8[1,2,3,4,5,6,7,8,9,0]
291300
end
292301

302+
function test_queue_expire(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing, keepalive=true, heartbeat=true)
303+
@info("testing create queue and queue expire with TTL")
304+
# open a connection
305+
@info("opening connection")
306+
conn_ref = nothing
307+
chan_ref = nothing
308+
connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512, keepalive=keepalive, heartbeat=heartbeat) do conn
309+
# open a channel
310+
@info("opening channel")
311+
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan1
312+
@test chan1.id == 1
313+
314+
# test queue create and expire
315+
expires_ms = 10 * 1000 # 10 seconds
316+
success, queue_name, message_count, consumer_count = queue_declare(chan1, QUEUE1, arguments=Dict{String,Any}("x-expires"=>expires_ms))
317+
@test success
318+
@test message_count == 0
319+
@test consumer_count == 0
320+
321+
exchange_name = default_exchange_name("direct")
322+
# queue bind should be successful when queue not expired
323+
@test queue_bind(chan1, QUEUE1, exchange_name, ROUTE1)
324+
325+
# wait for queue to expire, and a subsequent bind should fail
326+
sleep(2 + expires_ms/1000)
327+
@test_throws AMQPClient.AMQPClientException queue_bind(chan1, QUEUE1, exchange_name, ROUTE1)
328+
329+
chan_ref = chan1 # to do additional tests on a closed channel
330+
end
331+
332+
# close(chan_ref) # closing a closed channel should not be an issue
333+
AMQPClient.wait_for_state(chan_ref, AMQPClient.CONN_STATE_CLOSED)
334+
@test !isopen(chan_ref)
335+
336+
conn_ref = conn # to do additional tests on a closed connection
337+
end
338+
339+
# closing a closed connection should not be an issue
340+
# close(conn_ref)
341+
AMQPClient.wait_for_state(conn_ref, AMQPClient.CONN_STATE_CLOSED)
342+
@test !isopen(conn_ref)
343+
344+
@info("done")
345+
nothing
346+
end
347+
293348
end # module AMQPTestCoverage
294349

0 commit comments

Comments
 (0)