Skip to content

feat: implement UpdateColdDataFlowCtrGroupConfigSubCommand (#6270)#6329

Merged
mxsm merged 2 commits intomxsm:mainfrom
magogosora:feature/issue-6270-implement-UpdateColdDataFlowCtrGroupConfig
Feb 15, 2026
Merged

feat: implement UpdateColdDataFlowCtrGroupConfigSubCommand (#6270)#6329
mxsm merged 2 commits intomxsm:mainfrom
magogosora:feature/issue-6270-implement-UpdateColdDataFlowCtrGroupConfig

Conversation

@magogosora
Copy link
Contributor

@magogosora magogosora commented Feb 14, 2026

Which Issue(s) This PR Fixes(Closes)

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features
    • Added support for updating cold data flow control group configuration on brokers.
    • New CLI command to manage cold data flow control settings for a broker or entire cluster.
    • Client and admin APIs implemented to send update requests to brokers.
    • Configuration updates can be applied in parallel across multiple brokers in a cluster.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@magogosora 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@rocketmq-rust-robot rocketmq-rust-robot added Difficulty level/Moderate Moderate difficult ISSUE feature🚀 Suggest an idea for this project. labels Feb 14, 2026
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 14, 2026

Walkthrough

Adds end-to-end support for UpdateColdDataFlowCtrGroupConfig: new admin CLI subcommand, client API call, and broker request handler that parses properties and returns structured errors (implementation hook for controller integration remains TODO).

Changes

Cohort / File(s) Summary
Broker request handler
rocketmq-broker/src/processor/admin_broker_processor.rs, rocketmq-broker/src/processor/admin_broker_processor/update_cold_data_flow_ctr_group_config.rs
Adds UpdateColdDataFlowCtrGroupConfigRequestHandler, wires it into AdminBrokerProcessor, and routes RequestCode::UpdateColdDataFlowCtrConfig to the new handler. Handler parses properties, validates body, and currently returns SystemError for unimplemented controller integration.
Client API layer
rocketmq-client/src/admin/default_mq_admin_ext_impl.rs, rocketmq-client/src/implementation/mq_client_api_impl.rs
Implements update_cold_data_flow_ctr_group_config in admin impl and MQClientAPIImpl. Converts properties to request body, constructs RemotingCommand, sends to broker (VIP routing respected), and maps non-success responses to errors.
Admin tool / CLI
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs, rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs, rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/update_cold_data_flow_ctr_group_config_sub_command.rs
Adds new CLI subcommand updateColdDataFlowCtrGroupConfig, registers it in BrokerCommands, and implements execution to target a single broker or a cluster (per-broker updates, concurrent cluster updates, aggregated failure reporting).

Sequence Diagram

sequenceDiagram
    participant AdminCLI as Admin CLI
    participant SubCmd as UpdateColdDataFlowCtrGroupConfig<br/>SubCommand
    participant MQAdminExt as DefaultMQAdminExt
    participant ClientAPI as MQClientAPIImpl
    participant Broker as AdminBrokerProcessor

    AdminCLI->>SubCmd: execute(target, consumer_group, threshold)
    SubCmd->>SubCmd: validate & build properties map
    SubCmd->>MQAdminExt: start / call update_cold_data_flow_ctr_group_config
    MQAdminExt->>ClientAPI: update_cold_data_flow_ctr_group_config(broker_addr, properties)
    ClientAPI->>ClientAPI: serialize properties -> body, build RemotingCommand
    ClientAPI->>Broker: send RemotingCommand (VIP channel if enabled)
    Broker->>Broker: parse body -> properties, validate
    Broker-->>ClientAPI: return response (OK / InvalidParameter / SystemError)
    ClientAPI-->>MQAdminExt: propagate result/error
    MQAdminExt-->>SubCmd: return outcome
    SubCmd->>SubCmd: aggregate/log per-broker results
    SubCmd-->>AdminCLI: report success/failures
Loading

Estimated Code Review Effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 I hopped through code, both near and far,
Commands and handlers now join the star,
Brokers listen, clients send,
Admins tune thresholds end to end,
A tiny rabbit cheers: success, hurrah! 🥕

🚥 Pre-merge checks | ✅ 4 | ❌ 2
❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Merge Conflict Detection ⚠️ Warning ❌ Merge conflicts detected (11 files):

⚔️ rocketmq-broker/src/broker_runtime.rs (content)
⚔️ rocketmq-broker/src/processor/admin_broker_processor.rs (content)
⚔️ rocketmq-client/src/admin/default_mq_admin_ext_impl.rs (content)
⚔️ rocketmq-client/src/base/client_config.rs (content)
⚔️ rocketmq-client/src/implementation/mq_client_api_impl.rs (content)
⚔️ rocketmq-client/src/producer/default_mq_producer.rs (content)
⚔️ rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (content)
⚔️ rocketmq-example/Cargo.toml (content)
⚔️ rocketmq-remoting/src/protocol/namespace_util.rs (content)
⚔️ rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs (content)
⚔️ rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs (content)

These conflicts must be resolved before merging into main.
Resolve conflicts locally and push changes to this branch.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main feature implemented: UpdateColdDataFlowCtrGroupConfigSubCommand, a new broker command for managing cold data flow control configuration.
Linked Issues check ✅ Passed The PR implements all key objectives from issue #6270: the UpdateColdDataFlowCtrGroupConfig command in broker_commands module with support for updating flow control thresholds, rate limits, and priority levels for consumer groups.
Out of Scope Changes check ✅ Passed All changes are directly related to implementing UpdateColdDataFlowCtrGroupConfig functionality across broker, client, and admin components with no unrelated modifications.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
⚔️ Resolve merge conflicts (beta)
  • Auto-commit resolved conflicts to branch feature/issue-6270-implement-UpdateColdDataFlowCtrGroupConfig
  • Post resolved changes as copyable diffs in a comment

No actionable comments were generated in the recent review. 🎉

🧹 Recent nitpick comments
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/update_cold_data_flow_ctr_group_config_sub_command.rs (1)

168-177: Avoid unnecessary cloning: use into_iter() instead of iter() + clone().

The join_all result vector is consumed only here, so into_iter() lets you take ownership directly without cloning each Result.

Suggested fix
             .await
-                .iter()
-                .filter_map(|result| result.clone().err())
+                .into_iter()
+                .filter_map(|result| result.err())
                 .collect();

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In
`@rocketmq-broker/src/processor/admin_broker_processor/update_cold_data_flow_ctr_group_config.rs`:
- Around line 55-62: The handler currently parses the request body but always
returns a SystemError stub (response.set_code(ResponseCode::SystemError))
instead of performing the update; replace the stub by retrieving the broker
controller/runtime, fetch the ColdDataCgCtrService from broker_runtime, call the
appropriate update method on ColdDataCgCtrService using the parsed properties
(from mix_all::string_to_properties) and set a successful response code/remark
on success, and on missing service return a clear unsupported response (not
SystemError) explaining ColdDataCgCtrService is not configured; ensure you
update the code paths in update_cold_data_flow_ctr_group_config.rs to use the
real service rather than the placeholder.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/update_cold_data_flow_ctr_group_config_sub_command.rs`:
- Around line 95-157: The error messages are misleading: update the "Failed to
get user for brokers ..." message used in
UpdateColdDataFlowCtrGroupConfigSubCommand to accurately describe the failure
(e.g., "Failed to get broker addresses for brokers {addrs}") and in
update_config_for_cluster's Err branch include the cluster_name in the formatted
message (e.g., "Failed to fetch broker addresses for cluster {cluster_name}:
{e}"); locate the strings in the UpdateColdDataFlowCtrGroupConfigSubCommand flow
and the update_config_for_cluster function and replace the incorrect/missing
text while keeping existing context and error variable interpolation.
- Around line 32-67: The command currently only accepts a raw threshold string
and forwards it; update the UpdateColdDataFlowCtrGroupConfigSubCommand struct to
add new optional CLI args (e.g., --rate and --priority) and change threshold to
be parsed into a numeric type, then validate it in execute (parse threshold to
u64 or f64, reject parse errors and negative/zero where appropriate), and insert
validated values into the properties HashMap with explicit keys (e.g.,
consumerGroup, threshold, rate, priority) before sending to the broker; make
sure validation errors return RocketMQError::IllegalArgument from execute so
invalid inputs are rejected early.

Comment on lines +55 to +62
let body = mix_all::string_to_properties(&String::from_utf8_lossy(body));
match body {
Some(_body) => {
// TODO get broker controller and do operations
Ok(Some(response.set_code(ResponseCode::SystemError).set_remark(
"UpdateColdDataFlowCtrGroupConfig not implemented: ColdDataCgCtrService not configured in \
broker_runtime",
)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handler is a stub: valid requests always return SystemError. Line 55–62 returns “not implemented” for any well-formed body, so the new admin command can never succeed. Please wire this to the ColdDataCgCtrService (or equivalent) and apply the updates, or gate the request with a clear unsupported response until implementation is ready.

🤖 Prompt for AI Agents
In
`@rocketmq-broker/src/processor/admin_broker_processor/update_cold_data_flow_ctr_group_config.rs`
around lines 55 - 62, The handler currently parses the request body but always
returns a SystemError stub (response.set_code(ResponseCode::SystemError))
instead of performing the update; replace the stub by retrieving the broker
controller/runtime, fetch the ColdDataCgCtrService from broker_runtime, call the
appropriate update method on ColdDataCgCtrService using the parsed properties
(from mix_all::string_to_properties) and set a successful response code/remark
on success, and on missing service return a clear unsupported response (not
SystemError) explaining ColdDataCgCtrService is not configured; ensure you
update the code paths in update_cold_data_flow_ctr_group_config.rs to use the
real service rather than the placeholder.

Comment on lines 32 to 67
#[derive(Debug, Clone, Parser)]
#[command(group(ArgGroup::new("target")
.required(true)
.args(&["broker_addr", "cluster_name"]))
)]
pub struct UpdateColdDataFlowCtrGroupConfigSubCommand {
#[arg(short = 'b', long = "brokerAddr", required = false, help = "update which broker")]
broker_addr: Option<String>,

#[arg(short = 'c', long = "clusterName", required = false, help = "update which cluster")]
cluster_name: Option<String>,

#[arg(
short = 'g',
long = "consumerGroup",
required = true,
help = "specific consumerGroup"
)]
consumer_group: String,

#[arg(short = 'v', long = "threshold", required = true, help = "cold read threshold value")]
threshold: String,
}

impl CommandExecute for UpdateColdDataFlowCtrGroupConfigSubCommand {
async fn execute(&self, rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
let target = Target::new(&self.cluster_name, &self.broker_addr).map_err(|_| {
RocketMQError::IllegalArgument(
"UpdateColdDataFlowCtrGroupConfigSubCommand: Specify exactly one of --brokerAddr (-b) or \
--clusterName (-c)"
.into(),
)
})?;
let mut properties = HashMap::<CheetahString, CheetahString>::new();
properties.insert(self.consumer_group.clone().into(), self.threshold.clone().into());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Missing validation and config fields vs. intended command capabilities. Line 32–67 only accepts consumerGroup + threshold and forwards the raw string; there’s no validation or support for cold-data rate limits / priority. This falls short of the stated objectives and risks sending invalid config to the broker. Consider parsing threshold to a numeric type (reject invalid/negative) and adding CLI args + properties for rate/priority.

🔧 Minimal validation example
-        let mut properties = HashMap::<CheetahString, CheetahString>::new();
-        properties.insert(self.consumer_group.clone().into(), self.threshold.clone().into());
+        let threshold: u64 = self.threshold.parse().map_err(|_| {
+            RocketMQError::IllegalArgument("threshold must be a non-negative integer".into())
+        })?;
+        let mut properties = HashMap::<CheetahString, CheetahString>::new();
+        properties.insert(self.consumer_group.clone().into(), threshold.to_string().into());
🤖 Prompt for AI Agents
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/update_cold_data_flow_ctr_group_config_sub_command.rs`
around lines 32 - 67, The command currently only accepts a raw threshold string
and forwards it; update the UpdateColdDataFlowCtrGroupConfigSubCommand struct to
add new optional CLI args (e.g., --rate and --priority) and change threshold to
be parsed into a numeric type, then validate it in execute (parse threshold to
u64 or f64, reject parse errors and negative/zero where appropriate), and insert
validated values into the properties HashMap with explicit keys (e.g.,
consumerGroup, threshold, rate, priority) before sending to the broker; make
sure validation errors return RocketMQError::IllegalArgument from execute so
invalid inputs are rejected early.

@codecov
Copy link

codecov bot commented Feb 14, 2026

Codecov Report

❌ Patch coverage is 0% with 169 lines in your changes missing coverage. Please review.
✅ Project coverage is 42.57%. Comparing base (870da38) to head (e128d0f).
⚠️ Report is 6 commits behind head on main.

Files with missing lines Patch % Lines
...ate_cold_data_flow_ctr_group_config_sub_command.rs 0.00% 98 Missing ⚠️
...rocessor/update_cold_data_flow_ctr_group_config.rs 0.00% 31 Missing ⚠️
...mq-client/src/implementation/mq_client_api_impl.rs 0.00% 24 Missing ⚠️
...tmq-broker/src/processor/admin_broker_processor.rs 0.00% 6 Missing ⚠️
...etmq-client/src/admin/default_mq_admin_ext_impl.rs 0.00% 6 Missing ⚠️
...ketmq-admin-core/src/admin/default_mq_admin_ext.rs 0.00% 3 Missing ⚠️
...ocketmq-admin-core/src/commands/broker_commands.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6329      +/-   ##
==========================================
- Coverage   42.62%   42.57%   -0.05%     
==========================================
  Files         912      914       +2     
  Lines      128038   128224     +186     
==========================================
+ Hits        54570    54595      +25     
- Misses      73468    73629     +161     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - All CI checks passed ✅

Copy link
Owner

@mxsm mxsm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mxsm mxsm merged commit 2c51c19 into mxsm:main Feb 15, 2026
8 of 13 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Feb 15, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge Difficulty level/Moderate Moderate difficult ISSUE feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Implement UpdateColdDataFlowCtrGroupConfig command in rocketmq-admin-core

4 participants