Skip to content

Commit 7306a46

Browse files
wangshao1wangshaoyi
andauthored
skip processing command while associated connection has been closed (#3111)
Co-authored-by: wangshaoyi <[email protected]>
1 parent 0014d47 commit 7306a46

File tree

4 files changed

+11
-3
lines changed

4 files changed

+11
-3
lines changed

src/net/include/net_conn.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <sys/time.h>
1010
#include <sstream>
1111
#include <string>
12+
#include <atomic>
1213

1314
#ifdef __ENABLE_SSL
1415
# include <openssl/err.h>
@@ -70,7 +71,7 @@ class NetConn : public std::enable_shared_from_this<NetConn>, public pstd::nonco
7071
std::string name() { return name_; }
7172
void set_name(std::string name) { name_ = std::move(name); }
7273

73-
bool IsClose() { return close_; }
74+
bool IsClose() { return close_.load(); }
7475
void SetClose(bool close);
7576

7677
void set_last_interaction(const struct timeval& now) { last_interaction_ = now; }
@@ -100,7 +101,7 @@ class NetConn : public std::enable_shared_from_this<NetConn>, public pstd::nonco
100101
std::string ip_port_;
101102
bool is_reply_ = false;
102103
bool is_writable_ = false;
103-
bool close_ = false;
104+
std::atomic_bool close_ = false;
104105
struct timeval last_interaction_;
105106
int flags_ = 0;
106107
std::string name_;

src/net/src/net_conn.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ NetConn::NetConn(const int fd, std::string ip_port, Thread* thread, NetMultiple
2323
#ifdef __ENABLE_SSL
2424
ssl_(nullptr),
2525
#endif
26+
close_(false),
2627
thread_(thread),
2728
net_multiplexer_(net_mpx) {
2829
gettimeofday(&last_interaction_, nullptr);
@@ -36,7 +37,7 @@ NetConn::~NetConn() {
3637
#endif
3738

3839
void NetConn::SetClose(bool close) {
39-
close_ = close;
40+
close_.store(close);
4041
}
4142

4243
bool NetConn::SetNonblock() {

src/net/src/worker_thread.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ void* WorkerThread::ThreadMain() {
144144
} else if (ti.notify_type() == kNotiEpollin) {
145145
net_multiplexer_->NetModEvent(ti.fd(), 0, kReadable);
146146
} else if (ti.notify_type() == kNotiEpolloutAndEpollin) {
147+
// TODO(wangshaoyi): conn_fd might be closed already, so epoll_ctl might return non-zero
147148
net_multiplexer_->NetModEvent(ti.fd(), 0, kReadable | kWritable);
148149
} else if (ti.notify_type() == kNotiWait) {
149150
// do not register events
@@ -209,6 +210,7 @@ void* WorkerThread::ThreadMain() {
209210
// eg. in_conn are being transferred to net_pubsub
210211
// while peer client closing this connection
211212
CloseFd(in_conn);
213+
in_conn->SetClose(true);
212214
in_conn = nullptr;
213215
{
214216
std::lock_guard lock(rwlock_);

src/pika_client_conn.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,10 @@ void PikaClientConn::DoBackgroundTask(void* arg) {
338338
std::unique_ptr<BgTaskArg> bg_arg(static_cast<BgTaskArg*>(arg));
339339
std::shared_ptr<PikaClientConn> conn_ptr = bg_arg->conn_ptr;
340340
conn_ptr->time_stat_->dequeue_ts_ = pstd::NowMicros();
341+
if (conn_ptr->IsClose()) {
342+
LOG(INFO) << "conn " << conn_ptr->ip_port() << " has already been closed, skip processing command";
343+
return;
344+
}
341345
if (bg_arg->redis_cmds.empty()) {
342346
conn_ptr->NotifyEpoll(false);
343347
return;

0 commit comments

Comments
 (0)