Skip to content

Commit f596fe4

Browse files
authored
Merge pull request #48 from JuliaComputing/tan/misc
reduce copying of large messages while sending
2 parents 4388aec + 2feb6a5 commit f596fe4

File tree

6 files changed

+61
-118
lines changed

6 files changed

+61
-118
lines changed

.travis.yml

Lines changed: 0 additions & 34 deletions
This file was deleted.

appveyor.yml

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/protocol.jl

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -287,55 +287,49 @@ isopen(c::MessageChannel) = isopen(c.conn) && (c.id in keys(c.conn.channels))
287287
get_property(c::MessageChannel, s::Symbol, default) = get_property(c.conn, s, default)
288288
get_property(c::Connection, s::Symbol, default) = get(c.properties, s, default)
289289

290-
send(c::MessageChannel, f, msgframes::Vector=[]) = send(c.conn, f, msgframes)
291-
function send(c::Connection, f, msgframes::Vector=[])
292-
#uncomment to enable synchronization (not required till we have preemptive tasks or threads)
293-
@debug("queing messageframes", nframes=length(msgframes))
294-
lck = take!(c.sendlck)
290+
with_sendlock(f, c::MessageChannel) = with_sendlock(f, c.conn)
291+
with_sendlock(f, c::Connection) = with_sendlock(f, c.sendlck)
292+
function with_sendlock(f, sendlck::Channel{UInt8})
293+
lck = take!(sendlck)
295294
try
296-
put!(c.sendq, TAMQPGenericFrame(f))
297-
for m in msgframes
298-
put!(c.sendq, TAMQPGenericFrame(m))
299-
end
295+
f()
300296
finally
301-
@debug("queued messageframes", nqueued=length(c.sendq.data))
302-
put!(c.sendlck, lck)
297+
put!(sendlck, lck)
303298
end
304-
nothing
305299
end
306-
function send(c::MessageChannel, payload::TAMQPMethodPayload, msg::Union{Message, Nothing}=nothing)
307-
@debug("sending", methodname=method_name(payload), hascontent=(msg !== nothing))
300+
send(c::MessageChannel, f) = send(c.conn, f)
301+
send(c::Connection, f) = put!(c.sendq, TAMQPGenericFrame(f))
302+
function send(c::MessageChannel, payload::TAMQPMethodPayload)
303+
@debug("sending without content", methodname=method_name(payload))
304+
frameprop = TAMQPFrameProperties(c.id,0)
305+
send(c, TAMQPMethodFrame(frameprop, payload))
306+
end
307+
function send(c::MessageChannel, payload::TAMQPMethodPayload, msg::Message)
308+
@debug("sending with content", methodname=method_name(payload))
308309
frameprop = TAMQPFrameProperties(c.id,0)
309-
if msg !== nothing
310-
msgframes = []
311-
message = msg
310+
framemax = c.conn.framemax
311+
if framemax <= 0
312+
errormsg = (c.conn.state == CONN_STATE_OPEN) ? "Unexpected framemax ($framemax) value for connection" : "Connection closed"
313+
throw(AMQPClientException(errormsg))
314+
end
312315

313-
# send message header frame
314-
hdrpayload = TAMQPHeaderPayload(payload.class, message)
315-
push!(msgframes, TAMQPContentHeaderFrame(frameprop, hdrpayload))
316+
with_sendlock(c) do
317+
send(c, TAMQPMethodFrame(frameprop, payload))
318+
hdrpayload = TAMQPHeaderPayload(payload.class, msg)
319+
send(c, TAMQPContentHeaderFrame(frameprop, hdrpayload))
316320

317321
# send one or more message body frames
318322
offset = 1
319-
msglen = length(message.data)
320-
framemax = c.conn.framemax
321-
if framemax <= 0
322-
errormsg = (c.conn.state == CONN_STATE_OPEN) ? "Unexpected framemax ($framemax) value for connection" : "Connection closed"
323-
throw(AMQPClientException(errormsg))
324-
end
325-
323+
msglen = length(msg.data)
324+
@debug("sending message with content body", msglen)
326325
while offset <= msglen
327326
msgend = min(msglen, offset + framemax - 1)
328-
bodypayload = TAMQPBodyPayload(message.data[offset:msgend])
327+
bodypayload = TAMQPBodyPayload(msg.data[offset:msgend])
329328
offset = msgend + 1
330-
@debug("sending", msglen, offset)
331-
push!(msgframes, TAMQPContentBodyFrame(frameprop, bodypayload))
329+
@debug("sending content body frame", msglen, offset)
330+
send(c, TAMQPContentBodyFrame(frameprop, bodypayload))
332331
end
333-
334-
send(c, TAMQPMethodFrame(frameprop, payload), msgframes)
335-
else
336-
send(c, TAMQPMethodFrame(frameprop, payload))
337332
end
338-
@debug("sent", methodname=method_name(payload))
339333
end
340334

341335
# ----------------------------------------

src/types.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ end
7676
TAMQPFieldValue(v::T) where {T} = TAMQPFieldValue{T}(FieldIndicatorMap[T], v)
7777
TAMQPFieldValue(v::Dict) = TAMQPFieldValue(TAMQPFieldTable(v))
7878
TAMQPFieldValue(v::String) = TAMQPFieldValue(TAMQPLongStr(v))
79+
TAMQPFieldValue(v::Bool) = TAMQPFieldValue('b', TAMQPBool(v))
7980

8081
struct TAMQPFieldValuePair{T <: TAMQPFV}
8182
name::TAMQPFieldName

test/runtests.jl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ include("test_rpc.jl")
1616
end
1717
end
1818
end
19-
# @testset "Throughput" begin
20-
# AMQPTestThroughput.runtests()
21-
# end
22-
# @testset "RPC" begin
23-
# AMQPTestRPC.runtests()
24-
# end
19+
@testset "Throughput" begin
20+
AMQPTestThroughput.runtests()
21+
end
22+
@testset "RPC" begin
23+
AMQPTestRPC.runtests()
24+
end
2525
end
2626

2727
if length(ARGS) > 0

test/test_coverage.jl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,31 @@ function test_types()
239239
mframe = AMQPClient.TAMQPMethodFrame(mfprop, mpayload)
240240
show(iob, mframe)
241241
@test length(take!(iob)) > 0
242+
243+
fields = AMQPClient.TAMQPFieldValue[
244+
AMQPClient.TAMQPFieldValue(true),
245+
AMQPClient.TAMQPFieldValue(1.1),
246+
AMQPClient.TAMQPFieldValue(1),
247+
AMQPClient.TAMQPFieldValue("hello world"),
248+
AMQPClient.TAMQPFieldValue(Dict{String,Int}("one"=>1, "two"=>2)),
249+
]
250+
251+
fieldarray = AMQPClient.TAMQPFieldArray(fields)
252+
simplified_fields = AMQPClient.simplify(fieldarray)
253+
@test simplified_fields == Any[
254+
0x01,
255+
1.1,
256+
1,
257+
"hello world",
258+
Dict{String, Any}("two" => 2, "one" => 1)
259+
]
260+
261+
iob = PipeBuffer()
262+
write(iob, hton(AMQPClient.TAMQPLongUInt(10)))
263+
write(iob, UInt8[1,2,3,4,5,6,7,8,9,0])
264+
barr = read(iob, AMQPClient.TAMQPByteArray)
265+
@test barr.len == 10
266+
@test barr.data == UInt8[1,2,3,4,5,6,7,8,9,0]
242267
end
243268

244269
end # module AMQPTestCoverage

0 commit comments

Comments
 (0)