
Add gflag to manage ConcurrencyRemover lifecycle for CallAfterRpcResp#3309

Open
feng-y wants to merge 1 commit into apache:master from feng-y:feature/concurrency-remover-manages-after-rpc-resp


@feng-y
Contributor

@feng-y feng-y commented May 25, 2026

Problem

In SendRpcResponse, ConcurrencyRemover was destroyed before CallAfterRpcResp was called. This means the concurrency control didn't cover the after-response callback, which could lead to:

  • Inaccurate concurrency tracking
  • Incorrect latency measurements
  • Premature resource release if callbacks hold resources

Solution

Add a gflag whose value is copied into the controller when set_after_rpc_resp_fn() is called. When the captured value is true, ConcurrencyRemover's lifetime is extended to cover CallAfterRpcResp.

Key Design

  • Global gflag: FLAGS_concurrency_remover_manages_after_rpc_resp (default: false)
  • Controller-level flag: Captured from gflag when set_after_rpc_resp_fn() is called
  • Protocol uses controller flag: Decoupled from global gflag
  • Applied to baidu_rpc_protocol: HTTP protocol not modified due to its more complex async flow
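The capture step in the design above can be sketched roughly as follows. This is a minimal illustration, not brpc code: `g_manage_after_rpc_resp` stands in for the gflag, and `CapturingController` is a hypothetical, heavily reduced controller. The point it tries to show is that the value is snapshotted per instance at the moment the callback is installed, so later changes to the global do not affect that instance.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Assumption: a plain global standing in for
// FLAGS_concurrency_remover_manages_after_rpc_resp.
static bool g_manage_after_rpc_resp = false;

// Hypothetical reduced controller; only the pieces relevant to the capture.
class CapturingController {
public:
    using AfterRespFn = std::function<void()>;

    void set_after_rpc_resp_fn(AfterRespFn fn) {
        _after_rpc_resp_fn = std::move(fn);
        // Snapshot the global once. The protocol layer later reads this
        // member instead of the global.
        _manage_after_rpc_resp = g_manage_after_rpc_resp;
    }

    bool manages_after_rpc_resp() const { return _manage_after_rpc_resp; }

private:
    AfterRespFn _after_rpc_resp_fn;
    bool _manage_after_rpc_resp = false;
};

// Usage: the first controller keeps the value it captured even though the
// global is flipped afterwards; the second captures the new value.
inline void capture_demo() {
    g_manage_after_rpc_resp = true;
    CapturingController first;
    first.set_after_rpc_resp_fn([] {});

    g_manage_after_rpc_resp = false;
    CapturingController second;
    second.set_after_rpc_resp_fn([] {});

    assert(first.manages_after_rpc_resp());
    assert(!second.manages_after_rpc_resp());
}
```

One consequence of this shape is that a controller that never installs a callback keeps the default (false) regardless of the global value.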

Behavior

  • flag=false (default): Original behavior - ConcurrencyRemover is released before CallAfterRpcResp via explicit reset()
  • flag=true (when gflag enabled and callback set): ConcurrencyRemover lives until the end of BRPC_SCOPE_EXIT, covering the entire response lifecycle including after-response callbacks

Implementation Details

// In controller.cpp
void Controller::set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) {
    _after_rpc_resp_fn = fn;
    // Capture gflag value to controller instance
    _concurrency_remover_manages_after_rpc_resp = FLAGS_concurrency_remover_manages_after_rpc_resp;
}

Uses unique_ptr with explicit reset() for clear control flow:

  • Always allocate ConcurrencyRemover via unique_ptr
  • Default path: explicitly reset() before CallAfterRpcResp
  • Extended path: let unique_ptr naturally destruct at scope end
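The two paths listed above can be illustrated with a small stand-alone sketch. `RemoverGuard` and `send_response` are hypothetical names introduced here for illustration only; the real SendRpcResponse and BRPC_SCOPE_EXIT are not modeled. The guard simply logs when it is destroyed, which makes the ordering relative to the callback visible.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for ConcurrencyRemover: records its destruction.
class RemoverGuard {
public:
    explicit RemoverGuard(std::vector<std::string>* log) : _log(log) {}
    ~RemoverGuard() { _log->push_back("remover_released"); }

private:
    std::vector<std::string>* _log;
};

// Simulates one response and returns the order of events.
inline std::vector<std::string> send_response(bool manage_after_resp) {
    std::vector<std::string> log;
    {
        // Always allocate the guard via unique_ptr.
        std::unique_ptr<RemoverGuard> remover(new RemoverGuard(&log));
        log.push_back("response_written");
        if (!manage_after_resp) {
            remover.reset();  // default path: release before the callback
        }
        log.push_back("after_rpc_resp_callback");
    }  // extended path: the guard is destroyed here, after the callback
    return log;
}

// Usage: the release moves from before the callback to after it.
inline void ordering_demo() {
    assert(send_response(false)[1] == "remover_released");
    assert(send_response(true)[2] == "remover_released");
}
```

Keeping a single allocation and branching only on the `reset()` call means both paths share the same code up to the callback, which is presumably why the description calls the control flow clear.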

Backward Compatibility

✅ Default behavior unchanged (flag defaults to false)
✅ No impact on existing deployments
✅ Opt-in via gflag when users need accurate concurrency tracking

Scope

  • ✅ Modified: baidu_rpc_protocol
  • ⏸️ Not modified: http_rpc_protocol (can be addressed separately if needed due to its more complex async flow)

Addresses Review Feedback

  • ✅ Kept global gflag for configuration
  • ✅ Flag is captured to controller in set_after_rpc_resp_fn()
  • ✅ Protocol layer uses controller flag instead of global gflag

@yanglimingcn yanglimingcn requested a review from chenBright May 25, 2026 11:34
@yanglimingcn
Contributor

Looking at the previous code, concurrency_remover was executed only after all responses had finished executing. Therefore, I think it's sufficient to keep it consistent with the initial code; there's no need to add a flag.

@feng-y
Contributor Author

feng-y commented May 25, 2026

Thanks. My concern is that removing the gflag would not just drop configurability, but would also change the default semantics.

ConcurrencyRemover's destructor calls MethodStatus::OnResponded(...), so changing its lifetime affects not only when concurrency is released, but also the latency accounting window. If it is made to cover CallAfterRpcResp unconditionally, method_status latency would also start including the after-response callback. I think that semantic change should be evaluated explicitly as well.
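To make the latency concern concrete, here is a minimal sketch with a manually advanced clock. All names (`FakeClock`, `MiniStatus`, `LatencyGuard`, `measured_latency_us`) and the durations are hypothetical; the only thing taken from the discussion is that the guard reports latency from its destructor, so the point of destruction defines the measured window.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

// Hypothetical manual clock so the example is deterministic.
struct FakeClock {
    int64_t now_us = 0;
};

// Hypothetical stand-in for MethodStatus: remembers the last latency.
struct MiniStatus {
    int64_t last_latency_us = -1;
};

// Hypothetical stand-in for ConcurrencyRemover: reports latency on destruction.
class LatencyGuard {
public:
    LatencyGuard(MiniStatus* status, const FakeClock* clock)
        : _status(status), _clock(clock), _start_us(clock->now_us) {}
    ~LatencyGuard() { _status->last_latency_us = _clock->now_us - _start_us; }

private:
    MiniStatus* _status;
    const FakeClock* _clock;
    int64_t _start_us;
};

// Returns the latency reported for one simulated request.
inline int64_t measured_latency_us(bool cover_callback) {
    FakeClock clock;
    MiniStatus status;
    {
        std::unique_ptr<LatencyGuard> guard(new LatencyGuard(&status, &clock));
        clock.now_us += 100;  // assumed cost of handling and responding
        if (!cover_callback) {
            guard.reset();    // latency window closes before the callback
        }
        clock.now_us += 40;   // assumed cost of the after-response callback
    }
    return status.last_latency_us;
}

// Usage: covering the callback widens the reported latency by its cost.
inline void latency_demo() {
    assert(measured_latency_us(false) == 100);
    assert(measured_latency_us(true) == 140);
}
```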

@wwbmmm
Contributor

wwbmmm commented May 26, 2026

> Thanks. My concern is that removing the gflag would not just drop configurability, but would also change the default semantics.
>
> ConcurrencyRemover's destructor calls MethodStatus::OnResponded(...), so changing its lifetime affects not only when concurrency is released, but also the latency accounting window. If it is made to cover CallAfterRpcResp unconditionally, method_status latency would also start including the after-response callback. I think that semantic change should be evaluated explicitly as well.

I agree to add a flag.

Contributor

@chenBright chenBright left a comment


The HTTP protocol also needs to be updated.

@chenBright
Contributor

> I agree to add a flag.

I think it's more appropriate to set the flag in the controller using set_after_rpc_resp_fn.

brpc/src/brpc/controller.h

Lines 624 to 626 in a47e349

void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) { _after_rpc_resp_fn = fn; }
void CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res);

@feng-y feng-y force-pushed the feature/concurrency-remover-manages-after-rpc-resp branch from 4c47f7a to fd71081 Compare May 27, 2026 02:55
@feng-y
Contributor Author

feng-y commented May 27, 2026

Thanks for the feedback! I've updated the implementation based on your suggestions:

Changes Made

  1. Removed global gflag - Now uses a controller-level flag instead
  2. Automatic flag setting - The flag is automatically set to true when set_after_rpc_resp_fn() is called
  3. Updated HTTP protocol - Applied the same logic to http_rpc_protocol.cpp

How It Works

When a user sets an after-response callback:

controller.set_after_rpc_resp_fn([](Controller* cntl, const Message* req, const Message* res) {
    // Custom callback logic
});
// Flag is automatically set to true here

The ConcurrencyRemover will then automatically manage the lifecycle to cover the callback execution.

Benefits

  • ✅ No global configuration needed
  • ✅ Automatic opt-in based on usage
  • ✅ Backward compatible (default behavior unchanged)
  • ✅ Consistent across both baidu_rpc and http protocols

Please let me know if there are any other concerns!

In SendRpcResponse, ConcurrencyRemover was destroyed before CallAfterRpcResp
was called, meaning concurrency control didn't cover the after-response callback.
This could lead to inaccurate concurrency tracking and latency measurements.

This change adds FLAGS_concurrency_remover_manages_after_rpc_resp (default: false)
and copies its value into the controller when set_after_rpc_resp_fn is called.

Implementation:
- Add _concurrency_remover_manages_after_rpc_resp flag to Controller
- In set_after_rpc_resp_fn(), read gflag value and set to controller instance
- In baidu_rpc_protocol, use controller flag instead of global gflag
- Use unique_ptr with explicit reset() for clear control flow

When false (default): Original behavior - ConcurrencyRemover is released before
CallAfterRpcResp via explicit reset().

When true (gflag enabled when callback set): ConcurrencyRemover lives until the
end of BRPC_SCOPE_EXIT, covering the entire response lifecycle.

Note: HTTP protocol not modified in this change due to its more complex async
flow. Can be addressed separately if needed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@feng-y feng-y force-pushed the feature/concurrency-remover-manages-after-rpc-resp branch from fd71081 to eed1207 Compare May 27, 2026 03:07
@feng-y
Contributor Author

feng-y commented May 27, 2026

I've updated the implementation based on the feedback:

Current Implementation

✅ What Changed

  1. Kept the gflag: FLAGS_concurrency_remover_manages_after_rpc_resp (default: false)
  2. Capture to controller: In set_after_rpc_resp_fn(), the gflag value is captured to the controller instance
  3. Protocol uses controller flag: baidu_rpc_protocol now uses cntl->concurrency_remover_manages_after_rpc_resp() instead of the global gflag

🎯 How It Works

// 1. User optionally sets global gflag
// -concurrency_remover_manages_after_rpc_resp=true

// 2. User sets callback (in service implementation)
controller.set_after_rpc_resp_fn([](Controller* cntl, ...) {
    // At this point, the gflag value is captured to controller
    // _concurrency_remover_manages_after_rpc_resp = FLAGS_concurrency_remover_manages_after_rpc_resp
});

// 3. In SendRpcResponse
if (!cntl->concurrency_remover_manages_after_rpc_resp()) {
    concurrency_remover_ptr.reset();  // Release early (default)
}

📝 Scope

  • Modified: baidu_rpc_protocol only
  • Not modified: http_rpc_protocol
    • HTTP has a more complex async flow (HttpResponseSenderAsDone)
    • Can be addressed separately if needed
    • Wanted to keep this PR focused and low-risk

✅ Benefits

  • Global configuration via gflag
  • Per-controller instance capture (no race conditions)
  • Backward compatible (default behavior unchanged)
  • Minimal changes (only 3 files, +19 -3 lines)

Let me know if you'd like me to also update HTTP protocol, or if we should handle that separately!

Comment thread src/brpc/controller.cpp
void Controller::set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) {
    _after_rpc_resp_fn = fn;
    // Set the flag from global gflag when after_rpc_resp_fn is set
    _concurrency_remover_manages_after_rpc_resp = FLAGS_concurrency_remover_manages_after_rpc_resp;
Contributor


Do not rely on the gflag; use the Controller's _flags instead.

uint32_t _flags; // all boolean fields inside Controller

brpc/src/brpc/controller.h

Lines 133 to 154 in f97f23e

static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1;
static const uint32_t FLAGS_SECURITY_MODE = (1 << 1);
static const uint32_t FLAGS_ADDED_CONCURRENCY = (1 << 2);
static const uint32_t FLAGS_READ_PROGRESSIVELY = (1 << 3);
static const uint32_t FLAGS_PROGRESSIVE_READER = (1 << 4);
static const uint32_t FLAGS_BACKUP_REQUEST = (1 << 5);
// Let _done delete the correlation_id, used by combo channels to
// make lifetime of the correlation_id more flexible.
static const uint32_t FLAGS_DESTROY_CID_IN_DONE = (1 << 7);
static const uint32_t FLAGS_CLOSE_CONNECTION = (1 << 8);
static const uint32_t FLAGS_LOG_ID = (1 << 9); // log_id is set
static const uint32_t FLAGS_REQUEST_CODE = (1 << 10);
static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11);
static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12);
static const uint32_t FLAGS_USED_BY_RPC = (1 << 13);
static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16);
static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17);
static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18);
static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
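One way to read this suggestion is sketched below: store the setting as one more bit in a `uint32_t` bitmask rather than as a separate bool fed from a gflag. Everything here is an assumption made for illustration. `FlagController`, the constant name `FLAGS_REMOVER_MANAGES_AFTER_RPC_RESP`, and the bit position `(1 << 23)` are hypothetical and not taken from brpc, and the choice to set the bit whenever a non-empty callback is installed is only one possible policy.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical reduced controller that keeps booleans in a bitmask.
class FlagController {
public:
    // Hypothetical bit; name and position are illustrative only.
    static const uint32_t FLAGS_REMOVER_MANAGES_AFTER_RPC_RESP = (1u << 23);

    using AfterRespFn = std::function<void()>;

    void set_after_rpc_resp_fn(AfterRespFn fn) {
        _after_rpc_resp_fn = std::move(fn);
        // Assumed policy: the bit follows whether a callback is installed.
        set_flag(FLAGS_REMOVER_MANAGES_AFTER_RPC_RESP,
                 static_cast<bool>(_after_rpc_resp_fn));
    }

    bool has_flag(uint32_t f) const { return (_flags & f) != 0; }

private:
    void set_flag(uint32_t f, bool on) {
        if (on) {
            _flags |= f;
        } else {
            _flags &= ~f;
        }
    }

    AfterRespFn _after_rpc_resp_fn;
    uint32_t _flags = 0;  // all boolean fields packed into one word
};

// Usage: the bit is clear by default and set once a callback is installed.
inline void flags_demo() {
    FlagController cntl;
    assert(!cntl.has_flag(FlagController::FLAGS_REMOVER_MANAGES_AFTER_RPC_RESP));
    cntl.set_after_rpc_resp_fn([] {});
    assert(cntl.has_flag(FlagController::FLAGS_REMOVER_MANAGES_AFTER_RPC_RESP));
}
```

Under this reading the protocol code would test the bit on the controller, and no process-wide configuration would be involved.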

4 participants