Skip to content

Commit a311311

Browse files
committed
check stale conn while chunking message to send
This fixes a condition where sending a message on a stale connection can potentially get into an infinite loop while chunking a large message.
1 parent d7bc69f commit a311311

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ uuid = "79c8b4cd-a41a-55fa-907c-fab5288e1383"
33
keywords = ["amqpclient", "rabbitmq", "amqp", "amqp-client", "message-queue"]
44
license = "MIT"
55
desc = "A Julia AMQP (Advanced Message Queuing Protocol) / RabbitMQ Client."
6-
version = "0.4.2"
6+
version = "0.4.3"
77

88
[deps]
99
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"

src/protocol.jl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,13 +267,15 @@ get_property(c::Connection, s::Symbol, default) = get(c.properties, s, default)
267267
send(c::MessageChannel, f, msgframes::Vector=[]) = send(c.conn, f, msgframes)
268268
function send(c::Connection, f, msgframes::Vector=[])
269269
#uncomment to enable synchronization (not required till we have preemptive tasks or threads)
270+
@debug("queing messageframes", nframes=length(msgframes))
270271
lck = take!(c.sendlck)
271272
try
272273
put!(c.sendq, TAMQPGenericFrame(f))
273274
for m in msgframes
274275
put!(c.sendq, TAMQPGenericFrame(m))
275276
end
276277
finally
278+
@debug("queued messageframes", nqueued=length(c.sendq.data))
277279
put!(c.sendlck, lck)
278280
end
279281
nothing
@@ -292,17 +294,25 @@ function send(c::MessageChannel, payload::TAMQPMethodPayload, msg::Union{Message
292294
# send one or more message body frames
293295
offset = 1
294296
msglen = length(message.data)
297+
framemax = c.conn.framemax
298+
if framemax <= 0
299+
errormsg = (c.conn.state == CONN_STATE_OPEN) ? "Unexpected framemax ($framemax) value for connection" : "Connection closed"
300+
throw(AMQPClientException(errormsg))
301+
end
302+
295303
while offset <= msglen
296-
msgend = min(msglen, offset + c.conn.framemax - 1)
304+
msgend = min(msglen, offset + framemax - 1)
297305
bodypayload = TAMQPBodyPayload(message.data[offset:msgend])
298306
offset = msgend + 1
307+
@debug("sending", msglen, offset)
299308
push!(msgframes, TAMQPContentBodyFrame(frameprop, bodypayload))
300309
end
301310

302311
send(c, TAMQPMethodFrame(frameprop, payload), msgframes)
303312
else
304313
send(c, TAMQPMethodFrame(frameprop, payload))
305314
end
315+
@debug("sent", methodname=method_name(payload))
306316
end
307317

308318
# ----------------------------------------

0 commit comments

Comments
 (0)