Skip to content

Commit 9d3e4b9

Browse files
authored
Merge pull request #30 from JuliaComputing/tan/comm
make sendq size configurable, add flush method
2 parents d8aa6ea + 18953c1 commit 9d3e4b9

File tree

3 files changed

+24
-7
lines changed

3 files changed

+24
-7
lines changed

src/AMQPClient.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module AMQPClient
22

3-
import Base: write, read, read!, close, convert, show, isopen
3+
import Base: write, read, read!, close, convert, show, isopen, flush
44

55
using Sockets
66

@@ -26,7 +26,7 @@ export tx_select, tx_commit, tx_rollback
2626
export basic_qos, basic_consume, basic_cancel, basic_publish, basic_get, basic_ack, basic_reject, basic_recover
2727
export confirm_select
2828
export EXCHANGE_TYPE_DIRECT, EXCHANGE_TYPE_FANOUT, EXCHANGE_TYPE_TOPIC, EXCHANGE_TYPE_HEADERS
29-
export read, read!, close, convert, show
29+
export read, read!, close, convert, show, flush
3030
export Message, set_properties, PERSISTENT, NON_PERSISTENT
3131

3232
end # module

src/protocol.jl

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,8 @@ mutable struct Connection
191191
heartbeat_time_server::Float64
192192
heartbeat_time_client::Float64
193193

194-
function Connection(virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT)
195-
sendq = Channel{TAMQPGenericFrame}(CONN_MAX_QUEUED)
194+
function Connection(virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT; send_queue_size::Int=CONN_MAX_QUEUED)
195+
sendq = Channel{TAMQPGenericFrame}(send_queue_size)
196196
sendlck = Channel{UInt8}(1)
197197
put!(sendlck, 1)
198198
new(virtualhost, host, port, nothing,
@@ -248,6 +248,13 @@ mutable struct MessageChannel <: AbstractChannel
248248
end
249249
end
250250

251+
flush(c::MessageChannel) = flush(c.conn)
252+
function flush(c::Connection)
253+
while isready(c.sendq) && (c.sender !== nothing) && !istaskdone(c.sender)
254+
yield()
255+
end
256+
end
257+
251258
sock(c::MessageChannel) = sock(c.conn)
252259
sock(c::Connection) = c.sock
253260

@@ -492,9 +499,15 @@ function channel(c::Connection, id::Integer, create::Bool; connect_timeout=DEFAU
492499
chan
493500
end
494501

495-
function connection(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, channelmax=AMQPClient.DEFAULT_CHANNELMAX, framemax=0, heartbeat=0, connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT)
502+
function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT,
503+
framemax=0,
504+
heartbeat=0,
505+
send_queue_size::Integer=CONN_MAX_QUEUED,
506+
auth_params=AMQPClient.DEFAULT_AUTH_PARAMS,
507+
channelmax::Integer=AMQPClient.DEFAULT_CHANNELMAX,
508+
connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT)
496509
@debug("connecting", host, port, virtualhost)
497-
conn = AMQPClient.Connection(virtualhost, host, port)
510+
conn = AMQPClient.Connection(virtualhost, host, port; send_queue_size=send_queue_size)
498511
chan = channel(conn, AMQPClient.DEFAULT_CHANNEL, true)
499512

500513
# setup handler for Connection.Start

test/test_coverage.jl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
2020

2121
# open a connection
2222
testlog("opening connection...")
23-
conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params)
23+
conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params, send_queue_size=512)
24+
@test conn.conn.sendq.sz_max == 512
2425

2526
# open a channel
2627
testlog("opening channel...")
2728
chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
2829
@test chan1.id == 1
30+
@test conn.conn.sendq.sz_max == 512
2931

3032
# test default exchange names
3133
@test default_exchange_name() == ""
@@ -63,6 +65,8 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
6365
# publish 10 messages
6466
for idx in 1:10
6567
basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1)
68+
flush(chan1)
69+
@test !isready(chan1.conn.sendq)
6670
end
6771

6872
# basic get 10 messages

0 commit comments

Comments
 (0)