Skip to content

Commit a5cdfc1

Browse files
committed
add the progressive read idle timout checker
1 parent 8f9d12a commit a5cdfc1

File tree

4 files changed

+116
-4
lines changed

4 files changed

+116
-4
lines changed

example/http_c++/http_client.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
#include <gflags/gflags.h>
2626
#include <butil/logging.h>
2727
#include <brpc/channel.h>
28+
#include "bthread/countdown_event.h"
2829

2930
DEFINE_string(d, "", "POST this data to the http server");
31+
DEFINE_bool(progressive, false, "whether or not progressive read data from server");
32+
DEFINE_int32(progressive_read_timeout_ms, 5000, "progressive read data idle timeout in milliseconds");
3033
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
3134
DEFINE_int32(timeout_ms, 2000, "RPC timeout in milliseconds");
3235
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
@@ -36,6 +39,25 @@ namespace brpc {
3639
DECLARE_bool(http_verbose);
3740
}
3841

42+
class PartDataReader: public brpc::ProgressiveReader {
43+
public:
44+
explicit PartDataReader(bthread::CountdownEvent* done): _done(done){}
45+
46+
butil::Status OnReadOnePart(const void* data, size_t length) {
47+
memcpy(_buffer, data, length);
48+
LOG(INFO) << "data : " << _buffer << " size : " << length;
49+
return butil::Status::OK();
50+
}
51+
52+
void OnEndOfMessage(const butil::Status& status) {
53+
_done->signal();
54+
LOG(INFO) << "progressive read data final status : " << status;
55+
}
56+
private:
57+
char _buffer[1024];
58+
bthread::CountdownEvent* _done;
59+
};
60+
3961
int main(int argc, char* argv[]) {
4062
// Parse gflags. We recommend you to use gflags as well.
4163
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
@@ -71,13 +93,25 @@ int main(int argc, char* argv[]) {
7193
cntl.request_attachment().append(FLAGS_d);
7294
}
7395

96+
if (FLAGS_progressive) {
97+
cntl.set_progressive_read_timeout_ms(FLAGS_progressive_read_timeout_ms);
98+
cntl.response_will_be_read_progressively();
99+
}
100+
74101
// Because `done'(last parameter) is NULL, this function waits until
75102
// the response comes back or error occurs(including timedout).
76103
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
77104
if (cntl.Failed()) {
78105
std::cerr << cntl.ErrorText() << std::endl;
79106
return -1;
80107
}
108+
109+
if (FLAGS_progressive) {
110+
bthread::CountdownEvent done(1);
111+
cntl.ReadProgressiveAttachmentBy(new PartDataReader(&done));
112+
done.wait();
113+
LOG(INFO) << "wait client progressive read done safely";
114+
}
81115
// If -http_verbose is on, brpc already prints the response to stderr.
82116
if (!brpc::FLAGS_http_verbose) {
83117
std::cout << cntl.response_attachment() << std::endl;

example/http_c++/http_server.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
3131
DEFINE_string(certificate, "cert.pem", "Certificate file path to enable SSL");
3232
DEFINE_string(private_key, "key.pem", "Private key file path to enable SSL");
3333
DEFINE_string(ciphers, "", "Cipher suite used for SSL connections");
34+
DEFINE_bool(enable_progressive_timeout, false, "whether or not trigger progressive write attachement data timeout");
3435

3536
namespace example {
3637

@@ -104,6 +105,9 @@ class FileServiceImpl : public FileService {
104105

105106
// sleep a while to send another part.
106107
bthread_usleep(10000);
108+
if (FLAGS_enable_progressive_timout && i > 50) {
109+
bthread_usleep(100000000UL);
110+
}
107111
}
108112
return NULL;
109113
}
@@ -194,6 +198,9 @@ class HttpSSEServiceImpl : public HttpSSEService {
194198

195199
// sleep a while to send another part.
196200
bthread_usleep(10000 * 10);
201+
if (FLAGS_enable_progressive_timout && i > 50) {
202+
bthread_usleep(100000000UL);
203+
}
197204
}
198205
return NULL;
199206
}

src/brpc/controller.cpp

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false,
9595
"Register SIGTERM handle func to quit graceful");
9696
DEFINE_bool(graceful_quit_on_sighup, false,
9797
"Register SIGHUP handle func to quit graceful");
98-
98+
DEFINE_bool(log_idle_progressive_read_close, false,
99+
"Print log when an idle progressive read is closed");
99100
const IdlNames idl_single_req_single_res = { "req", "res" };
100101
const IdlNames idl_single_req_multi_res = { "req", "" };
101102
const IdlNames idl_multi_req_single_res = { "", "res" };
@@ -331,6 +332,15 @@ void Controller::Call::Reset() {
331332
stream_user_data = NULL;
332333
}
333334

335+
void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){
336+
if(progressive_read_timeout_ms <= 0x7fffffff){
337+
_progressive_read_timeout_ms = progressive_read_timeout_ms;
338+
} else {
339+
_progressive_read_timeout_ms = 0x7fffffff;
340+
LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff";
341+
}
342+
}
343+
334344
void Controller::set_timeout_ms(int64_t timeout_ms) {
335345
if (timeout_ms <= 0x7fffffff) {
336346
_timeout_ms = timeout_ms;
@@ -1027,6 +1037,43 @@ void Controller::SubmitSpan() {
10271037
_span = NULL;
10281038
}
10291039

1040+
void* Controller::HandleIdleProgressiveReader(void* arg) {
1041+
auto* cntl = static_cast<Controller*>(arg);
1042+
const uint64_t CHECK_INTERVAL_US = 1000000UL;
1043+
auto log_idle = FLAGS_log_idle_progressive_read_close;
1044+
std::vector<SocketId> remove_socket_ids;
1045+
while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
1046+
// TODO: this is not efficient for a lot of connections(>100K)
1047+
auto socketIds = cntl->_checking_progressive_read_fds;
1048+
int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000;
1049+
for (auto socket_id : socketIds){
1050+
SocketUniquePtr s;
1051+
if (Socket::Address(socket_id, &s) == 0) {
1052+
auto cpuwide_time_us = butil::cpuwide_time_us();
1053+
const int64_t last_active_us = s->last_active_time_us();
1054+
if (cpuwide_time_us - last_active_us <= progressive_read_timeout_us) {
1055+
continue;
1056+
}
1057+
LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << socket_id
1058+
<< " progressive read timeout us : " << progressive_read_timeout_us
1059+
<< " progressive read idle duration : " << cpuwide_time_us - last_active_us;
1060+
if (s->parsing_context() != NULL) {
1061+
s->parsing_context()->Destroy();
1062+
}
1063+
s->ReleaseReferenceIfIdle(0xffffffff);
1064+
remove_socket_ids.push_back(socket_id);
1065+
} else {
1066+
LOG(ERROR) << "not found the socket id : " << socket_id;
1067+
remove_socket_ids.push_back(socket_id);
1068+
}
1069+
}
1070+
for (auto remove_socket_id : remove_socket_ids) {
1071+
socketIds.erase(remove_socket_id);
1072+
}
1073+
}
1074+
return NULL;
1075+
}
1076+
10301077
void Controller::HandleSendFailed() {
10311078
if (!FailedInline()) {
10321079
SetFailed("Must be SetFailed() before calling HandleSendFailed()");
@@ -1179,6 +1226,11 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
11791226
// Tag the socket so that when the response comes back, the parser will
11801227
// stop before reading all body.
11811228
_current_call.sending_sock->read_will_be_progressive(_connection_type);
1229+
auto socket_id = _current_call.sending_sock->id();
1230+
if (_progressive_read_timeout_ms > 0 && _checking_progressive_read_fds.seek(socket_id) == NULL) {
1231+
_checking_progressive_read_fds.insert(socket_id);
1232+
LOG(INFO) << "insert the progressive read fd : " << socket_id << " socket fds size : " << _checking_progressive_read_fds.size();
1233+
}
11821234
}
11831235

11841236
// Handle authentication

src/brpc/controller.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
#include "brpc/grpc.h"
4848
#include "brpc/kvmap.h"
4949
#include "brpc/rpc_dump.h"
50-
5150
// EAUTH is defined in MAC
5251
#ifndef EAUTH
5352
#define EAUTH ERPCAUTH
@@ -163,7 +162,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
163162
uint64_t log_id;
164163
std::string request_id;
165164
};
166-
165+
static void* HandleIdleProgressiveReader(void* arg);
167166
public:
168167
Controller();
169168
Controller(const Inheritable& parent_ctx);
@@ -177,6 +176,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
177176

178177
// Set/get timeout in milliseconds for the RPC call. Use
179178
// ChannelOptions.timeout_ms on unset.
179+
void set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms);
180+
int32_t progressive_read_timeout_ms() const { return _progressive_read_timeout_ms; }
181+
180182
void set_timeout_ms(int64_t timeout_ms);
181183
int64_t timeout_ms() const { return _timeout_ms; }
182184

@@ -323,7 +325,19 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
323325

324326
// Make the RPC end when the HTTP response has complete headers and let
325327
// user read the remaining body by using ReadProgressiveAttachmentBy().
326-
void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
328+
void response_will_be_read_progressively() {
329+
if(has_flag(FLAGS_READ_PROGRESSIVELY) && _progressive_read_idle_tid > 0) {
330+
return;
331+
}
332+
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
333+
tmp.tag = _bthread_tag;
334+
if(bthread_start_background(&_progressive_read_idle_tid, &tmp, HandleIdleProgressiveReader, this) != 0){
335+
LOG(FATAL) << "Failed to start controller bthread id : " << _progressive_read_idle_tid;
336+
}
337+
LOG(INFO) << "Start Response progressive reader idle checker close idle_tid : " << _progressive_read_idle_tid
338+
<< " _bthread_tag : " << _bthread_tag;
339+
add_flag(FLAGS_READ_PROGRESSIVELY);
340+
}
327341
// Make the RPC end when the HTTP request has complete headers and let
328342
// user read the remaining body by using ReadProgressiveAttachmentBy().
329343
void request_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
@@ -837,6 +851,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
837851
int32_t _timeout_ms;
838852
int32_t _connect_timeout_ms;
839853
int32_t _backup_request_ms;
854+
int32_t _progressive_read_timeout_ms;
855+
butil::FlatSet<SocketId> _checking_progressive_read_fds;
856+
bthread_t _progressive_read_idle_tid;
857+
// Controller belongs to this tag
858+
bthread_tag_t _bthread_tag = bthread_self_tag();
840859
// Priority: `_backup_request_policy' > `_backup_request_ms'.
841860
BackupRequestPolicy* _backup_request_policy;
842861
// If this rpc call has retry/backup request,this var save the real timeout for current call

0 commit comments

Comments
 (0)