Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ void Controller::Call::OnComplete(
}
}

if (need_feedback) {
if (need_feedback && c->_lb) {
const LoadBalancer::CallInfo info =
{ begin_time_us, peer_id, error_code, c };
c->_lb->Feedback(info);
Expand Down
29 changes: 17 additions & 12 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ class ChannelBalancer : public SharedLoadBalancer {
ChannelBalancer() {}
~ChannelBalancer();
int Init(const char* lb_name);
int AddChannel(ChannelBase* sub_channel,
int AddChannel(ChannelBase* sub_channel, const std::string& tag,
SelectiveChannel::ChannelHandle* handle);
void RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle);
void RemoveAndDestroyChannel(const SelectiveChannel::ChannelHandle& handle);
int SelectChannel(const LoadBalancer::SelectIn& in, SelectOut* out);
int CheckHealth();
void Describe(std::ostream& os, const DescribeOptions&);
Expand Down Expand Up @@ -168,7 +168,7 @@ int ChannelBalancer::Init(const char* lb_name) {
return SharedLoadBalancer::Init(lb_name);
}

int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag,
SelectiveChannel::ChannelHandle* handle) {
if (NULL == sub_channel) {
LOG(ERROR) << "Parameter[sub_channel] is NULL";
Expand Down Expand Up @@ -206,7 +206,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
<< sock_id << " is disabled";
return -1;
}
if (!AddServer(ServerId(sock_id))) {
if (!AddServer(ServerId(sock_id, tag))) {
LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
// sub_chan will be deleted when the socket is recycled.
ptr->SetFailed();
Expand All @@ -217,17 +217,18 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
// The health-check-related reference has been held on created.
_chan_map[sub_channel]= ptr.get();
if (handle) {
*handle = sock_id;
handle->id = sock_id;
handle->tag = tag;
}
return 0;
}

void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle) {
if (!RemoveServer(ServerId(handle))) {
void ChannelBalancer::RemoveAndDestroyChannel(const SelectiveChannel::ChannelHandle& handle) {
if (!RemoveServer(ServerId(handle.id, handle.tag))) {
return;
}
SocketUniquePtr ptr;
const int rc = Socket::AddressFailedAsWell(handle, &ptr);
const int rc = Socket::AddressFailedAsWell(handle.id, &ptr);
if (rc >= 0) {
SubChannel* sub = static_cast<SubChannel*>(ptr->user());
{
Expand Down Expand Up @@ -311,8 +312,6 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
_main_cntl->SetFailed(rc, "Fail to select channel, %s", berror(rc));
return -1;
}
DLOG(INFO) << "Selected channel=" << sel_out.channel() << ", size="
<< (_main_cntl->_accessed ? _main_cntl->_accessed->size() : 0);
_main_cntl->_current_call.need_feedback = sel_out.need_feedback;
_main_cntl->_current_call.peer_id = sel_out.fake_sock->id();

Expand Down Expand Up @@ -534,16 +533,22 @@ bool SelectiveChannel::initialized() const {

int SelectiveChannel::AddChannel(ChannelBase* sub_channel,
ChannelHandle* handle) {
return AddChannel(sub_channel, "", handle);
}

int SelectiveChannel::AddChannel(ChannelBase* sub_channel,
const std::string& tag,
ChannelHandle* handle) {
schan::ChannelBalancer* lb =
static_cast<schan::ChannelBalancer*>(_chan._lb.get());
if (lb == NULL) {
LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel";
return -1;
}
return lb->AddChannel(sub_channel, handle);
return lb->AddChannel(sub_channel, tag, handle);
}

void SelectiveChannel::RemoveAndDestroyChannel(ChannelHandle handle) {
void SelectiveChannel::RemoveAndDestroyChannel(const ChannelHandle& handle) {
schan::ChannelBalancer* lb =
static_cast<schan::ChannelBalancer*>(_chan._lb.get());
if (lb == NULL) {
Expand Down
8 changes: 6 additions & 2 deletions src/brpc/selective_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ namespace brpc {
// in `done'.
class SelectiveChannel : public ChannelBase/*non-copyable*/ {
public:
typedef SocketId ChannelHandle;
struct ChannelHandle {
SocketId id;
std::string tag;
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking API change. The ChannelHandle type is changed from a simple typedef (SocketId) to a struct containing both id and tag. This breaks backward compatibility for existing code that treats ChannelHandle as a SocketId value. Users who pass ChannelHandle by value or compare it directly with SocketId will experience compilation errors. Consider whether a deprecation period or alternative approach would be more appropriate for this public API change.

Suggested change
std::string tag;
std::string tag;
// Backward-compatible constructors and conversion operators so that
// ChannelHandle can still be used where a SocketId was expected.
ChannelHandle()
: id(INVALID_SOCKET_ID) {}
ChannelHandle(SocketId sid)
: id(sid) {}
ChannelHandle(SocketId sid, const std::string& t)
: id(sid), tag(t) {}
// Implicit conversion to SocketId for existing code that treats
// ChannelHandle as a SocketId value.
operator SocketId() const { return id; }
// Assignment from SocketId for source compatibility.
ChannelHandle& operator=(SocketId sid) {
id = sid;
tag.clear();
return *this;
}

Copilot uses AI. Check for mistakes.
};

SelectiveChannel();
~SelectiveChannel();
Expand All @@ -67,9 +70,10 @@ class SelectiveChannel : public ChannelBase/*non-copyable*/ {
// NOTE: Different from pchan, schan can add channels at any time.
// Returns 0 on success, -1 otherwise.
int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle);
int AddChannel(ChannelBase* sub_channel, const std::string& tag, ChannelHandle* handle);

// Remove and destroy the sub_channel associated with `handle'.
void RemoveAndDestroyChannel(ChannelHandle handle);
void RemoveAndDestroyChannel(const ChannelHandle& handle);
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The signature change from passing ChannelHandle by value to passing by const reference is a breaking API change. Existing code that calls RemoveAndDestroyChannel with a SocketId (which was the old ChannelHandle typedef) will fail to compile. Since ChannelHandle is now a struct, consider providing an overload that accepts just a SocketId for backward compatibility, or ensure all users are aware of this breaking change.

Copilot uses AI. Check for mistakes.

// Send request by a sub channel. schan may retry another sub channel
// according to retrying/backup-request settings.
Expand Down
Loading