Skip to content

Commit fa0540b

Browse files
georgiy-belyaninTotktonada
authored andcommitted
connpool: reconnect to recent instances on error
This patch introduces logic that makes Tarantool connpool automatically reconnect to recently accessed instances in case of errors. The main aim is to make the connection pool methods `connpool.call()` and `connpool.filter()` faster by making they behave as follows when they need a connection to multiple instances. * Run some of the mentioned connpool methods. * Apply static configuration filters and find instance candidates. * Check if there are recent connections to all of the candidates. - Yes, there are connections to all of them (both active and broken). In that case, try to use the active and available ones. If the connection is not available, we may guarantee that it has been also unavailable during past 3 seconds (hard-coded reconnect after interval). That means there is no need to wait and try to reconnect to it. - No, some instances have not been accessed yet. In that case, connect to all of the remaining in parallel and wait until the connect is established/failed. The key improvement is that we no longer wait if some of the candidate connections have failed and re-use the existing connections. This logic will be added in the follow-up patch. Part of tarantool#10330 NO_DOC=will be added later
1 parent 06502b1 commit fa0540b

File tree

3 files changed

+186
-0
lines changed

3 files changed

+186
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
## feature/connpool
2+
3+
* The `experimental.connpool` methods now try to reconnect to recently accessed
4+
instances when they become unavailable. Reconnect attempts happen after a
5+
constant interval and are stopped if the instance is no longer needed.

src/box/lua/connpool.lua

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ local netbox = require('net.box')
88
local WATCHER_DELAY = 0.1
99
local WATCHER_TIMEOUT = 10
1010

11+
-- This option controls delay between reconnect attempts to
12+
-- recently needed instances which connections have failed.
13+
local RECONNECT_AFTER = 3
14+
1115
-- {{{ Basic instance connection pool
1216

1317
local pool_methods = {}
@@ -75,6 +79,86 @@ function pool_methods._unused_connection_watchdog_wake(self)
7579
self._unused_connection_watchdog_fiber = f
7680
end
7781

82+
function pool_methods._failed_connection_watchdog_step(self)
83+
local now = clock.monotonic()
84+
local until_next_reconnect = math.huge
85+
local instance_names_to_reconnect = {}
86+
87+
-- At first, collect instances to reconnect and then
88+
-- perform actual reconnect not to modify self._connections
89+
-- in-place.
90+
for name, conn in pairs(self._connections) do
91+
if conn._reconnect ~= nil then
92+
local until_reconnect = conn._reconnect - now
93+
94+
if until_reconnect <= 0 then
95+
table.insert(instance_names_to_reconnect, name)
96+
elseif until_reconnect < until_next_reconnect then
97+
until_next_reconnect = until_reconnect
98+
end
99+
end
100+
end
101+
102+
for _, instance_name in ipairs(instance_names_to_reconnect) do
103+
local old_conn = self._connections[instance_name]
104+
105+
-- The connection has not been nil when collected and
106+
-- the function did not yield. It should remain non-nil.
107+
-- In other words, if the connection has not been
108+
-- closed as unused it remains so during the whole step.
109+
--
110+
-- This non-yielding logic prevents possible races
111+
-- when reconnect happens at the same time when deadline
112+
-- is reached.
113+
assert(old_conn ~= nil)
114+
115+
local opts = {
116+
ttl = 0,
117+
wait_connected = false,
118+
fetch_schema = old_conn.opts.fetch_schema,
119+
}
120+
121+
if not is_connection_valid(old_conn, opts) then
122+
local new_conn = self:connect(instance_name, opts)
123+
new_conn._reconnect = now + RECONNECT_AFTER
124+
if until_next_reconnect > RECONNECT_AFTER then
125+
until_next_reconnect = RECONNECT_AFTER
126+
end
127+
else
128+
old_conn._reconnect = nil
129+
end
130+
end
131+
132+
return until_next_reconnect
133+
end
134+
135+
function pool_methods._failed_connection_watchdog_loop(self)
136+
while true do
137+
local until_next_reconnect = self:_failed_connection_watchdog_step()
138+
139+
if until_next_reconnect == math.huge then
140+
break
141+
end
142+
143+
if not self._failed_connection_watchdog.fired then
144+
self._failed_connection_watchdog.cond:wait(until_next_reconnect)
145+
end
146+
self._failed_connection_watchdog.fired = false
147+
end
148+
end
149+
150+
function pool_methods._failed_connection_watchdog_wake(self)
151+
local f = self._failed_connection_watchdog_fiber
152+
if f ~= nil and f:status() ~= 'dead' then
153+
self._failed_connection_watchdog.fired = true
154+
self._failed_connection_watchdog.cond:signal()
155+
return
156+
end
157+
158+
f = fiber.new(self._failed_connection_watchdog_loop, self)
159+
self._failed_connection_watchdog_fiber = f
160+
end
161+
78162
--- Connect to an instance or receive a cached connection by
79163
--- name.
80164
---
@@ -91,6 +175,7 @@ function pool_methods.connect(self, instance_name, opts)
91175

92176
local conn = self._connections[instance_name]
93177
local old_deadline = (conn or {})._deadline
178+
local old_reconnect = (conn or {})._reconnect
94179
if not is_connection_valid(conn, opts) then
95180
local uri = config:instance_uri('peer', {instance = instance_name})
96181
if uri == nil then
@@ -118,11 +203,17 @@ function pool_methods.connect(self, instance_name, opts)
118203
end
119204
conn.mode = mode
120205
conn._deadline = old_deadline
206+
conn._reconnect = old_reconnect
121207
local function watch_status(_key, value)
122208
conn._mode = value.is_ro and 'ro' or 'rw'
123209
self._connection_mode_update_cond:broadcast()
124210
end
125211
conn:watch('box.status', watch_status)
212+
local function on_disconnect()
213+
conn._reconnect = clock.monotonic() + RECONNECT_AFTER
214+
self:_failed_connection_watchdog_wake()
215+
end
216+
conn:on_disconnect(on_disconnect)
126217
end
127218

128219
local idle_timeout = opts.ttl or self._idle_timeout
@@ -227,6 +318,13 @@ local function create_pool()
227318
cond = fiber.cond(),
228319
},
229320

321+
-- Failed connection management
322+
_failed_connection_watchdog_fiber = nil,
323+
_failed_connection_watchdog = {
324+
fired = false,
325+
cond = fiber.cond(),
326+
},
327+
230328
_idle_timeout = 60,
231329
}, pool_mt)
232330
end

test/config-luatest/rpc_test.lua

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
-- tags: parallel
22

33
local t = require('luatest')
4+
local fiber = require('fiber')
45
local fun = require('fun')
56
local treegen = require('luatest.treegen')
67
local server = require('luatest.server')
@@ -1222,3 +1223,85 @@ g.test_closes_unused_connections = function()
12221223
t.assert_equals(_G.connects, 4)
12231224
end)
12241225
end
1226+
1227+
g.test_tries_to_reconnect = function()
1228+
local config = cbuilder:new()
1229+
:set_global_option('credentials.users.myuser',
1230+
{password = 'secret',
1231+
roles = { 'replication' },
1232+
privileges = {{permissions = {'execute'}, universe = true}}})
1233+
:set_global_option('iproto.advertise.peer.login', 'myuser')
1234+
:add_instance('i-001', { database = { mode = 'rw' } })
1235+
:add_instance('i-002', {})
1236+
:config()
1237+
1238+
local cluster = cluster:new(config)
1239+
1240+
-- The reconnect after interval for connpool is hardcoded in
1241+
-- connpool and equals to 3 seconds.
1242+
local reconnect_after = 3
1243+
1244+
-- Add a counter to count netbox.connect() calls.
1245+
treegen.write_file(cluster._dir, 'override/net/box.lua',
1246+
string.dump(function()
1247+
local loaders = require('internal.loaders')
1248+
1249+
rawset(_G, 'connects', 0)
1250+
1251+
local builtin_netbox = loaders.builtin['net.box']
1252+
local builtin_connect = builtin_netbox.connect
1253+
builtin_netbox.connect = function(...)
1254+
_G.connects = _G.connects + 1
1255+
return builtin_connect(...)
1256+
end
1257+
1258+
return builtin_netbox
1259+
end))
1260+
1261+
cluster:start()
1262+
1263+
cluster['i-001']:exec(function()
1264+
local connpool = require('experimental.connpool')
1265+
1266+
connpool.call('box.info', nil, {mode = 'ro'})
1267+
t.assert_equals(_G.connects, 2)
1268+
1269+
-- Save a connection to i-002.
1270+
rawset(_G, 'i2_conn', connpool.connect('i-002'))
1271+
end)
1272+
1273+
cluster['i-002']:stop()
1274+
1275+
-- The connpool should try to reconnect.
1276+
-- Let's skip the first attempt to make sure it tries
1277+
-- more than once.
1278+
fiber.sleep(reconnect_after + 1)
1279+
1280+
cluster['i-001']:exec(function()
1281+
local reconnect_after = 3
1282+
-- Check the saved connection is broken since i-002
1283+
-- has been stopped.
1284+
t.helpers.retrying({timeout = reconnect_after + 1}, t.assert_equals,
1285+
_G.i2_conn.state, 'error')
1286+
end)
1287+
1288+
cluster['i-002']:start()
1289+
fiber.sleep(reconnect_after + 1)
1290+
1291+
cluster['i-001']:exec(function()
1292+
local connpool = require('experimental.connpool')
1293+
local reconnect_after = 3
1294+
1295+
-- Check reconnecting has been done in the background
1296+
-- and no new connections has been created.
1297+
local cur_connects = _G.connects
1298+
t.helpers.retrying({timeout = reconnect_after + 1}, connpool.call,
1299+
'box.info', nil, {mode = 'ro'})
1300+
t.assert_equals(_G.connects, cur_connects)
1301+
1302+
-- A new connection to i-002 is active but the saved
1303+
-- one should still remain broken.
1304+
t.assert_equals(connpool.connect('i-002').state, 'active')
1305+
t.assert_equals(_G.i2_conn.state, 'error')
1306+
end)
1307+
end

0 commit comments

Comments
 (0)