Skip to content

Commit c92d146

Browse files
authored
Improved Postgres connection stability (#2286)
1 parent 3c7c66e commit c92d146

File tree

3 files changed

+42
-11
lines changed

3 files changed

+42
-11
lines changed

orm_lib/src/DbClientImpl.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,8 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
429429
}
430430
// Reconnect after 1 second
431431
auto loop = closeConnPtr->loop();
432+
// closeConnPtr may be not valid. Close the connection file descriptor.
433+
closeConnPtr->disconnect();
432434
loop->runAfter(1, [weakPtr, loop, closeConnPtr] {
433435
auto thisPtr = weakPtr.lock();
434436
if (!thisPtr)

orm_lib/src/postgresql_impl/PgBatchConnection.cc

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,14 @@ PgConnection::PgConnection(trantor::EventLoop *loop,
8787
[](PGconn *conn) { PQfinish(conn); })),
8888
channel_(loop, PQsocket(connectionPtr_.get()))
8989
{
90+
if (channel_.fd() < 0)
91+
{
92+
LOG_ERROR << "Failed to create Postgres connection";
93+
}
9094
}
9195

9296
void PgConnection::init()
9397
{
94-
PQsetnonblocking(connectionPtr_.get(), 1);
9598
if (channel_.fd() < 0)
9699
{
97100
LOG_ERROR << "Connection with Postgres could not be established";
@@ -103,6 +106,8 @@ void PgConnection::init()
103106
}
104107
return;
105108
}
109+
110+
PQsetnonblocking(connectionPtr_.get(), 1);
106111
channel_.setReadCallback([this]() {
107112
if (status_ == ConnectStatus::Bad)
108113
{
@@ -165,8 +170,11 @@ void PgConnection::disconnect()
165170
auto thisPtr = shared_from_this();
166171
loop_->runInLoop([thisPtr, &pro]() {
167172
thisPtr->status_ = ConnectStatus::Bad;
168-
thisPtr->channel_.disableAll();
169-
thisPtr->channel_.remove();
173+
if (thisPtr->channel_.fd() >= 0)
174+
{
175+
thisPtr->channel_.disableAll();
176+
thisPtr->channel_.remove();
177+
}
170178
thisPtr->connectionPtr_.reset();
171179
pro.set_value(1);
172180
});
@@ -522,11 +530,17 @@ void PgConnection::handleFatalError(bool clearAll, bool isAbortPipeline)
522530
{
523531
for (auto &cmd : batchCommandsForWaitingResults_)
524532
{
525-
cmd->exceptionCallback_(exceptPtr);
533+
if (cmd->exceptionCallback_)
534+
{
535+
cmd->exceptionCallback_(exceptPtr);
536+
}
526537
}
527538
for (auto &cmd : batchSqlCommands_)
528539
{
529-
cmd->exceptionCallback_(exceptPtr);
540+
if (cmd->exceptionCallback_)
541+
{
542+
cmd->exceptionCallback_(exceptPtr);
543+
}
530544
}
531545
batchCommandsForWaitingResults_.clear();
532546
batchSqlCommands_.clear();
@@ -536,13 +550,19 @@ void PgConnection::handleFatalError(bool clearAll, bool isAbortPipeline)
536550
if (!batchSqlCommands_.empty() &&
537551
!batchSqlCommands_.front()->preparingStatement_.empty())
538552
{
539-
batchSqlCommands_.front()->exceptionCallback_(exceptPtr);
553+
if (batchSqlCommands_.front()->exceptionCallback_)
554+
{
555+
batchSqlCommands_.front()->exceptionCallback_(exceptPtr);
556+
}
540557
batchSqlCommands_.pop_front();
541558
}
542559
else if (!batchCommandsForWaitingResults_.empty())
543560
{
544561
auto &cmd = batchCommandsForWaitingResults_.front();
545-
cmd->exceptionCallback_(exceptPtr);
562+
if (cmd->exceptionCallback_)
563+
{
564+
cmd->exceptionCallback_(exceptPtr);
565+
}
546566
batchCommandsForWaitingResults_.pop_front();
547567
}
548568
else

orm_lib/src/postgresql_impl/PgConnection.cc

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,14 @@ PgConnection::PgConnection(trantor::EventLoop *loop,
6565
[](PGconn *conn) { PQfinish(conn); })),
6666
channel_(loop, PQsocket(connectionPtr_.get()))
6767
{
68+
if (channel_.fd() < 0)
69+
{
70+
LOG_ERROR << "Failed to create Postgres connection";
71+
}
6872
}
6973

7074
void PgConnection::init()
7175
{
72-
PQsetnonblocking(connectionPtr_.get(), 1);
7376
if (channel_.fd() < 0)
7477
{
7578
LOG_ERROR << "Connection with Postgres could not be established";
@@ -80,6 +83,8 @@ void PgConnection::init()
8083
}
8184
return;
8285
}
86+
87+
PQsetnonblocking(connectionPtr_.get(), 1);
8388
channel_.setReadCallback([this]() {
8489
if (status_ == ConnectStatus::Bad)
8590
{
@@ -142,8 +147,11 @@ void PgConnection::disconnect()
142147
auto thisPtr = shared_from_this();
143148
loop_->runInLoop([thisPtr, &pro]() {
144149
thisPtr->status_ = ConnectStatus::Bad;
145-
thisPtr->channel_.disableAll();
146-
thisPtr->channel_.remove();
150+
if (thisPtr->channel_.fd() >= 0)
151+
{
152+
thisPtr->channel_.disableAll();
153+
thisPtr->channel_.remove();
154+
}
147155
thisPtr->connectionPtr_.reset();
148156
pro.set_value(1);
149157
});
@@ -400,7 +408,8 @@ void PgConnection::handleFatalError()
400408
{
401409
auto exceptPtr =
402410
std::make_exception_ptr(Failure(PQerrorMessage(connectionPtr_.get())));
403-
exceptionCallback_(exceptPtr);
411+
if (exceptionCallback_)
412+
exceptionCallback_(exceptPtr);
404413
exceptionCallback_ = nullptr;
405414
}
406415

0 commit comments

Comments
 (0)