Skip to content

Commit cfba400

Browse files
CamJNFooBarWidget
andauthored
Make Core Controller application socket connections non-blocking (#2593)
`ApplicationPool::Socket::checkoutConnection()` initiates blocking connects, which could block the event loop if the application process's socket backlog is full. --------- Co-authored-by: Hongli Lai <hongli@phusion.nl>
1 parent 2903dad commit cfba400

31 files changed

+457
-62
lines changed

CHANGELOG

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Release 6.0.27 (Not yet released)
44
* Fix compilation on FreeBSD.
55
* [Ruby] Fix compatibility with Rack 2 While maintaining compatibility with Rack 3. Closes GH-2595.
66
* [Ruby] Use non-deprecated functions in native extensions.
7+
* Fix an issue where Passenger could freeze while connecting to application processes (event loop blocking).
78
* Updated various library versions used in precompiled binaries (used for e.g. gem installs):
89
- ccache: 4.10.2 -> 4.11.2
910
- cmake: 3.31.3 -> 3.31.6
@@ -19,6 +20,7 @@ Release 6.0.27 (Not yet released)
1920
- 3.4.1 -> 3.4.2
2021
- rubygems: 3.6.2 -> 3.6.6
2122
- zstd: 1.5.6 -> 1.5.7
23+
*
2224

2325

2426
Release 6.0.26

build/support/cxx_dependency_map.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3022,6 +3022,7 @@
30223022
"src/cxx_supportlib/DataStructures/HashedStaticString.h",
30233023
"src/cxx_supportlib/Exceptions.h",
30243024
"src/cxx_supportlib/FileDescriptor.h",
3025+
"src/cxx_supportlib/FileTools/FileManip.h",
30253026
"src/cxx_supportlib/IOTools/BufferedIO.h",
30263027
"src/cxx_supportlib/IOTools/IOUtils.h",
30273028
"src/cxx_supportlib/LoggingKit/Assert.h",
@@ -3030,7 +3031,9 @@
30303031
"src/cxx_supportlib/LoggingKit/LoggingKit.h",
30313032
"src/cxx_supportlib/StaticString.h",
30323033
"src/cxx_supportlib/StrIntTools/StrIntUtils.h",
3034+
"src/cxx_supportlib/Utils.h",
30333035
"src/cxx_supportlib/Utils/FastStringStream.h",
3036+
"src/cxx_supportlib/Utils/ScopeGuard.h",
30343037
"src/cxx_supportlib/oxt/backtrace.hpp",
30353038
"src/cxx_supportlib/oxt/detail/backtrace_disabled.hpp",
30363039
"src/cxx_supportlib/oxt/detail/backtrace_enabled.hpp",

resources/templates/standalone/server.erb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ passenger_enabled on;
4242
<%= nginx_option(app, :app_type) %>
4343
<%= nginx_option(app, :startup_file) %>
4444
<%= nginx_option(app, :app_start_command) %>
45+
<%= nginx_option(app, :app_connect_timeout) %>
4546
<%= nginx_option(app, :start_timeout) %>
4647
<%= nginx_option(app, :min_instances) %>
4748
<%= nginx_option(app, :max_request_queue_size) %>

src/agent/Core/ApplicationPool/AbstractSession.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,19 @@ class AbstractSession {
5252
virtual StaticString getProtocol() const = 0;
5353
virtual unsigned int getStickySessionId() const = 0;
5454
virtual const ApiKey &getApiKey() const = 0;
55+
/** The connection fd that was established by \c initiate(), or -1 if not initiated yet. Note that this fd is non-blocking. */
5556
virtual int fd() const = 0;
5657
virtual bool isClosed() const = 0;
5758

58-
virtual void initiate(bool blocking = true) = 0;
59+
/**
60+
* Initiates a non-blocking connect to the application socket, or reuse an existing connection.
61+
*
62+
* @return Whether the non-blocking connect finished (true) or whether you need to wait for it (false).
63+
* @throws SystemException Something went wrong.
64+
* @throws RuntimeException Something went wrong.
65+
* @throws boost::thread_interrupted A system call has been interrupted.
66+
*/
67+
virtual bool initiate() = 0;
5968

6069
virtual void requestOOBW() { /* Do nothing */ }
6170

src/agent/Core/ApplicationPool/Session.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,20 +185,15 @@ class Session: public AbstractSession {
185185
return getSocket()->protocol;
186186
}
187187

188-
189-
virtual void initiate(bool blocking = true) override {
188+
// See AbstractSession.h for docs
189+
virtual bool initiate() override {
190190
assert(!closed);
191191
ScopeGuard g(boost::bind(&Session::callOnInitiateFailure, this));
192192
Connection connection = socket->checkoutConnection();
193193
connection.fail = true;
194-
if (connection.blocking && !blocking) {
195-
FdGuard g2(connection.fd, NULL, 0);
196-
setNonBlocking(connection.fd);
197-
g2.clear();
198-
connection.blocking = false;
199-
}
200194
g.clear();
201195
this->connection = connection;
196+
return connection.ready;
202197
}
203198

204199
bool initiated() const {

src/agent/Core/ApplicationPool/Socket.h

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ struct Connection {
5151
int fd;
5252
bool wantKeepAlive: 1;
5353
bool fail: 1;
54-
bool blocking: 1;
54+
bool ready: 1;
5555

5656
Connection()
5757
: fd(-1),
5858
wantKeepAlive(false),
5959
fail(false),
60-
blocking(true)
60+
ready(false)
6161
{ }
6262

6363
void close() {
@@ -85,13 +85,15 @@ class Socket {
8585
return concurrency;
8686
}
8787

88+
/** Initiates a non-blocking connect. */
8889
Connection connect() const {
8990
Connection connection;
9091
P_TRACE(3, "Connecting to " << address);
91-
connection.fd = connectToServer(address, __FILE__, __LINE__);
92+
NConnect_State state(address, __FILE__, __LINE__);
93+
connection.ready = state.connectToServer();
9294
connection.fail = true;
95+
connection.fd = state.getFd().detach();
9396
connection.wantKeepAlive = false;
94-
connection.blocking = true;
9597
P_LOG_FILE_DESCRIPTOR_PURPOSE(connection.fd, "App " << pid << " connection");
9698
return connection;
9799
}
@@ -164,15 +166,22 @@ class Socket {
164166
}
165167

166168
/**
167-
* Connect to this socket or reuse an existing connection.
169+
* Initiates a non-blocking connection to this socket or reuse an existing connection.
170+
* Use `result.ready` to check whether the connect is finished or whether you need
171+
* to wait for it to finish.
168172
*
169173
* One MUST call checkinConnection() when one's done using the Connection.
170174
* Failure to do so will result in a resource leak.
175+
*
176+
* @throws SystemException Something went wrong.
177+
* @throws RuntimeException Something went wrong.
178+
* @throws boost::thread_interrupted A system call has been interrupted.
171179
*/
172180
Connection checkoutConnection() {
173181
boost::unique_lock<boost::mutex> l(connectionPoolLock);
174182

175183
if (!idleConnections.empty()) {
184+
TRACE_POINT();
176185
P_TRACE(3, "Socket " << address << ": checking out connection from connection pool (" <<
177186
idleConnections.size() << " -> " << (idleConnections.size() - 1) <<
178187
" items). Current total number of connections: " << totalConnections);
@@ -181,6 +190,7 @@ class Socket {
181190
totalIdleConnections--;
182191
return connection;
183192
} else {
193+
TRACE_POINT();
184194
Connection connection = connect();
185195
totalConnections++;
186196
P_TRACE(3, "Socket " << address << ": there are now " <<

src/agent/Core/ApplicationPool/TestSession.h

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,22 @@
2828

2929
#include <boost/thread.hpp>
3030
#include <string>
31+
#include <utility>
3132
#include <cassert>
33+
#include <cstring>
34+
#include <unistd.h>
35+
#include <LoggingKit/Logging.h>
3236
#include <IOTools/IOUtils.h>
3337
#include <IOTools/BufferedIO.h>
38+
#include <StrIntTools/StrIntUtils.h>
3439
#include <Core/ApplicationPool/AbstractSession.h>
40+
#include <Exceptions.h>
41+
#include <StaticString.h>
42+
#include <FileDescriptor.h>
43+
#include <Utils.h>
44+
#include <Utils/ScopeGuard.h>
45+
#include <FileTools/FileManip.h>
46+
#include <oxt/system_calls.hpp>
3547

3648
namespace Passenger {
3749
namespace ApplicationPool2 {
@@ -57,6 +69,7 @@ class TestSession: public AbstractSession {
5769
mutable bool closed;
5870
mutable bool success;
5971
mutable bool wantKeepAlive;
72+
mutable bool forcingNonInstantConnect;
6073

6174
public:
6275
TestSession()
@@ -67,7 +80,8 @@ class TestSession: public AbstractSession {
6780
stickySessionId(0),
6881
closed(false),
6982
success(false),
70-
wantKeepAlive(false)
83+
wantKeepAlive(false),
84+
forcingNonInstantConnect(false)
7185
{ }
7286

7387
virtual void ref() const override {
@@ -163,12 +177,57 @@ class TestSession: public AbstractSession {
163177
return wantKeepAlive;
164178
}
165179

166-
virtual void initiate(bool blocking = true) override {
180+
void forceNonInstantConnect() {
167181
boost::lock_guard<boost::mutex> l(syncher);
168-
connection = createUnixSocketPair(__FILE__, __LINE__);
182+
forcingNonInstantConnect = true;
183+
}
184+
185+
virtual bool initiate() override {
186+
boost::lock_guard<boost::mutex> l(syncher);
187+
188+
// Create a unique temporary directory for the socket
189+
string tempDirPathTemplate = StaticString(getSystemTempDir()) + "/passenger.session.XXXXXX";
190+
DynamicBuffer tempDirPath(tempDirPathTemplate.size() + 1);
191+
memcpy(tempDirPath.data, tempDirPathTemplate.c_str(), tempDirPathTemplate.size() + 1);
192+
if (mkdtemp(tempDirPath.data) == NULL) {
193+
int e = errno;
194+
throw SystemException("Cannot create temporary directory", e);
195+
}
196+
ScopeGuard g([&]() {
197+
try {
198+
removeDirTree(tempDirPath.data);
199+
} catch (const std::exception &e) {
200+
P_ERROR("Error deleting temporary directory " << tempDirPath.data << ": " << e.what());
201+
}
202+
});
203+
204+
// Create server socket
205+
string socketPath = StaticString(tempDirPath.data) + "/socket";
206+
FileDescriptor serverFd(createUnixServer(socketPath.c_str(), 0, true, __FILE__, __LINE__),
207+
__FILE__, __LINE__);
208+
209+
// Create client socket (non-blocking)
210+
NUnix_State clientState;
211+
setupNonBlockingUnixSocket(clientState, socketPath, __FILE__, __LINE__);
212+
bool immediatelyConnected = connectToUnixServer(clientState);
213+
connection.first = std::move(clientState.fd);
214+
215+
// Accept connection (blocking)
216+
FileDescriptor serverSideFd(oxt::syscalls::accept(serverFd, NULL, NULL),
217+
__FILE__, __LINE__);
218+
if (serverSideFd == -1) {
219+
int e = errno;
220+
throw SystemException("Cannot accept connection", e);
221+
}
222+
223+
// Store the server-side fd.
224+
connection.second = std::move(serverSideFd);
169225
peerBufferedIO = BufferedIO(connection.second);
170-
if (!blocking) {
171-
setNonBlocking(connection.first);
226+
227+
if (forcingNonInstantConnect) {
228+
return false;
229+
} else {
230+
return immediatelyConnected;
172231
}
173232
}
174233

src/agent/Core/Controller.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include <sys/types.h>
4949
#include <sys/uio.h>
5050
#include <utility>
51+
#include <exception>
5152
#include <cstdio>
5253
#include <cstdlib>
5354
#include <cstddef>
@@ -100,10 +101,6 @@ class Controller: public ServerKit::HttpServer<Controller, Client> {
100101
typedef ServerKit::FileBufferedChannel FileBufferedChannel;
101102
typedef ServerKit::FileBufferedFdSinkChannel FileBufferedFdSinkChannel;
102103

103-
// If you change this value, make sure that Request::sessionCheckoutTry
104-
// has enough bits.
105-
static const unsigned int MAX_SESSION_CHECKOUT_TRY = 10;
106-
107104
ControllerMainConfig mainConfig;
108105
ControllerRequestConfigPtr requestConfig;
109106
StringKeyTable< boost::shared_ptr<Options> > poolOptionsCache;
@@ -195,9 +192,13 @@ class Controller: public ServerKit::HttpServer<Controller, Client> {
195192
const AbstractSessionPtr &session, const ExceptionPtr &e);
196193
void maybeSend100Continue(Client *client, Request *req);
197194
void initiateSession(Client *client, Request *req);
195+
void finishInitiatingSession(Client *client, Request *req);
196+
static void onSessionSocketConnected(EV_P_ ev_io *io, int revents);
197+
static void onSessionSocketConnectTimeout(EV_P_ ev_timer *io, int flag);
198198
static void checkoutSessionLater(Request *req);
199199
void reportSessionCheckoutError(Client *client, Request *req,
200200
const ExceptionPtr &e);
201+
void handleSessionInitiationError(Client *client, Request *req, const std::exception &e);
201202
int lookupCodeFromHeader(Request *req, const char* header, int statusCode);
202203
void writeRequestQueueFullExceptionErrorResponse(Client *client,
203204
Request *req, const boost::shared_ptr<RequestQueueFullException> &e);
@@ -311,6 +312,7 @@ class Controller: public ServerKit::HttpServer<Controller, Client> {
311312
void endRequestWithSimpleResponse(Client **c, Request **r,
312313
const StaticString &body, int code = 200);
313314
void endRequestAsBadGateway(Client **client, Request **req);
315+
void endRequestAsGatewayTimeout(Client **client, Request **req);
314316
void writeBenchmarkResponse(Client **client, Request **req,
315317
bool end = true);
316318
bool getBoolOption(Request *req, const HashedStaticString &name,
@@ -355,9 +357,15 @@ class Controller: public ServerKit::HttpServer<Controller, Client> {
355357

356358
virtual void asyncGetFromApplicationPool(Request *req,
357359
ApplicationPool2::GetCallback callback);
360+
virtual int getSessionSocketConnectIoWatchConditions() const;
361+
virtual double getSessionSocketEffectiveConnectTimeout(Request *req) const;
358362

359363

360364
public:
365+
// If you change this value, make sure that Request::sessionCheckoutTry
366+
// has enough bits.
367+
static constexpr unsigned int MAX_SESSION_CHECKOUT_TRY = 10;
368+
361369
typedef ControllerConfigChangeRequest ConfigChangeRequest;
362370

363371
// Dependencies

0 commit comments

Comments
 (0)