Skip to content

Commit d0243ea

Browse files
mfaltnsslh
andauthored
Handle empty message (#54)
* Stop using obsolete Base.write_sub Fixes #49 * Catching messages without body Co-authored-by: Sheldon Hearn <[email protected]>
1 parent cbc9531 commit d0243ea

File tree

2 files changed

+45
-13
lines changed

2 files changed

+45
-13
lines changed

src/protocol.jl

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,11 +1374,33 @@ function on_basic_get_empty_or_ok(chan::MessageChannel, m::TAMQPMethodFrame, ctx
13741374
nothing
13751375
end
13761376

1377+
function on_channel_message_completed(chan::MessageChannel, msg::Message)
1378+
# got all data for msg
1379+
if isempty(msg.consumer_tag)
1380+
put!(chan.chan_get, pop!(chan.partial_msgs))
1381+
else
1382+
lock(chan.lck) do
1383+
if msg.consumer_tag in keys(chan.consumers)
1384+
put!(chan.consumers[msg.consumer_tag].recvq, pop!(chan.partial_msgs))
1385+
else
1386+
put!(get!(()->Channel{Message}(typemax(Int)), chan.pending_msgs, msg.consumer_tag), msg)
1387+
@debug("holding message, no consumer yet with tag", tag=msg.consumer_tag)
1388+
end
1389+
end
1390+
end
1391+
nothing
1392+
end
1393+
13771394
function on_channel_message_in(chan::MessageChannel, m::TAMQPContentHeaderFrame, ctx)
13781395
msg = last(chan.partial_msgs)
13791396
msg.properties = m.hdrpayload.proplist
13801397
msg.data = Vector{UInt8}(undef, m.hdrpayload.bodysize)
13811398
msg.filled = 0
1399+
1400+
if m.hdrpayload.bodysize == 0
1401+
# got all data for msg
1402+
on_channel_message_completed(chan, msg)
1403+
end
13821404
nothing
13831405
end
13841406

@@ -1392,18 +1414,7 @@ function on_channel_message_in(chan::MessageChannel, m::TAMQPContentBodyFrame, c
13921414

13931415
if msg.filled >= length(msg.data)
13941416
# got all data for msg
1395-
if isempty(msg.consumer_tag)
1396-
put!(chan.chan_get, pop!(chan.partial_msgs))
1397-
else
1398-
lock(chan.lck) do
1399-
if msg.consumer_tag in keys(chan.consumers)
1400-
put!(chan.consumers[msg.consumer_tag].recvq, pop!(chan.partial_msgs))
1401-
else
1402-
put!(get!(()->Channel{Message}(typemax(Int)), chan.pending_msgs, msg.consumer_tag), msg)
1403-
@debug("holding message, no consumer yet with tag", tag=msg.consumer_tag)
1404-
end
1405-
end
1406-
end
1417+
on_channel_message_completed(chan, msg)
14071418
end
14081419

14091420
nothing

test/test_coverage.jl

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,13 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
116116
@test rcvd_msg.exchange == EXCG_DIRECT
117117
@test rcvd_msg.redelivered == false
118118
@test rcvd_msg.routing_key == ROUTE1
119-
@test rcvd_msg.data == M.data
120119
global msg_count
121120
msg_count += 1
121+
if msg_count <= 10
122+
@test rcvd_msg.data == M.data
123+
else
124+
@test rcvd_msg.data == UInt8[]
125+
end
122126
println("received msg $(msg_count): $(String(rcvd_msg.data))")
123127
basic_ack(chan1, rcvd_msg.delivery_tag)
124128
end
@@ -137,6 +141,23 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
137141
end
138142
@test msg_count == 10
139143

144+
@info("testing empty messages")
145+
# Test sending and receiving empty message
146+
M_empty = Message(Vector{UInt8}(), content_type="text/plain", delivery_mode=PERSISTENT)
147+
basic_publish(chan1, M_empty; exchange=EXCG_DIRECT, routing_key=ROUTE1)
148+
149+
M_no_ct = Message(Vector{UInt8}(), delivery_mode=PERSISTENT)
150+
basic_publish(chan1, M_no_ct; exchange=EXCG_DIRECT, routing_key=ROUTE1)
151+
152+
println("Waiting")
153+
# wait for a reasonable time to receive last two messages
154+
for idx in 1:5
155+
(msg_count == 12) && break
156+
sleep(1)
157+
end
158+
println("Waited")
159+
@test msg_count == 12
160+
140161
# cancel the consumer task
141162
@test basic_cancel(chan1, consumer_tag)
142163

0 commit comments

Comments
 (0)