Skip to content

Commit c6f2dbc

Browse files
author
yanyuan06
committed
surpport tag for selective channel
1 parent c7973d0 commit c6f2dbc

File tree

3 files changed

+26
-17
lines changed

3 files changed

+26
-17
lines changed

src/brpc/controller.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,7 @@ void Controller::Call::OnComplete(
862862
}
863863
}
864864

865-
if (need_feedback) {
865+
if (need_feedback && c->_lb) {
866866
const LoadBalancer::CallInfo info =
867867
{ begin_time_us, peer_id, error_code, c };
868868
c->_lb->Feedback(info);

src/brpc/selective_channel.cpp

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ class ChannelBalancer : public SharedLoadBalancer {
8383
ChannelBalancer() {}
8484
~ChannelBalancer();
8585
int Init(const char* lb_name);
86-
int AddChannel(ChannelBase* sub_channel,
86+
int AddChannel(ChannelBase* sub_channel, const std::string& tag,
8787
SelectiveChannel::ChannelHandle* handle);
88-
void RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle);
88+
void RemoveAndDestroyChannel(const SelectiveChannel::ChannelHandle& handle);
8989
int SelectChannel(const LoadBalancer::SelectIn& in, SelectOut* out);
9090
int CheckHealth();
9191
void Describe(std::ostream& os, const DescribeOptions&);
@@ -168,7 +168,7 @@ int ChannelBalancer::Init(const char* lb_name) {
168168
return SharedLoadBalancer::Init(lb_name);
169169
}
170170

171-
int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
171+
int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag,
172172
SelectiveChannel::ChannelHandle* handle) {
173173
if (NULL == sub_channel) {
174174
LOG(ERROR) << "Parameter[sub_channel] is NULL";
@@ -206,7 +206,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
206206
<< sock_id << " is disabled";
207207
return -1;
208208
}
209-
if (!AddServer(ServerId(sock_id))) {
209+
if (!AddServer(ServerId(sock_id, tag))) {
210210
LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
211211
// sub_chan will be deleted when the socket is recycled.
212212
ptr->SetFailed();
@@ -217,17 +217,18 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
217217
// The health-check-related reference has been held on created.
218218
_chan_map[sub_channel]= ptr.get();
219219
if (handle) {
220-
*handle = sock_id;
220+
handle->id = sock_id;
221+
handle->tag = tag;
221222
}
222223
return 0;
223224
}
224225

225-
void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle) {
226-
if (!RemoveServer(ServerId(handle))) {
226+
void ChannelBalancer::RemoveAndDestroyChannel(const SelectiveChannel::ChannelHandle& handle) {
227+
if (!RemoveServer(ServerId(handle.id, handle.tag))) {
227228
return;
228229
}
229230
SocketUniquePtr ptr;
230-
const int rc = Socket::AddressFailedAsWell(handle, &ptr);
231+
const int rc = Socket::AddressFailedAsWell(handle.id, &ptr);
231232
if (rc >= 0) {
232233
SubChannel* sub = static_cast<SubChannel*>(ptr->user());
233234
{
@@ -311,8 +312,6 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
311312
_main_cntl->SetFailed(rc, "Fail to select channel, %s", berror(rc));
312313
return -1;
313314
}
314-
DLOG(INFO) << "Selected channel=" << sel_out.channel() << ", size="
315-
<< (_main_cntl->_accessed ? _main_cntl->_accessed->size() : 0);
316315
_main_cntl->_current_call.need_feedback = sel_out.need_feedback;
317316
_main_cntl->_current_call.peer_id = sel_out.fake_sock->id();
318317

@@ -340,7 +339,7 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
340339
// Forward request attachment to the subcall
341340
sub_cntl->request_attachment().append(_main_cntl->request_attachment());
342341
sub_cntl->http_request() = _main_cntl->http_request();
343-
342+
344343
sel_out.channel()->CallMethod(_main_cntl->_method,
345344
&r.sub_done->_cntl,
346345
_request,
@@ -534,16 +533,22 @@ bool SelectiveChannel::initialized() const {
534533

535534
int SelectiveChannel::AddChannel(ChannelBase* sub_channel,
536535
ChannelHandle* handle) {
536+
return AddChannel(sub_channel, "", handle);
537+
}
538+
539+
int SelectiveChannel::AddChannel(ChannelBase* sub_channel,
540+
const std::string& tag,
541+
ChannelHandle* handle) {
537542
schan::ChannelBalancer* lb =
538-
static_cast<schan::ChannelBalancer*>(_chan._lb.get());
543+
static_cast<schan::ChannelBalancer*>(_chan._lb.get());
539544
if (lb == NULL) {
540545
LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel";
541546
return -1;
542547
}
543-
return lb->AddChannel(sub_channel, handle);
548+
return lb->AddChannel(sub_channel, tag, handle);
544549
}
545550

546-
void SelectiveChannel::RemoveAndDestroyChannel(ChannelHandle handle) {
551+
void SelectiveChannel::RemoveAndDestroyChannel(const ChannelHandle& handle) {
547552
schan::ChannelBalancer* lb =
548553
static_cast<schan::ChannelBalancer*>(_chan._lb.get());
549554
if (lb == NULL) {

src/brpc/selective_channel.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ namespace brpc {
5151
// in `done'.
5252
class SelectiveChannel : public ChannelBase/*non-copyable*/ {
5353
public:
54-
typedef SocketId ChannelHandle;
54+
struct ChannelHandle {
55+
SocketId id;
56+
std::string tag;
57+
};
5558

5659
SelectiveChannel();
5760
~SelectiveChannel();
@@ -67,9 +70,10 @@ class SelectiveChannel : public ChannelBase/*non-copyable*/ {
6770
// NOTE: Different from pchan, schan can add channels at any time.
6871
// Returns 0 on success, -1 otherwise.
6972
int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle);
73+
int AddChannel(ChannelBase* sub_channel, const std::string& tag, ChannelHandle* handle);
7074

7175
// Remove and destroy the sub_channel associated with `handle'.
72-
void RemoveAndDestroyChannel(ChannelHandle handle);
76+
void RemoveAndDestroyChannel(const ChannelHandle& handle);
7377

7478
// Send request by a sub channel. schan may retry another sub channel
7579
// according to retrying/backup-request settings.

0 commit comments

Comments
 (0)