Skip to content

Commit f166e70

Browse files
[ISSUE #6275]🚀Implement RemoveColdDataFlowCtrGroupConfig command in rocketmq-admin-core (#6348)
1 parent e3e6e71 commit f166e70

File tree

3 files changed

+184
-0
lines changed

3 files changed

+184
-0
lines changed

rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ impl CommandExecute for ClassificationTablePrint {
197197
command: "getBrokerConfig",
198198
remark: "Get broker config by cluster or special broker.",
199199
},
200+
Command {
201+
category: "Broker",
202+
command: "removeColdDataFlowCtrGroupConfig",
203+
remark: "Remove consumer from cold ctr config.",
204+
},
200205
Command {
201206
category: "Broker",
202207
command: "resetMasterFlushOffset",

rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ mod clean_expired_cq_sub_command;
1616
mod clean_unused_topic_sub_command;
1717
mod delete_expired_commit_log_sub_command;
1818
mod get_broker_config_sub_command;
19+
mod remove_cold_data_flow_ctr_group_config_sub_command;
1920
mod reset_master_flush_offset_sub_command;
2021
mod send_msg_status_sub_command;
2122
mod switch_timer_engine_sub_command;
@@ -32,6 +33,7 @@ use crate::commands::broker_commands::clean_expired_cq_sub_command::CleanExpired
3233
use crate::commands::broker_commands::clean_unused_topic_sub_command::CleanUnusedTopicSubCommand;
3334
use crate::commands::broker_commands::delete_expired_commit_log_sub_command::DeleteExpiredCommitLogSubCommand;
3435
use crate::commands::broker_commands::get_broker_config_sub_command::GetBrokerConfigSubCommand;
36+
use crate::commands::broker_commands::remove_cold_data_flow_ctr_group_config_sub_command::RemoveColdDataFlowCtrGroupConfigSubCommand;
3537
use crate::commands::broker_commands::reset_master_flush_offset_sub_command::ResetMasterFlushOffsetSubCommand;
3638
use crate::commands::broker_commands::send_msg_status_sub_command::SendMsgStatusSubCommand;
3739
use crate::commands::broker_commands::switch_timer_engine_sub_command::SwitchTimerEngineSubCommand;
@@ -69,6 +71,13 @@ pub enum BrokerCommands {
6971
)]
7072
GetBrokerConfig(GetBrokerConfigSubCommand),
7173

74+
#[command(
75+
name = "removeColdDataFlowCtrGroupConfig",
76+
about = "Remove consumer from cold ctr config.",
77+
long_about = None,
78+
)]
79+
RemoveColdDataFlowCtrGroupConfig(RemoveColdDataFlowCtrGroupConfigSubCommand),
80+
7281
#[command(
7382
name = "resetMasterFlushOffset",
7483
about = "Reset master flush offset in slave.",
@@ -112,6 +121,7 @@ impl CommandExecute for BrokerCommands {
112121
BrokerCommands::CleanUnusedTopic(value) => value.execute(rpc_hook).await,
113122
BrokerCommands::DeleteExpiredCommitLog(value) => value.execute(rpc_hook).await,
114123
BrokerCommands::GetBrokerConfig(cmd) => cmd.execute(rpc_hook).await,
124+
BrokerCommands::RemoveColdDataFlowCtrGroupConfig(value) => value.execute(rpc_hook).await,
115125
BrokerCommands::ResetMasterFlushOffset(value) => value.execute(rpc_hook).await,
116126
BrokerCommands::SendMsgStatus(value) => value.execute(rpc_hook).await,
117127
BrokerCommands::SwitchTimerEngine(value) => value.execute(rpc_hook).await,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Copyright 2023 The RocketMQ Rust Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use cheetah_string::CheetahString;
18+
use clap::ArgGroup;
19+
use clap::Parser;
20+
use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt;
21+
use rocketmq_common::TimeUtils::get_current_millis;
22+
use rocketmq_error::RocketMQError;
23+
use rocketmq_error::RocketMQResult;
24+
use rocketmq_remoting::runtime::RPCHook;
25+
26+
use crate::admin::default_mq_admin_ext::DefaultMQAdminExt;
27+
use crate::commands::command_util::CommandUtil;
28+
use crate::commands::target::Target;
29+
use crate::commands::CommandExecute;
30+
31+
#[derive(Debug, Clone, Parser)]
32+
#[command(group(ArgGroup::new("target")
33+
.required(true)
34+
.args(&["broker_addr", "cluster_name"]))
35+
)]
36+
pub struct RemoveColdDataFlowCtrGroupConfigSubCommand {
37+
#[arg(short = 'b', long = "brokerAddr", required = false, help = "update which broker")]
38+
broker_addr: Option<String>,
39+
40+
#[arg(short = 'c', long = "clusterName", required = false, help = "update which cluster")]
41+
cluster_name: Option<String>,
42+
43+
#[arg(
44+
short = 'g',
45+
long = "consumerGroup",
46+
required = true,
47+
help = "the consumer group will remove from the config"
48+
)]
49+
consumer_group: String,
50+
}
51+
52+
struct ParsedCommand {
53+
consumer_group: String,
54+
}
55+
56+
impl ParsedCommand {
57+
fn new(command: &RemoveColdDataFlowCtrGroupConfigSubCommand) -> Result<Self, RocketMQError> {
58+
let consumer_group = command.consumer_group.trim();
59+
if consumer_group.is_empty() {
60+
return Err(RocketMQError::IllegalArgument(
61+
"RemoveColdDataFlowCtrGroupConfigSubCommand: consumer_group is empty".into(),
62+
));
63+
}
64+
Ok(Self {
65+
consumer_group: consumer_group.into(),
66+
})
67+
}
68+
}
69+
70+
impl CommandExecute for RemoveColdDataFlowCtrGroupConfigSubCommand {
71+
async fn execute(&self, rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
72+
let target = Target::new(&self.cluster_name, &self.broker_addr).map_err(|_| {
73+
RocketMQError::IllegalArgument(
74+
"RemoveColdDataFlowCtrGroupConfigSubCommand: Specify exactly one of --brokerAddr (-b) or \
75+
--clusterName (-c)"
76+
.into(),
77+
)
78+
})?;
79+
let command = ParsedCommand::new(self)?;
80+
81+
let mut default_mqadmin_ext = if let Some(rpc_hook) = rpc_hook {
82+
DefaultMQAdminExt::with_rpc_hook(rpc_hook)
83+
} else {
84+
DefaultMQAdminExt::new()
85+
};
86+
default_mqadmin_ext
87+
.client_config_mut()
88+
.set_instance_name(get_current_millis().to_string().into());
89+
90+
MQAdminExt::start(&mut default_mqadmin_ext).await.map_err(|e| {
91+
RocketMQError::Internal(format!(
92+
"RemoveColdDataFlowCtrGroupConfigSubCommand: Failed to start MQAdminExt: {}",
93+
e
94+
))
95+
})?;
96+
97+
let operation_result = async {
98+
match target {
99+
Target::BrokerAddr(broker_addr) => {
100+
remove_config_for_broker(&default_mqadmin_ext, &broker_addr, &command.consumer_group).await
101+
}
102+
Target::ClusterName(cluster_name) => {
103+
let failed_broker_addr =
104+
remove_config_for_cluster(&default_mqadmin_ext, &cluster_name, &command.consumer_group).await?;
105+
if failed_broker_addr.is_empty() {
106+
Ok(())
107+
} else {
108+
Err(RocketMQError::Internal(format!(
109+
"RemoveColdDataFlowCtrGroupConfigSubCommand: Failed to remove for brokers {}",
110+
failed_broker_addr.join(", ")
111+
)))
112+
}
113+
}
114+
}
115+
}
116+
.await;
117+
MQAdminExt::shutdown(&mut default_mqadmin_ext).await;
118+
operation_result
119+
}
120+
}
121+
122+
async fn remove_config_for_broker(
123+
default_mqadmin_ext: &DefaultMQAdminExt,
124+
broker_addr: &str,
125+
consumer_group: &str,
126+
) -> Result<(), RocketMQError> {
127+
match default_mqadmin_ext
128+
.remove_cold_data_flow_ctr_group_config(CheetahString::from(broker_addr), CheetahString::from(consumer_group))
129+
.await
130+
{
131+
Ok(_) => {
132+
println!("remove broker cold read threshold success, {}", broker_addr);
133+
Ok(())
134+
}
135+
Err(e) => Err(RocketMQError::Internal(format!(
136+
"RemoveColdDataFlowCtrGroupConfigSubCommand: Failed to remove for broker {}: {}",
137+
broker_addr, e
138+
))),
139+
}
140+
}
141+
142+
async fn remove_config_for_cluster(
143+
default_mqadmin_ext: &DefaultMQAdminExt,
144+
cluster_name: &str,
145+
consumer_group: &str,
146+
) -> Result<Vec<CheetahString>, RocketMQError> {
147+
let cluster_info = default_mqadmin_ext.examine_broker_cluster_info().await?;
148+
149+
match CommandUtil::fetch_master_addr_by_cluster_name(&cluster_info, cluster_name) {
150+
Ok(addresses) => {
151+
let failed_brokers: Vec<CheetahString> =
152+
futures::future::join_all(addresses.into_iter().map(|addr| async move {
153+
remove_config_for_broker(default_mqadmin_ext, addr.as_str(), consumer_group)
154+
.await
155+
.map_err(|_err| addr)
156+
}))
157+
.await
158+
.iter()
159+
.filter_map(|result| result.clone().err())
160+
.collect();
161+
162+
Ok(failed_brokers)
163+
}
164+
Err(e) => Err(RocketMQError::Internal(format!(
165+
"RemoveColdDataFlowCtrGroupConfigSubCommand: Failed to fetch broker addresses by cluster name {}",
166+
e
167+
))),
168+
}
169+
}

0 commit comments

Comments
 (0)