Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions example/http_c++/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include "bthread/countdown_event.h"

DEFINE_string(d, "", "POST this data to the http server");
DEFINE_bool(progressive, false, "whether or not progressive read data from server");
DEFINE_int32(progressive_read_timeout_ms, 5000, "progressive read data idle timeout in milliseconds");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 2000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
Expand All @@ -36,6 +39,25 @@ namespace brpc {
DECLARE_bool(http_verbose);
}

class PartDataReader: public brpc::ProgressiveReader {
public:
explicit PartDataReader(bthread::CountdownEvent* done): _done(done){}

butil::Status OnReadOnePart(const void* data, size_t length) {
memcpy(_buffer, data, length);
LOG(INFO) << "data : " << _buffer << " size : " << length;
return butil::Status::OK();
}

void OnEndOfMessage(const butil::Status& status) {
_done->signal();
LOG(INFO) << "progressive read data final status : " << status;
}
private:
char _buffer[1024];
bthread::CountdownEvent* _done;
};

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
Expand Down Expand Up @@ -71,13 +93,25 @@ int main(int argc, char* argv[]) {
cntl.request_attachment().append(FLAGS_d);
}

if (FLAGS_progressive) {
cntl.set_progressive_read_timeout_ms(FLAGS_progressive_read_timeout_ms);
cntl.response_will_be_read_progressively();
}

// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
std::cerr << cntl.ErrorText() << std::endl;
return -1;
}

if (FLAGS_progressive) {
bthread::CountdownEvent done(1);
cntl.ReadProgressiveAttachmentBy(new PartDataReader(&done));
done.wait();
LOG(INFO) << "wait client progressive read done safely";
}
// If -http_verbose is on, brpc already prints the response to stderr.
if (!brpc::FLAGS_http_verbose) {
std::cout << cntl.response_attachment() << std::endl;
Expand Down
7 changes: 7 additions & 0 deletions example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
DEFINE_string(certificate, "cert.pem", "Certificate file path to enable SSL");
DEFINE_string(private_key, "key.pem", "Private key file path to enable SSL");
DEFINE_string(ciphers, "", "Cipher suite used for SSL connections");
DEFINE_bool(enable_progressive_timeout, false, "whether or not trigger progressive write attachement data timeout");

namespace example {

Expand Down Expand Up @@ -104,6 +105,9 @@ class FileServiceImpl : public FileService {

// sleep a while to send another part.
bthread_usleep(10000);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down Expand Up @@ -194,6 +198,9 @@ class HttpSSEServiceImpl : public HttpSSEService {

// sleep a while to send another part.
bthread_usleep(10000 * 10);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down
54 changes: 53 additions & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false,
"Register SIGTERM handle func to quit graceful");
DEFINE_bool(graceful_quit_on_sighup, false,
"Register SIGHUP handle func to quit graceful");

DEFINE_bool(log_idle_progressive_read_close, false,
"Print log when an idle progressive read is closed");
const IdlNames idl_single_req_single_res = { "req", "res" };
const IdlNames idl_single_req_multi_res = { "req", "" };
const IdlNames idl_multi_req_single_res = { "", "res" };
Expand Down Expand Up @@ -331,6 +332,15 @@ void Controller::Call::Reset() {
stream_user_data = NULL;
}

void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){
if(progressive_read_timeout_ms <= 0x7fffffff){
_progressive_read_timeout_ms = progressive_read_timeout_ms;
} else {
_progressive_read_timeout_ms = 0x7fffffff;
LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff";
}
}

void Controller::set_timeout_ms(int64_t timeout_ms) {
if (timeout_ms <= 0x7fffffff) {
_timeout_ms = timeout_ms;
Expand Down Expand Up @@ -1027,6 +1037,43 @@ void Controller::SubmitSpan() {
_span = NULL;
}

void* Controller::HandleIdleProgressiveReader(void* arg) {
auto* cntl = static_cast<Controller*>(arg);
const uint64_t CHECK_INTERVAL_US = 1000000UL;
auto log_idle = FLAGS_log_idle_progressive_read_close;
std::vector<SocketId> remove_socket_ids;
while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
// TODO: this is not efficient for a lot of connections(>100K)
auto socketIds = cntl->_checking_progressive_read_fds;
int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000;
for (auto socket_id : socketIds){
SocketUniquePtr s;
if (Socket::Address(socket_id, &s) == 0) {
auto cpuwide_time_us = butil::cpuwide_time_us();
const int64_t last_active_us = s->last_active_time_us();
if (cpuwide_time_us - last_active_us <= progressive_read_timeout_us) {
continue;
}
LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << socket_id
<< " progressive read timeout us : " << progressive_read_timeout_us
<< " progressive read idle duration : " << cpuwide_time_us - last_active_us;
if (s->parsing_context() != NULL) {
s->parsing_context()->Destroy();
}
s->ReleaseReferenceIfIdle(0xffffffff);
remove_socket_ids.push_back(socket_id);
} else {
LOG(ERROR) << "not found the socket id : " << socket_id;
remove_socket_ids.push_back(socket_id);
}
}
for (auto remove_socket_id : remove_socket_ids) {
socketIds.erase(remove_socket_id);
}
}
return NULL;
}

void Controller::HandleSendFailed() {
if (!FailedInline()) {
SetFailed("Must be SetFailed() before calling HandleSendFailed()");
Expand Down Expand Up @@ -1179,6 +1226,11 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// Tag the socket so that when the response comes back, the parser will
// stop before reading all body.
_current_call.sending_sock->read_will_be_progressive(_connection_type);
auto socket_id = _current_call.sending_sock->id();
if (_progressive_read_timeout_ms > 0 && _checking_progressive_read_fds.seek(socket_id) == NULL) {
_checking_progressive_read_fds.insert(socket_id);
LOG(INFO) << "insert the progressive read fd : " << socket_id << " socket fds size : " << _checking_progressive_read_fds.size();
}
}

// Handle authentication
Expand Down
25 changes: 22 additions & 3 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "brpc/grpc.h"
#include "brpc/kvmap.h"
#include "brpc/rpc_dump.h"

// EAUTH is defined in MAC
#ifndef EAUTH
#define EAUTH ERPCAUTH
Expand Down Expand Up @@ -163,7 +162,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
uint64_t log_id;
std::string request_id;
};

static void* HandleIdleProgressiveReader(void* arg);
public:
Controller();
Controller(const Inheritable& parent_ctx);
Expand All @@ -177,6 +176,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Set/get timeout in milliseconds for the RPC call. Use
// ChannelOptions.timeout_ms on unset.
void set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms);
int32_t progressive_read_timeout_ms() const { return _progressive_read_timeout_ms; }

void set_timeout_ms(int64_t timeout_ms);
int64_t timeout_ms() const { return _timeout_ms; }

Expand Down Expand Up @@ -323,7 +325,19 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Make the RPC end when the HTTP response has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
void response_will_be_read_progressively() {
if(has_flag(FLAGS_READ_PROGRESSIVELY) && _progressive_read_idle_tid > 0) {
return;
}
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.tag = _bthread_tag;
if(bthread_start_background(&_progressive_read_idle_tid, &tmp, HandleIdleProgressiveReader, this) != 0){
LOG(FATAL) << "Failed to start controller bthread id : " << _progressive_read_idle_tid;
}
LOG(INFO) << "Start Response progressive reader idle checker close idle_tid : " << _progressive_read_idle_tid
<< " _bthread_tag : " << _bthread_tag;
add_flag(FLAGS_READ_PROGRESSIVELY);
}
// Make the RPC end when the HTTP request has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void request_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
Expand Down Expand Up @@ -837,6 +851,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
int32_t _timeout_ms;
int32_t _connect_timeout_ms;
int32_t _backup_request_ms;
int32_t _progressive_read_timeout_ms;
butil::FlatSet<SocketId> _checking_progressive_read_fds;
bthread_t _progressive_read_idle_tid;
// Controller belongs to this tag
bthread_tag_t _bthread_tag = bthread_self_tag();
// Priority: `_backup_request_policy' > `_backup_request_ms'.
BackupRequestPolicy* _backup_request_policy;
// If this rpc call has retry/backup request,this var save the real timeout for current call
Expand Down
Loading