Skip to content

Commit d324811

Browse files
georgiy-belyaninTotktonada
authored andcommitted
connpool: speed up by using recent conn checks
This patch speeds up the `connpool.call()` and `connpool.filter()` methods by making them not wait for instances that have been unavailable recently. Thanks to the reconnecting logic introduced previously, we can be sure that if connpool was not able to reconnect to an instance recently, we can continue and assume it is dead instead of waiting. A detailed description of this idea is presented within the mentioned patch. Closes tarantool#10330 @TarantoolBot document Title: connpool: add information on advanced connection management Product: Tarantool Platform The `experimental.connpool` module also offers advanced automatic connection management. It automatically closes unused connections and tries to reconnect to recently used ones in case of an error. See also the `connpool.idle_timeout` configuration option.
1 parent 116a76b commit d324811

File tree

3 files changed

+119
-5
lines changed

3 files changed

+119
-5
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
## feature/connpool
2+
3+
* `experimental.connpool` methods `call()` and `filter()` became faster. They do
4+
not wait for unavailable instances if it is known they have been inaccessible
5+
recently.

src/box/lua/connpool.lua

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,8 @@ function pool_methods.connect(self, instance_name, opts)
239239
end
240240

241241
--- This method connects to the specified instances and returns
242-
--- the set of successfully connected ones.
242+
--- the set of successfully connected ones. Instances that are
243+
--- known to have been recently unavailable are skipped.
243244
---
244245
--- If a callback accepting an instance name and returning a
245246
--- boolean value is provided in `opts.any` the method connects
@@ -279,23 +280,54 @@ function pool_methods.connect_to_multiple(self, instances, opts)
279280
local timeout = opts.timeout or WATCHER_TIMEOUT
280281
local connect_deadline = clock.monotonic() + timeout
281282

283+
-- We divide instances into three categories.
284+
-- * Available ones: these instances are already connected.
285+
-- Nothing to do with them apart from returning them.
286+
-- * Failed: the pool assumes an instance failed if it has
287+
-- been recently accessed but the connection has failed.
288+
-- Such instances are not returned and not waited for. The pool
289+
-- tries to automatically reconnect to them in the
290+
-- background, guaranteeing the failed status is reasonably
291+
-- up to date.
292+
-- * Unknown: these are either the ones that have been
293+
-- flushed because they have not been needed for a while
294+
-- (see idle timeout and related logic) or the ones not
295+
-- accessed since startup. The pool connects to them and
296+
-- waits for success/fail.
297+
local candidate_instances = {}
282298
for _, instance_name in pairs(instances) do
283-
self:connect(instance_name, {wait_connected = false})
299+
local conn = self._connections[instance_name]
300+
301+
-- If instance has not been accessed recently (it has
302+
-- unknown state) start connecting to it. The call assumes
303+
-- it is a candidate and will wait for the connection to
304+
-- fail/succeed.
305+
if conn == nil then
306+
self:connect(instance_name, {wait_connected = false})
307+
table.insert(candidate_instances, instance_name)
308+
-- If the connection is already ok it is likely it should
309+
-- be returned as is.
310+
elseif is_connection_valid(conn,
311+
{fetch_schema = (conn.opts or {}).fetch_schema}) then
312+
table.insert(candidate_instances, instance_name)
313+
-- The remaining connections are the failed ones that have
314+
-- been checked rather recently. Skip them.
315+
end
284316
end
285317

286318
local connected_instances = {}
287319
while clock.monotonic() < connect_deadline do
288-
connected_instances = fun.iter(instances)
320+
connected_instances = fun.iter(candidate_instances)
289321
:filter(is_instance_connected)
290322
:totable()
291323

292324
if opts.any then
293-
if fun.iter(instances):any(opts.any) then
325+
if fun.iter(candidate_instances):any(opts.any) then
294326
break
295327
end
296328
end
297329

298-
if fun.iter(instances):all(is_instance_checked) then
330+
if fun.iter(candidate_instances):all(is_instance_checked) then
299331
break
300332
end
301333

test/config-luatest/rpc_test.lua

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,3 +1305,80 @@ g.test_tries_to_reconnect = function()
13051305
t.assert_equals(_G.i2_conn.state, 'error')
13061306
end)
13071307
end
1308+
1309+
g.test_does_not_wait_if_recently_checked = function()
1310+
local config = cbuilder:new()
1311+
:set_global_option('credentials.users.myuser',
1312+
{password = 'secret',
1313+
roles = { 'replication' },
1314+
privileges = {{permissions = {'execute'}, universe = true}}})
1315+
:set_global_option('iproto.advertise.peer.login', 'myuser')
1316+
:add_instance('i-001', { database = { mode = 'rw' } })
1317+
:add_instance('i-002', {})
1318+
:config()
1319+
1320+
local cluster = cluster:new(config)
1321+
1322+
-- The reconnect after interval for connpool is hardcoded in
1323+
-- connpool and equals 3 seconds.
1324+
local reconnect_after = 3
1325+
1326+
-- Add a counter to count netbox.connect() calls.
1327+
treegen.write_file(cluster._dir, 'override/net/box.lua',
1328+
string.dump(function()
1329+
local loaders = require('internal.loaders')
1330+
1331+
rawset(_G, 'connects', 0)
1332+
1333+
local builtin_netbox = loaders.builtin['net.box']
1334+
local builtin_connect = builtin_netbox.connect
1335+
builtin_netbox.connect = function(...)
1336+
_G.connects = _G.connects + 1
1337+
return builtin_connect(...)
1338+
end
1339+
1340+
return builtin_netbox
1341+
end))
1342+
1343+
cluster:start()
1344+
1345+
cluster['i-001']:exec(function()
1346+
local connpool = require('experimental.connpool')
1347+
1348+
connpool.call('box.info', nil, {mode = 'ro'})
1349+
t.assert_equals(_G.connects, 2)
1350+
1351+
-- Save a connection to i-002.
1352+
rawset(_G, 'i2_conn', connpool.connect('i-002'))
1353+
end)
1354+
1355+
cluster['i-002']:stop()
1356+
1357+
-- The connpool might try to reconnect. Let's wait
1358+
-- for one attempt.
1359+
fiber.sleep(reconnect_after + 1)
1360+
1361+
cluster['i-001']:exec(function()
1362+
local connpool = require('experimental.connpool')
1363+
local clock = require('clock')
1364+
1365+
-- The connection timeout for filter() is hardcoded in
1366+
-- connpool and equals 10 seconds.
1367+
local CONNECT_TIMEOUT = 10
1368+
1369+
-- Check that no new connection attempts have been made
1370+
-- and the call has quickly failed due to no candidates.
1371+
local timestamp_before_call = clock.monotonic()
1372+
local cur_connects = _G.connects
1373+
1374+
local exp_err = 'no candidates are available with these conditions'
1375+
local opts = {mode = 'ro'}
1376+
t.assert_error_msg_contains(exp_err, connpool.call, 'box.info', nil,
1377+
opts)
1378+
1379+
local elapsed_time = clock.monotonic() - timestamp_before_call
1380+
1381+
t.assert_equals(_G.connects, cur_connects)
1382+
t.assert_lt(elapsed_time, CONNECT_TIMEOUT)
1383+
end)
1384+
end

0 commit comments

Comments
 (0)