Skip to content

Commit 1db2fbd

Browse files
wangshao1wangshaoyi
andauthored
fix connections overflow when command processing takes too long and client close connection (#3089)
Co-authored-by: wangshaoyi <[email protected]>
1 parent 09424b2 commit 1db2fbd

File tree

4 files changed

+27
-4
lines changed

4 files changed

+27
-4
lines changed

src/net/include/net_define.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ enum EventStatus {
4848
kNone = 0,
4949
kReadable = 0x1,
5050
kWritable = 0x1 << 1,
51-
kErrorEvent = 0x1 << 2,
51+
kPeerClose = 0x1 << 2,
52+
kErrorEvent = 0x1 << 3,
5253
};
5354

5455
enum ConnStatus {

src/net/src/net_epoll.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
#include "net/include/net_define.h"
1515
#include "pstd/include/xdebug.h"
1616

17+
#ifndef EPOLLRDHUP
18+
#define EPOLLRDHUP 0x2000
19+
#endif
20+
1721
namespace net {
1822

1923
NetMultiplexer* CreateNetMultiplexer(int limit) { return new NetEpoll(limit); }
@@ -45,7 +49,7 @@ int NetEpoll::NetAddEvent(int fd, int mask) {
4549
}
4650
if (mask & kWritable) {
4751
ee.events |= EPOLLOUT;
48-
}
52+
}
4953

5054
return epoll_ctl(multiplexer_, EPOLL_CTL_ADD, fd, &ee);
5155
}
@@ -62,6 +66,10 @@ int NetEpoll::NetModEvent(int fd, int old_mask, int mask) {
6266
if ((old_mask | mask) & kWritable) {
6367
ee.events |= EPOLLOUT;
6468
}
69+
70+
if ((old_mask | mask) & kPeerClose) {
71+
ee.events |= EPOLLRDHUP;
72+
}
6573
return epoll_ctl(multiplexer_, EPOLL_CTL_MOD, fd, &ee);
6674
}
6775

@@ -93,6 +101,10 @@ int NetEpoll::NetPoll(int timeout) {
93101
ev.mask |= kWritable;
94102
}
95103

104+
if (events_[i].events & EPOLLRDHUP) {
105+
ev.mask |= kPeerClose;
106+
}
107+
96108
if (events_[i].events & (EPOLLERR | EPOLLHUP)) {
97109
ev.mask |= kErrorEvent;
98110
}

src/net/src/worker_thread.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ void* WorkerThread::ThreadMain() {
189189
ReadStatus read_status = in_conn->GetRequest();
190190
in_conn->set_last_interaction(now);
191191
if (read_status == kReadAll) {
192-
net_multiplexer_->NetModEvent(pfe->fd, 0, 0);
192+
net_multiplexer_->NetModEvent(pfe->fd, 0, kPeerClose);
193193
// Wait for the conn complete asynchronous task and
194194
// Mod Event to kWritable
195195
} else if (read_status == kReadHalf) {
@@ -199,8 +199,15 @@ void* WorkerThread::ThreadMain() {
199199
}
200200
}
201201

202+
if ((should_close == 0) && ((pfe->mask & kPeerClose) != 0)) {
203+
should_close = 1;
204+
}
205+
202206
if (((pfe->mask & kErrorEvent) != 0) || (should_close != 0)) {
203207
net_multiplexer_->NetDelEvent(pfe->fd, 0);
208+
// TODO: in_conn may live longer than fd.
209+
// eg. in_conn are being transferred to net_pubsub
210+
// while peer client closing this connection
204211
CloseFd(in_conn);
205212
in_conn = nullptr;
206213
{

tests/integration/pubsub_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ var _ = Describe("PubSub", func() {
3939

4040
It("implements Stringer", func() {
4141
pubsub := client.PSubscribe(ctx, "mychannel*")
42-
defer pubsub.Close()
42+
defer func() {
43+
time.Sleep(100 * time.Millisecond)
44+
pubsub.Close()
45+
}()
4346

4447
Expect(pubsub.String()).To(Equal("PubSub(mychannel*)"))
4548
})

0 commit comments

Comments
 (0)