Skip to content

Commit b919815

Browse files
committed
SERVER-42026 Lock during ConnectionPool::SpecificPool::spawnConnections()
1 parent 6dabe58 commit b919815

File tree

4 files changed

+47
-31
lines changed

4 files changed

+47
-31
lines changed

src/mongo/executor/connection_pool.cpp

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,11 @@ class ConnectionPool::SpecificPool final
331331
template <typename CallableT>
332332
void runOnExecutor(CallableT&& cb) {
333333
ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) //
334-
.getAsync([ anchor = shared_from_this(),
335-
cb = std::forward<CallableT>(cb) ](Status && status) mutable {
334+
.getAsync([ this, anchor = shared_from_this(), cb = std::forward<CallableT>(cb) ](
335+
Status && status) mutable {
336336
invariant(status);
337+
338+
stdx::lock_guard lk(_parent->_mutex);
337339
cb();
338340
});
339341
}
@@ -489,22 +491,17 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
489491
if (iter == _pools.end())
490492
return;
491493

492-
auto pool = iter->second;
494+
auto& pool = iter->second;
493495
pool->triggerShutdown(
494496
Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"));
495497
}
496498

497499
void ConnectionPool::dropConnections(transport::Session::TagMask tags) {
498-
// Grab all current pools (under the lock)
499-
auto pools = [&] {
500-
stdx::lock_guard lk(_mutex);
501-
return _pools;
502-
}();
500+
stdx::lock_guard lk(_mutex);
503501

504-
for (const auto& pair : pools) {
502+
for (const auto& pair : _pools) {
505503
auto& pool = pair.second;
506504

507-
stdx::lock_guard lk(_mutex);
508505
if (pool->matchesTags(tags))
509506
continue;
510507

@@ -664,8 +661,6 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
664661
auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
665662
auto deleter = [ this, anchor = shared_from_this() ](ConnectionInterface * connection) {
666663
runOnExecutor([this, connection]() {
667-
stdx::lock_guard lk(_parent->_mutex);
668-
669664
returnConnection(connection);
670665

671666
_lastActiveTime = _parent->_factory->now();
@@ -837,9 +832,16 @@ void ConnectionPool::SpecificPool::addToReady(OwnedConnection conn) {
837832

838833
// Sets state to shutdown and kicks off the failure protocol to tank existing connections
839834
void ConnectionPool::SpecificPool::triggerShutdown(const Status& status) {
840-
_health.isShutdown = true;
835+
auto wasShutdown = std::exchange(_health.isShutdown, true);
836+
if (wasShutdown) {
837+
return;
838+
}
841839

842840
LOG(2) << "Delisting connection pool for " << _hostAndPort;
841+
842+
// Make sure the pool lifetime lasts until the end of this function,
843+
// it could be only in the map of pools
844+
auto anchor = shared_from_this();
843845
_parent->_controller->removeHost(_id);
844846
_parent->_pools.erase(_hostAndPort);
845847

@@ -1118,7 +1120,7 @@ void ConnectionPool::SpecificPool::updateController() {
11181120
}
11191121
}
11201122

1121-
runOnExecutor([ this, anchor = shared_from_this() ]() { spawnConnections(); });
1123+
runOnExecutor([this]() { spawnConnections(); });
11221124
}
11231125

11241126
// Updates our state and manages the request timer

src/mongo/executor/connection_pool_test.cpp

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,28 @@ class ConnectionPoolTest : public unittest::Test {
6464
}
6565

6666
auto makePool(ConnectionPool::Options options = {}) {
67-
_pool =
68-
std::make_shared<ConnectionPool>(std::make_shared<PoolImpl>(), "test pool", options);
67+
_pool = std::make_shared<ConnectionPool>(
68+
std::make_shared<PoolImpl>(_executor), "test pool", options);
6969
return _pool;
7070
}
7171

72+
/**
73+
* Get from a pool with out-of-line execution and return the future for a connection
74+
*
75+
* Since the InlineOutOfLineExecutor starts running on the same thread once schedule is called,
76+
* this function allows us to avoid deadlocks with get(), which is the only public function that
77+
* calls schedule while holding a lock. In normal operation, the OutOfLineExecutor is actually
78+
* out of line, and this contrivance isn't necessary.
79+
*/
80+
template <typename... Args>
81+
auto getFromPool(Args&&... args) {
82+
return ExecutorFuture(_executor)
83+
.then([ pool = _pool, args... ]() { return pool->get(args...); })
84+
.semi();
85+
}
86+
7287
private:
88+
std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>();
7389
std::shared_ptr<ConnectionPool> _pool;
7490
};
7591

@@ -1440,7 +1456,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
14401456
size_t connId = 0;
14411457

14421458
// no connections in the pool, our future is not satisfied
1443-
auto connFuture = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
1459+
auto connFuture = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
14441460
ASSERT_FALSE(connFuture.isReady());
14451461

14461462
// Successfully get a new connection
@@ -1463,8 +1479,8 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
14631479
size_t connId2 = 0;
14641480
size_t connId3 = 0;
14651481

1466-
auto connFuture1 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
1467-
auto connFuture2 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
1482+
auto connFuture1 = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
1483+
auto connFuture2 = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
14681484

14691485
// The first future should be immediately ready. The second should be in the queue.
14701486
ASSERT_TRUE(connFuture1.isReady());
@@ -1475,7 +1491,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
14751491
auto conn1 = std::move(connFuture1).get();
14761492

14771493
// Grab our third future while our first one is being fulfilled
1478-
connFuture3 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
1494+
connFuture3 = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
14791495

14801496
connId1 = getId(conn1);
14811497
doneWith(conn1);
@@ -1512,7 +1528,7 @@ TEST_F(ConnectionPoolTest, ReturnAfterShutdown) {
15121528
auto pool = makePool();
15131529

15141530
// Grab a connection and hold it to end of scope
1515-
auto connFuture = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds(1));
1531+
auto connFuture = getFromPool(HostAndPort(), transport::kGlobalSSLMode, Seconds(1));
15161532
ConnectionImpl::pushSetup(Status::OK());
15171533
auto conn = std::move(connFuture).get();
15181534
doneWith(conn);

src/mongo/executor/connection_pool_test_fixture.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,15 @@ void TimerImpl::clear() {
6565
}
6666
}
6767

68-
void TimerImpl::fireIfNecessary() {
69-
auto now = PoolImpl().now();
68+
Date_t TimerImpl::now() {
69+
return _global->now();
70+
}
7071

72+
void TimerImpl::fireIfNecessary() {
7173
auto timers = _timers;
7274

7375
for (auto&& x : timers) {
74-
if (_timers.count(x) && (x->_expiration <= now)) {
76+
if (_timers.count(x) && (x->_expiration <= x->now())) {
7577
auto execCB = [cb = std::move(x->_cb)](auto&&) mutable {
7678
std::move(cb)();
7779
};
@@ -82,10 +84,6 @@ void TimerImpl::fireIfNecessary() {
8284
}
8385
}
8486

85-
Date_t TimerImpl::now() {
86-
return _global->now();
87-
}
88-
8987
std::set<TimerImpl*> TimerImpl::_timers;
9088

9189
ConnectionImpl::ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global)

src/mongo/executor/connection_pool_test_fixture.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class PoolImpl;
4646
*/
4747
class TimerImpl final : public ConnectionPool::TimerInterface {
4848
public:
49-
TimerImpl(PoolImpl* global);
49+
explicit TimerImpl(PoolImpl* global);
5050
~TimerImpl() override;
5151

5252
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
@@ -174,7 +174,7 @@ class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
174174
friend class TimerImpl;
175175

176176
public:
177-
PoolImpl() = default;
177+
explicit PoolImpl(const std::shared_ptr<OutOfLineExecutor>& executor) : _executor(executor) {}
178178
std::shared_ptr<ConnectionPool::ConnectionInterface> makeConnection(
179179
const HostAndPort& hostAndPort,
180180
transport::ConnectSSLMode sslMode,
@@ -197,7 +197,7 @@ class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
197197

198198
private:
199199
ConnectionPool* _pool = nullptr;
200-
std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>();
200+
std::shared_ptr<OutOfLineExecutor> _executor;
201201

202202
static boost::optional<Date_t> _now;
203203
};

0 commit comments

Comments
 (0)