Skip to content

Commit 655ac4a

Browse files
committed
allow using tcp keepalives instead of heartbeats
AMQP allows heartbeats to be disabled completely, provided the server allows that. But it is essential to have an alternative to detect network disconnections and stale connections. In this PR, we switch on TCP keepalives on the connection (optional but enabled by default) and allow heartbeats to be switched off. Heartbeats are still enabled by default though. The `connection` method has these new parameters to allow that: ```julia connection(; heartbeat = true, keepalive = DEFAULT_KEEPALIVE_SECS, ) ``` - `heartbeat`: `true` to enable heartbeat, `false` to disable. Can also be set to a positive integer, in which case it is the heartbeat interval in seconds. Defaults to `true`. If `false`, ensure `keepalive` is enabled to detect dead connections. This parameter is negotiated with the server. - `keepalive`: `true` to enable TCP keepalives, `false` to disable. Can also be set to a positive integer, in which case it is the keepalive interval in seconds. Defaults to `DEFAULT_KEEPALIVE_SECS`.
1 parent 6145773 commit 655ac4a

File tree

3 files changed

+160
-23
lines changed

3 files changed

+160
-23
lines changed

src/protocol.jl

Lines changed: 105 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,19 @@ const CONN_STATE_OPENING = 1
164164
const CONN_STATE_OPEN = 2
165165
const CONN_STATE_CLOSING = 3
166166
const CONN_MAX_QUEUED = 1024 #typemax(Int)
167+
const DEFAULT_KEEPALIVE_SECS = 60
167168

168169
abstract type AbstractChannel end
169170

171+
function keepalive!(sock, enable::Bool; interval::Integer=DEFAULT_KEEPALIVE_SECS)
172+
@debug("setting tcp keepalive on tcp socket", enable, interval)
173+
err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), sock.handle, enable, interval)
174+
if err != 0
175+
throw(AMQPProtocolException("error setting keepalive on socket to $enable with interval $interval"))
176+
end
177+
return sock
178+
end
179+
170180
mutable struct Connection
171181
virtualhost::String
172182
host::String
@@ -178,6 +188,9 @@ mutable struct Connection
178188
channelmax::TAMQPShortInt
179189
framemax::TAMQPLongInt
180190
heartbeat::TAMQPShortInt
191+
enable_heartbeat::Bool
192+
keepalive::Integer
193+
enable_keepalive::Bool
181194

182195
state::UInt8
183196
sendq::Channel{TAMQPGenericFrame}
@@ -191,12 +204,22 @@ mutable struct Connection
191204
heartbeat_time_server::Float64
192205
heartbeat_time_client::Float64
193206

194-
function Connection(; virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT, send_queue_size::Int=CONN_MAX_QUEUED)
207+
function Connection(;
208+
virtualhost::String="/",
209+
host::String="localhost",
210+
port::Int=AMQP_DEFAULT_PORT,
211+
send_queue_size::Int=CONN_MAX_QUEUED,
212+
heartbeat::Integer=0,
213+
enable_heartbeat::Bool=true,
214+
keepalive::Integer=DEFAULT_KEEPALIVE_SECS,
215+
enable_keepalive::Bool=true,
216+
)
195217
sendq = Channel{TAMQPGenericFrame}(send_queue_size)
196218
sendlck = Channel{UInt8}(1)
197219
put!(sendlck, 1)
198220
new(virtualhost, host, port, nothing,
199-
Dict{Symbol,Any}(), Dict{String,Any}(), 0, 0, 0,
221+
Dict{Symbol,Any}(), Dict{String,Any}(), 0, 0,
222+
heartbeat, enable_heartbeat, keepalive, enable_keepalive,
200223
CONN_STATE_CLOSED, sendq, sendlck, Dict{TAMQPChannel, AbstractChannel}(),
201224
nothing, nothing, nothing,
202225
0.0, 0.0)
@@ -502,6 +525,21 @@ function find_unused_channel(c::Connection)
502525
end
503526
throw(AMQPClientException("No free channel available (max: $maxid)"))
504527
end
528+
529+
"""
530+
channel(conn, id, create)
531+
channel(f, args...)
532+
533+
Create or return an existing a channel object.
534+
Multiple channels can be multiplexed over a single connection.
535+
Can be used with the Julia do block syntax to create a channel and close it afterwards.
536+
537+
- `conn`: The connection over which to create the channel.
538+
- `id`: Channels are identified by their numeric id. Specifying `AMQPClient.UNUSED_CHANNEL` as channel
539+
id during creation will automatically assign an unused id.
540+
- `create`: If true, a new channel will be created. Else an existing channel with the specified id
541+
will be returned.
542+
"""
505543
channel(c::MessageChannel, id::Integer) = channel(c.conn, id)
506544
channel(c::Connection, id::Integer) = c.channels[id]
507545
channel(c::MessageChannel, id::Integer, create::Bool) = channel(c.conn, id, create)
@@ -546,26 +584,83 @@ function channel(f, args...; kwargs...)
546584
end
547585
end
548586

587+
"""
588+
connection(f; kwargs...)
589+
590+
connection(;
591+
virtualhost = "/",
592+
host = "localhost",
593+
port = AMQPClient.AMQP_DEFAULT_PORT,
594+
framemax = 0,
595+
heartbeat = true,
596+
keepalive = DEFAULT_KEEPALIVE_SECS,
597+
send_queue_size = CONN_MAX_QUEUED,
598+
auth_params = AMQPClient.DEFAULT_AUTH_PARAMS,
599+
channelmax = AMQPClient.DEFAULT_CHANNELMAX,
600+
connect_timeout = AMQPClient.DEFAULT_CONNECT_TIMEOUT,
601+
amqps = nothing
602+
)
603+
604+
Creates a fresh connection to the AMQP server.
605+
Returns a connection that can be used to open channels subsequently.
606+
Can be used with the Julia do block syntax to create a connection and close it afterwards.
607+
608+
Keyword arguments:
609+
- `host`: The message server host to connect to. Defaults to "localhost".
610+
- `port`: The message server port to connect to. Defaults to the default AMQP port.
611+
- `virtualhost`: The virtual host to connect to. Defaults to "/".
612+
- `amqps`: If connection is to be done over AMQPS, the TLS options to use. See `amqps_configure`.
613+
- `connect_timeout`: TCP connect timeout to impose. Default `AMQPClient.DEFAULT_CONNECT_TIMEOUT`,
614+
- `framemax`: The maximum frame size to use. Defaults to 0, which means no limit.
615+
- `heartbeat`: `true` to enable heartbeat, `false` to disable. Can also be set to a positive integer,
616+
in which case it is the heartbeat interval in seconds. Defaults to `true`. If `false`, ensure
617+
`keepalive` is enabled to detect dead connections. This parameter is negotiated with the server.
618+
- `keepalive`: `true` to enable TCP keepalives, `false` to disable. Can also be set to a positive integer,
619+
in which case it is the keepalive interval in seconds. Defaults to `DEFAULT_KEEPALIVE_SECS`.
620+
- `send_queue_size`: Maximum number of items to buffer in memory before blocking the send API until
621+
messages are drained. Defaults to CONN_MAX_QUEUED.
622+
- `auth_params`: Parameters to use to authenticate the connection. Defaults to AMQPClient.DEFAULT_AUTH_PARAMS.
623+
- `channelmax`: Maximum channel number to impose/negotiate with the server. Defaults to AMQPClient.DEFAULT_CHANNELMAX.
624+
625+
"""
549626
function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT,
550627
framemax=0,
551-
heartbeat=0,
628+
heartbeat::Union{Int,Bool}=true,
629+
keepalive::Union{Int,Bool}=DEFAULT_KEEPALIVE_SECS,
552630
send_queue_size::Integer=CONN_MAX_QUEUED,
553631
auth_params=AMQPClient.DEFAULT_AUTH_PARAMS,
554632
channelmax::Integer=AMQPClient.DEFAULT_CHANNELMAX,
555633
connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT,
556634
amqps::Union{MbedTLS.SSLConfig,Nothing}=nothing)
557635
@debug("connecting", host, port, virtualhost)
558-
conn = Connection(; virtualhost=virtualhost, host=host, port=port, send_queue_size=send_queue_size)
636+
637+
keepalive_interval = isa(keepalive, Bool) ? DEFAULT_KEEPALIVE_SECS : keepalive
638+
enable_keepalive = isa(keepalive, Bool) ? keepalive : (keepalive_interval > 0)
639+
640+
heartbeat_interval = isa(heartbeat, Bool) ? 0 : heartbeat
641+
enable_heartbeat = isa(heartbeat, Bool) ? heartbeat : (heartbeat > 0)
642+
643+
conn = Connection(;
644+
virtualhost=virtualhost,
645+
host=host,
646+
port=port,
647+
send_queue_size=send_queue_size,
648+
heartbeat=heartbeat_interval,
649+
enable_heartbeat=enable_heartbeat,
650+
keepalive=keepalive_interval,
651+
enable_keepalive=enable_keepalive,)
559652
chan = channel(conn, AMQPClient.DEFAULT_CHANNEL, true)
560653

561654
# setup handler for Connection.Start
562-
ctx = Dict(:auth_params=>auth_params, :channelmax=>channelmax, :framemax=>framemax, :heartbeat=>heartbeat)
655+
ctx = Dict(:auth_params=>auth_params, :channelmax=>channelmax, :framemax=>framemax, :heartbeat=>heartbeat_interval)
563656
AMQPClient.handle(chan, :Connection, :Start, AMQPClient.on_connection_start, ctx)
564657

565658
# open socket and start processor tasks
566659
sock = connect(conn.host, conn.port)
567660
isdefined(Sockets, :nagle) && Sockets.nagle(sock, false)
568661
isdefined(Sockets, :quickack) && Sockets.quickack(sock, true)
662+
keepalive!(sock, enable_keepalive; interval=keepalive_interval)
663+
569664
conn.sock = (amqps !== nothing) ? setup_tls(sock, host, amqps) : sock
570665
conn.sender = @async AMQPClient.connection_processor(conn, "ConnectionSender", AMQPClient.connection_sender)
571666
conn.receiver = @async AMQPClient.connection_processor(conn, "ConnectionReceiver", AMQPClient.connection_receiver)
@@ -1119,13 +1214,15 @@ function send_connection_tune_ok(chan::MessageChannel, channelmax=0, framemax=0,
11191214

11201215
conn.channelmax = opt(channelmax, conn.channelmax)
11211216
conn.framemax = opt(framemax, conn.framemax)
1122-
conn.heartbeat = opt(heartbeat, conn.heartbeat)
1217+
conn.heartbeat = conn.enable_heartbeat ? opt(heartbeat, conn.heartbeat) : 0
11231218

11241219
@debug("send_connection_tune_ok", channelmax=conn.channelmax, framemax=conn.framemax, heartbeat=conn.heartbeat)
11251220
send(chan, TAMQPMethodPayload(:Connection, :TuneOk, (conn.channelmax, conn.framemax, conn.heartbeat)))
11261221

1127-
# start heartbeat timer
1128-
conn.heartbeater = @async connection_processor(conn, "HeartBeater", connection_heartbeater)
1222+
if conn.enable_heartbeat
1223+
# start heartbeat timer
1224+
conn.heartbeater = @async connection_processor(conn, "HeartBeater", connection_heartbeater)
1225+
end
11291226
nothing
11301227
end
11311228

test/runtests.jl

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,63 @@
11
using AMQPClient
2+
using Test
23

34
include("test_coverage.jl")
45
include("test_throughput.jl")
56
include("test_rpc.jl")
67

7-
AMQPTestCoverage.runtests()
8-
AMQPTestThroughput.runtests()
9-
AMQPTestRPC.runtests()
8+
@testset "AMQPClient" begin
9+
@testset "AMQP" begin
10+
@testset "Functionality" begin
11+
for keepalive in [true, false]
12+
for heartbeat in (true, false)
13+
@testset "keepalive=$keepalive,heartbeat=$heartbeat" begin
14+
AMQPTestCoverage.runtests(; keepalive=keepalive, heartbeat=heartbeat)
15+
end
16+
end
17+
end
18+
end
19+
# @testset "Throughput" begin
20+
# AMQPTestThroughput.runtests()
21+
# end
22+
# @testset "RPC" begin
23+
# AMQPTestRPC.runtests()
24+
# end
25+
end
1026

11-
if length(ARGS) > 0
12-
amqps_host = ARGS[1]
13-
virtualhost = ARGS[2]
14-
port = AMQPClient.AMQPS_DEFAULT_PORT
27+
if length(ARGS) > 0
28+
@testset "AMQPS" begin
29+
amqps_host = ARGS[1]
30+
virtualhost = ARGS[2]
31+
port = AMQPClient.AMQPS_DEFAULT_PORT
1532

16-
login = ENV["AMQPPLAIN_LOGIN"]
17-
password = ENV["AMQPPLAIN_PASSWORD"]
18-
auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password)
33+
login = ENV["AMQPPLAIN_LOGIN"]
34+
password = ENV["AMQPPLAIN_PASSWORD"]
35+
auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password)
1936

20-
AMQPTestCoverage.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, virtualhost=virtualhost, amqps=amqps_configure(), auth_params=auth_params)
21-
AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true)
22-
AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure())
37+
@testset "Functionality" begin
38+
for keepalive in [true, false]
39+
for heartbeat in (true, false)
40+
@testset "keepalive=$keepalive,heartbeat=$heartbeat" begin
41+
AMQPTestCoverage.runtests(;
42+
host=amqps_host,
43+
port=AMQPClient.AMQPS_DEFAULT_PORT,
44+
virtualhost=virtualhost,
45+
amqps=amqps_configure(),
46+
auth_params=auth_params,
47+
keepalive=keepalive,
48+
heartbeat=heartbeat)
49+
end
50+
end
51+
end
52+
end
53+
@testset "Throughput" begin
54+
AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true)
55+
end
56+
@testset "RPC" begin
57+
AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure())
58+
end
59+
end
60+
end
2361
end
2462

2563
exit(0)

test/test_coverage.jl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const QUEUE1 = "queue1"
1010
const ROUTE1 = "key1"
1111
const invalid_auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>randstring(10), "PASSWORD"=>randstring(10))
1212

13-
function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing)
13+
function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing, keepalive=true, heartbeat=true)
1414
verify_spec()
1515
test_types()
1616
@test default_exchange_name("direct") == "amq.direct"
@@ -24,7 +24,7 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
2424

2525
# open a connection
2626
@info("opening connection")
27-
connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512) do conn
27+
connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512, keepalive=keepalive, heartbeat=heartbeat) do conn
2828
@test conn.conn.sendq.sz_max == 512
2929

3030
# open a channel
@@ -159,6 +159,8 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
159159
end
160160
@test c.heartbeat_time_server > ts1
161161
@test c.heartbeat_time_client > tc1
162+
elseif conn.conn.heartbeat == 0
163+
@info("heartbeat disabled")
162164
else
163165
@info("not testing heartbeats (wait too long at $(3*conn.conn.heartbeat) secs)")
164166
end

0 commit comments

Comments
 (0)