Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/brpc/amf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ void WriteAMFObject(const google::protobuf::Message& message,
continue;
}
}
const std::string& name = field->name();
const auto& name = field->name();
if (name.size() >= 65536u) {
LOG(ERROR) << "name is too long!";
return stream->set_bad();
Expand Down
11 changes: 7 additions & 4 deletions src/brpc/builtin/protobufs_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@


#include <google/protobuf/descriptor.h> // ServiceDescriptor

#include "protobufs_service.h"

#include "brpc/controller.h" // Controller
#include "brpc/server.h" // Server
#include "brpc/closure_guard.h" // ClosureGuard
#include "brpc/details/method_status.h"// MethodStatus
#include "brpc/builtin/protobufs_service.h"
#include "brpc/builtin/common.h"

#include "butil/strings/string_util.h"

namespace brpc {

Expand All @@ -42,7 +45,7 @@ int ProtobufsService::Init() {
}
const google::protobuf::ServiceDescriptor* d =
iter->second.service->GetDescriptor();
_map[d->full_name()] = d->DebugString();
_map[butil::EnsureString(d->full_name())] = d->DebugString();
const int method_count = d->method_count();
for (int j = 0; j < method_count; ++j) {
const google::protobuf::MethodDescriptor* md = d->method(j);
Expand All @@ -53,13 +56,13 @@ int ProtobufsService::Init() {
while (!stack.empty()) {
const google::protobuf::Descriptor* d = stack.back();
stack.pop_back();
_map[d->full_name()] = d->DebugString();
_map[butil::EnsureString(d->full_name())] = d->DebugString();
for (int i = 0; i < d->field_count(); ++i) {
const google::protobuf::FieldDescriptor* f = d->field(i);
if (f->type() == google::protobuf::FieldDescriptor::TYPE_MESSAGE ||
f->type() == google::protobuf::FieldDescriptor::TYPE_GROUP) {
const google::protobuf::Descriptor* sub_d = f->message_type();
if (sub_d != d && _map.find(sub_d->full_name()) == _map.end()) {
if (sub_d != d && _map.find(butil::EnsureString(sub_d->full_name())) == _map.end()) {
stack.push_back(sub_d);
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,17 +492,17 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,

if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) {
const int64_t start_send_us = butil::cpuwide_time_us();
const std::string* method_name = NULL;
std::string method_name;
if (_get_method_name) {
method_name = &_get_method_name(method, cntl);
method_name = butil::EnsureString(_get_method_name(method, cntl));
} else if (method) {
method_name = &method->full_name();
method_name = butil::EnsureString(method->full_name());
} else {
const static std::string NULL_METHOD_STR = "null-method";
method_name = &NULL_METHOD_STR;
method_name = NULL_METHOD_STR;
}
Span* span = Span::CreateClientSpan(
*method_name, start_send_real_us - start_send_us);
method_name, start_send_real_us - start_send_us);
span->set_log_id(cntl->log_id());
span->set_base_cid(correlation_id);
span->set_protocol(_options.protocol);
Expand Down
10 changes: 6 additions & 4 deletions src/brpc/nshead_pb_service_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
#include <google/protobuf/descriptor.h> // MethodDescriptor
#include <google/protobuf/message.h> // Message

#include "butil/time.h"
#include "butil/iobuf.h" // butil::IOBuf
#include "nshead_pb_service_adaptor.h"

#include "brpc/controller.h" // Controller
#include "brpc/socket.h" // Socket
#include "brpc/server.h" // Server
#include "brpc/span.h"
#include "brpc/details/server_private_accessor.h"
#include "brpc/details/controller_private_accessor.h"
#include "brpc/nshead_pb_service_adaptor.h"
#include "brpc/policy/most_common_message.h"

#include "butil/iobuf.h" // butil::IOBuf
#include "butil/strings/string_util.h"
#include "butil/time.h"


namespace brpc {

Expand Down Expand Up @@ -126,7 +128,7 @@ void NsheadPbServiceAdaptor::ProcessNsheadRequest(
google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;
ControllerPrivateAccessor(controller).set_method(method);
done->SetMethodName(method->full_name());
done->SetMethodName(butil::EnsureString(method->full_name()));
pbdone->pbreq.reset(svc->GetRequestPrototype(method).New());
pbdone->pbres.reset(svc->GetResponsePrototype(method).New());

Expand Down
19 changes: 11 additions & 8 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/text_format.h>
#include "butil/logging.h" // LOG()

#include "butil/iobuf.h" // butil::IOBuf
#include "butil/raw_pack.h" // RawPacker RawUnpacker
#include "butil/logging.h" // LOG()
#include "butil/memory/scope_guard.h"
#include "butil/raw_pack.h" // RawPacker RawUnpacker
#include "butil/strings/string_util.h"

#include "json2pb/json_to_pb.h"
#include "json2pb/pb_to_json.h"
#include "brpc/controller.h" // Controller
Expand Down Expand Up @@ -233,7 +236,7 @@ static bool SerializeResponse(const google::protobuf::Message& res,
cntl.SetFailed(ERESPONSE,
"Fail to serialize response=%s, "
"ContentType=%s, CompressType=%s, ChecksumType=%s",
res.GetDescriptor()->full_name().c_str(),
butil::EnsureString(res.GetDescriptor()->full_name()).c_str(),
ContentTypeToCStr(content_type),
CompressTypeToCStr(compress_type),
ChecksumTypeToCStr(checksum_type));
Expand Down Expand Up @@ -775,7 +778,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
cntl->SetFailed(
ELIMIT,
"Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
butil::EnsureString(mp->method->full_name()).c_str(), rejected_cc);
break;
}
}
Expand All @@ -784,7 +787,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
accessor.set_method(method);

if (span) {
span->ResetServerSpanName(method->full_name());
span->ResetServerSpanName(butil::EnsureString(method->full_name()));
}

if (!server->AcceptRequest(cntl.get())) {
Expand Down Expand Up @@ -812,7 +815,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
EREQUEST,
"Fail to parse request=%s, ContentType=%s, "
"CompressType=%s, ChecksumType=%s, request_size=%d",
messages->Request()->GetDescriptor()->full_name().c_str(),
butil::EnsureString(messages->Request()->GetDescriptor()->full_name()).c_str(),
ContentTypeToCStr(content_type),
CompressTypeToCStr(compress_type),
ChecksumTypeToCStr(checksum_type), req_size);
Expand Down Expand Up @@ -996,7 +999,7 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
EREQUEST,
"Fail to parse response=%s, ContentType=%s, "
"CompressType=%s, ChecksumType=%s, request_size=%d",
cntl->response()->GetDescriptor()->full_name().c_str(),
butil::EnsureString(cntl->response()->GetDescriptor()->full_name()).c_str(),
ContentTypeToCStr(content_type),
CompressTypeToCStr(compress_type),
ChecksumTypeToCStr(checksum_type), res_size);
Expand Down Expand Up @@ -1033,7 +1036,7 @@ void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl,
EREQUEST,
"Fail to compress request=%s, "
"ContentType=%s, CompressType=%s, ChecksumType=%s",
request->GetDescriptor()->full_name().c_str(),
butil::EnsureString(request->GetDescriptor()->full_name()).c_str(),
ContentTypeToCStr(content_type), CompressTypeToCStr(compress_type),
ChecksumTypeToCStr(checksum_type));
}
Expand Down
47 changes: 28 additions & 19 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
#include <google/protobuf/text_format.h>
#include <gflags/gflags.h>
#include <string>

#include "brpc/policy/http_rpc_protocol.h"
#include "butil/unique_ptr.h" // std::unique_ptr
#include "butil/string_splitter.h" // StringMultiSplitter

#include "butil/string_printf.h"
#include "butil/time.h"
#include "butil/string_splitter.h" // StringMultiSplitter
#include "butil/strings/string_util.h"
#include "butil/sys_byteorder.h"
#include "butil/time.h"
#include "butil/unique_ptr.h" // std::unique_ptr

#include "json2pb/pb_to_json.h" // ProtoMessageToJson
#include "json2pb/json_to_pb.h" // JsonToProtoMessage
#include "brpc/compress.h"
Expand Down Expand Up @@ -284,7 +288,7 @@ static bool JsonToProtoMessage(const butil::IOBuf& body,
bool ok = json2pb::JsonToProtoMessage(&wrapper, message, options, &error);
if (!ok) {
cntl->SetFailed(error_code, "Fail to parse http json body as %s: %s",
message->GetDescriptor()->full_name().c_str(),
butil::EnsureString(message->GetDescriptor()->full_name()).c_str(),
error.c_str());
}
return ok;
Expand All @@ -305,7 +309,7 @@ static bool ProtoMessageToJson(const google::protobuf::Message& message,
bool ok = json2pb::ProtoMessageToJson(message, wrapper, options, &error);
if (!ok) {
cntl->SetFailed(error_code, "Fail to convert %s to json: %s",
message.GetDescriptor()->full_name().c_str(),
butil::EnsureString(message.GetDescriptor()->full_name()).c_str(),
error.c_str());
}
return ok;
Expand All @@ -321,7 +325,7 @@ static bool ProtoJsonToProtoMessage(const butil::IOBuf& body,
bool ok = json2pb::ProtoJsonToProtoMessage(&wrapper, message, options, &error);
if (!ok) {
cntl->SetFailed(error_code, "Fail to parse http proto-json body as %s: %s",
message->GetDescriptor()->full_name().c_str(),
butil::EnsureString(message->GetDescriptor()->full_name()).c_str(),
error.c_str());
}
return ok;
Expand All @@ -337,7 +341,7 @@ static bool ProtoMessageToProtoJson(const google::protobuf::Message& message,
bool ok = json2pb::ProtoMessageToProtoJson(message, wrapper, options, &error);
if (!ok) {
cntl->SetFailed(error_code, "Fail to convert %s to proto-json: %s",
message.GetDescriptor()->full_name().c_str(), error.c_str());
butil::EnsureString(message.GetDescriptor()->full_name()).c_str(), error.c_str());
}
return ok;
}
Expand Down Expand Up @@ -527,13 +531,13 @@ void ProcessHttpResponse(InputMessageBase* msg) {
if (content_type == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
cntl->SetFailed(ERESPONSE, "Fail to parse content as %s",
cntl->response()->GetDescriptor()->full_name().c_str());
butil::EnsureString(cntl->response()->GetDescriptor()->full_name()).c_str());
break;
}
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
if (!ParsePbTextFromIOBuf(cntl->response(), res_body)) {
cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content as %s",
cntl->response()->GetDescriptor()->full_name().c_str());
butil::EnsureString(cntl->response()->GetDescriptor()->full_name()).c_str());
break;
}
} else if (content_type == HTTP_CONTENT_JSON) {
Expand Down Expand Up @@ -612,13 +616,13 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
if (!pbreq->SerializeToZeroCopyStream(&wrapper)) {
cntl->request_attachment().clear();
return cntl->SetFailed(EREQUEST, "Fail to serialize %s",
pbreq->GetTypeName().c_str());
butil::EnsureString(pbreq->GetTypeName()).c_str());
}
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
if (!google::protobuf::TextFormat::Print(*pbreq, &wrapper)) {
cntl->request_attachment().clear();
return cntl->SetFailed(EREQUEST, "Fail to print %s as proto-text",
pbreq->GetTypeName().c_str());
butil::EnsureString(pbreq->GetTypeName()).c_str());
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
if (!ProtoMessageToProtoJson(*pbreq, &wrapper, cntl, EREQUEST)) {
Expand Down Expand Up @@ -880,11 +884,13 @@ HttpResponseSender::~HttpResponseSender() {
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->response_attachment());
if (content_type == HTTP_CONTENT_PROTO) {
if (!res->SerializeToZeroCopyStream(&wrapper)) {
cntl->SetFailed(ERESPONSE, "Fail to serialize %s", res->GetTypeName().c_str());
cntl->SetFailed(ERESPONSE, "Fail to serialize %s",
butil::EnsureString(res->GetTypeName()).c_str());
}
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
if (!google::protobuf::TextFormat::Print(*res, &wrapper)) {
cntl->SetFailed(ERESPONSE, "Fail to print %s as proto-text", res->GetTypeName().c_str());
cntl->SetFailed(ERESPONSE, "Fail to print %s as proto-text",
butil::EnsureString(res->GetTypeName()).c_str());
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
ProtoMessageToProtoJson(*res, &wrapper, cntl, ERESPONSE);
Expand Down Expand Up @@ -1535,7 +1541,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
cntl->request_attachment().swap(req_body);
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
if (span) {
span->ResetServerSpanName(md->full_name());
span->ResetServerSpanName(butil::EnsureString(md->full_name()));
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
Expand Down Expand Up @@ -1565,18 +1571,19 @@ void ProcessHttpRequest(InputMessageBase *msg) {
// Switch to service-specific error.
non_service_error.release();
MethodStatus* method_status = mp->status;
const std::string method_full_name = butil::EnsureString(mp->method->full_name());
resp_sender.set_method_status(method_status);
if (method_status) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
method_full_name.c_str(), rejected_cc);
return;
}
}

if (span) {
span->ResetServerSpanName(mp->method->full_name());
span->ResetServerSpanName(method_full_name);
}
// NOTE: accesses to builtin services are not counted as part of
// concurrency, therefore are not limited by ServerOptions.max_concurrency.
Expand Down Expand Up @@ -1616,6 +1623,8 @@ void ProcessHttpRequest(InputMessageBase *msg) {
google::protobuf::Message* req = messages->Request();
google::protobuf::Message* res = messages->Response();

const std::string request_full_name = butil::EnsureString(req->GetDescriptor()->full_name());

if (__builtin_expect(!req || !res, 0)) {
PLOG(FATAL) << "Fail to new req or res";
cntl->SetFailed("Fail to new req or res");
Expand All @@ -1632,7 +1641,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
if (!req->IsInitialized()) {
cntl->SetFailed(EREQUEST, "%s needs to be created from a"
" non-empty json, it has required fields.",
req->GetDescriptor()->full_name().c_str());
request_full_name.c_str());
return;
} // else all fields of the request are optional.
} else {
Expand Down Expand Up @@ -1677,13 +1686,13 @@ void ProcessHttpRequest(InputMessageBase *msg) {
if (content_type == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(req, req_body)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
req->GetDescriptor()->full_name().c_str());
request_full_name.c_str());
return;
}
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
if (!ParsePbTextFromIOBuf(req, req_body)) {
cntl->SetFailed(EREQUEST, "Fail to parse http proto-text body as %s",
req->GetDescriptor()->full_name().c_str());
request_full_name.c_str());
return;
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
Expand Down
10 changes: 7 additions & 3 deletions src/brpc/policy/hulu_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
#include <google/protobuf/message.h> // Message
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/io/coded_stream.h>

#include "butil/strings/string_util.h"
#include "butil/time.h"

#include "brpc/controller.h" // Controller
#include "brpc/socket.h" // Socket
#include "brpc/server.h" // Server
Expand Down Expand Up @@ -469,25 +472,26 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
// Switch to service-specific error.
non_service_error.release();
method_status = sp->status;
const google::protobuf::MethodDescriptor* method = sp->method;
const std::string method_full_name = butil::EnsureString(method->full_name());
if (method_status) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
method_full_name.c_str(), rejected_cc);
break;
}
}

google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;
accessor.set_method(method);

if (!server->AcceptRequest(cntl.get())) {
break;
}

if (span) {
span->ResetServerSpanName(method->full_name());
span->ResetServerSpanName(method_full_name);
}
const int reqsize = msg->payload.length();
butil::IOBuf req_buf;
Expand Down
Loading
Loading