Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ mod update_user_sub_command;

use std::sync::Arc;

use crate::commands::auth_commands::copy_acl_sub_command::CopyAclSubCommand;
use crate::commands::auth_commands::copy_users_sub_command::CopyUsersSubCommand;
use crate::commands::auth_commands::create_acl_sub_command::CreateAclSubCommand;
use crate::commands::auth_commands::create_user_sub_command::CreateUserSubCommand;
use crate::commands::auth_commands::delete_acl_sub_command::DeleteAclSubCommand;
use crate::commands::auth_commands::delete_user_sub_command::DeleteUserSubCommand;
use crate::commands::auth_commands::get_acl_sub_command::GetAclSubCommand;
use crate::commands::auth_commands::get_user_sub_command::GetUserSubCommand;
use crate::commands::auth_commands::list_acl_sub_command::ListAclSubCommand;
use crate::commands::auth_commands::list_users_sub_command::ListUsersSubCommand;
use crate::commands::auth_commands::update_acl_sub_command::UpdateAclSubCommand;
use crate::commands::auth_commands::update_user_sub_command::UpdateUserSubCommand;
use crate::commands::CommandExecute;
use clap::Subcommand;
use rocketmq_error::RocketMQResult;
Expand All @@ -39,84 +51,84 @@ pub enum AuthCommands {
about = "Copy acl to cluster",
long_about = None,
)]
CopyAcl(copy_acl_sub_command::CopyAclSubCommand),
CopyAcl(CopyAclSubCommand),

#[command(
name = "copyUser",
about = "Copy user to cluster",
long_about = None,
)]
CopyUsers(copy_users_sub_command::CopyUsersSubCommand),
CopyUsers(CopyUsersSubCommand),

#[command(
name = "createAcl",
about = "Create acl to cluster",
long_about = None,
)]
CreateAcl(create_acl_sub_command::CreateAclSubCommand),
CreateAcl(CreateAclSubCommand),

#[command(
name = "createUser",
about = "Create user to cluster.",
long_about = None,
)]
CreateUser(create_user_sub_command::CreateUserSubCommand),
CreateUser(CreateUserSubCommand),

#[command(
name = "deleteAcl",
about = "Delete acl from cluster.",
long_about = None,
)]
DeleteAcl(delete_acl_sub_command::DeleteAclSubCommand),
DeleteAcl(DeleteAclSubCommand),

#[command(
name = "deleteUser",
about = "Delete user from cluster.",
long_about = None,
)]
DeleteUser(delete_user_sub_command::DeleteUserSubCommand),

#[command(
name = "getUser",
about = "Get user from cluster.",
long_about = None,
)]
GetUser(get_user_sub_command::GetUserSubCommand),
DeleteUser(DeleteUserSubCommand),

#[command(
name = "getAcl",
about = "Get acl from cluster.",
long_about = None,
)]
GetAcl(get_acl_sub_command::GetAclSubCommand),
GetAcl(GetAclSubCommand),

#[command(
name = "listUsers",
about = "List users from cluster.",
name = "getUser",
about = "Get user from cluster.",
long_about = None,
)]
ListUsers(list_users_sub_command::ListUsersSubCommand),
GetUser(GetUserSubCommand),

#[command(
name = "listAcl",
about = "List acl from cluster",
long_about = None,
)]
ListAcl(list_acl_sub_command::ListAclSubCommand),
ListAcl(ListAclSubCommand),

#[command(
name = "listUsers",
about = "List users from cluster.",
long_about = None,
)]
ListUsers(ListUsersSubCommand),

#[command(
name = "updateAcl",
about = "Update Access Control List (ACL)",
long_about = None,
)]
UpdateAcl(update_acl_sub_command::UpdateAclSubCommand),
UpdateAcl(UpdateAclSubCommand),

#[command(
name = "updateUser",
about = "Update user to cluster.",
long_about = None,
)]
UpdateUser(update_user_sub_command::UpdateUserSubCommand),
UpdateUser(UpdateUserSubCommand),
}

impl CommandExecute for AuthCommands {
Expand All @@ -128,8 +140,8 @@ impl CommandExecute for AuthCommands {
AuthCommands::CreateUser(value) => value.execute(rpc_hook).await,
AuthCommands::DeleteAcl(value) => value.execute(rpc_hook).await,
AuthCommands::DeleteUser(value) => value.execute(rpc_hook).await,
AuthCommands::GetUser(value) => value.execute(rpc_hook).await,
AuthCommands::GetAcl(value) => value.execute(rpc_hook).await,
AuthCommands::GetUser(value) => value.execute(rpc_hook).await,
AuthCommands::ListAcl(value) => value.execute(rpc_hook).await,
AuthCommands::ListUsers(value) => value.execute(rpc_hook).await,
AuthCommands::UpdateAcl(value) => value.execute(rpc_hook).await,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.

mod clean_expired_cq_sub_command;
mod clean_unused_topic_command;
mod delete_expired_commit_log_command;
mod clean_unused_topic_sub_command;
mod delete_expired_commit_log_sub_command;
mod get_broker_config_sub_command;
mod reset_master_flush_offset_sub_command;
mod send_msg_status_command;
mod send_msg_status_sub_command;
mod switch_timer_engine_sub_command;
mod update_cold_data_flow_ctr_group_config_sub_command;

Expand All @@ -28,11 +28,11 @@ use rocketmq_error::RocketMQResult;
use rocketmq_remoting::runtime::RPCHook;

use crate::commands::broker_commands::clean_expired_cq_sub_command::CleanExpiredCQSubCommand;
use crate::commands::broker_commands::clean_unused_topic_command::CleanUnusedTopicCommand;
use crate::commands::broker_commands::delete_expired_commit_log_command::DeleteExpiredCommitLogCommand;
use crate::commands::broker_commands::clean_unused_topic_sub_command::CleanUnusedTopicSubCommand;
use crate::commands::broker_commands::delete_expired_commit_log_sub_command::DeleteExpiredCommitLogSubCommand;
use crate::commands::broker_commands::get_broker_config_sub_command::GetBrokerConfigSubCommand;
use crate::commands::broker_commands::reset_master_flush_offset_sub_command::ResetMasterFlushOffsetSubCommand;
use crate::commands::broker_commands::send_msg_status_command::SendMsgStatusCommand;
use crate::commands::broker_commands::send_msg_status_sub_command::SendMsgStatusSubCommand;
use crate::commands::broker_commands::switch_timer_engine_sub_command::SwitchTimerEngineSubCommand;
use crate::commands::broker_commands::update_cold_data_flow_ctr_group_config_sub_command::UpdateColdDataFlowCtrGroupConfigSubCommand;
use crate::commands::CommandExecute;
Expand All @@ -51,21 +51,21 @@ pub enum BrokerCommands {
about = "Clean unused topic on broker.",
long_about = None,
)]
CleanUnusedTopic(CleanUnusedTopicCommand),
CleanUnusedTopic(CleanUnusedTopicSubCommand),

#[command(
name = "deleteExpiredCommitLog",
about = "Delete expired CommitLog files.",
long_about = None,
)]
DeleteExpiredCommitLog(DeleteExpiredCommitLogCommand),
DeleteExpiredCommitLog(DeleteExpiredCommitLogSubCommand),

#[command(
name = "getBrokerConfig",
about = "Get broker config by cluster or special broker.",
long_about = None,
)]
GetBrokerConfigSubCommand(GetBrokerConfigSubCommand),
GetBrokerConfig(GetBrokerConfigSubCommand),

#[command(
name = "resetMasterFlushOffset",
Expand All @@ -79,7 +79,7 @@ pub enum BrokerCommands {
about = "Send msg to broker.",
long_about = None,
)]
SendMsgStatus(SendMsgStatusCommand),
SendMsgStatus(SendMsgStatusSubCommand),

#[command(
name = "switchTimerEngine",
Expand All @@ -102,7 +102,7 @@ impl CommandExecute for BrokerCommands {
BrokerCommands::CleanExpiredCQ(value) => value.execute(rpc_hook).await,
BrokerCommands::CleanUnusedTopic(value) => value.execute(rpc_hook).await,
BrokerCommands::DeleteExpiredCommitLog(value) => value.execute(rpc_hook).await,
BrokerCommands::GetBrokerConfigSubCommand(cmd) => cmd.execute(rpc_hook).await,
BrokerCommands::GetBrokerConfig(cmd) => cmd.execute(rpc_hook).await,
BrokerCommands::ResetMasterFlushOffset(value) => value.execute(rpc_hook).await,
BrokerCommands::SendMsgStatus(value) => value.execute(rpc_hook).await,
BrokerCommands::SwitchTimerEngine(value) => value.execute(rpc_hook).await,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ use crate::commands::CommandExecute;
.required(false)
.args(&["broker_addr", "cluster_name"]))
)]
pub struct CleanUnusedTopicCommand {
pub struct CleanUnusedTopicSubCommand {
#[arg(short = 'b', long = "brokerAddr", required = false, help = "Broker address")]
broker_addr: Option<String>,

#[arg(short = 'c', long = "cluster", required = false, help = "Cluster name")]
cluster_name: Option<String>,
}

impl CommandExecute for CleanUnusedTopicCommand {
impl CommandExecute for CleanUnusedTopicSubCommand {
async fn execute(&self, rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
let mut default_mqadmin_ext = if let Some(rpc_hook) = rpc_hook {
DefaultMQAdminExt::with_rpc_hook(rpc_hook)
Expand All @@ -51,7 +51,7 @@ impl CommandExecute for CleanUnusedTopicCommand {
.set_instance_name(get_current_millis().to_string().into());

MQAdminExt::start(&mut default_mqadmin_ext).await.map_err(|e| {
RocketMQError::Internal(format!("CleanUnusedTopicCommand: Failed to start MQAdminExt: {}", e))
RocketMQError::Internal(format!("CleanUnusedTopicSubCommand: Failed to start MQAdminExt: {}", e))
})?;

let operation_result = clean_unused_topic(&default_mqadmin_ext, self).await;
Expand All @@ -63,7 +63,7 @@ impl CommandExecute for CleanUnusedTopicCommand {

async fn clean_unused_topic(
default_mqadmin_ext: &DefaultMQAdminExt,
command: &CleanUnusedTopicCommand,
command: &CleanUnusedTopicSubCommand,
) -> RocketMQResult<()> {
let addr = command
.broker_addr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ use crate::commands::CommandExecute;
.required(false)
.args(&["broker_addr", "cluster_name"]))
)]
pub struct DeleteExpiredCommitLogCommand {
pub struct DeleteExpiredCommitLogSubCommand {
#[arg(short = 'b', long = "brokerAddr", required = false, help = "Broker address")]
broker_addr: Option<String>,

#[arg(short = 'c', long = "cluster", required = false, help = "Cluster name")]
cluster_name: Option<String>,
}

impl CommandExecute for DeleteExpiredCommitLogCommand {
impl CommandExecute for DeleteExpiredCommitLogSubCommand {
async fn execute(&self, rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
let mut default_mqadmin_ext = if let Some(rpc_hook) = rpc_hook {
DefaultMQAdminExt::with_rpc_hook(rpc_hook)
Expand All @@ -52,7 +52,7 @@ impl CommandExecute for DeleteExpiredCommitLogCommand {

MQAdminExt::start(&mut default_mqadmin_ext).await.map_err(|e| {
RocketMQError::Internal(format!(
"DeleteExpiredCommitLogCommand: Failed to start MQAdminExt: {}",
"DeleteExpiredCommitLogSubCommand: Failed to start MQAdminExt: {}",
e
))
})?;
Expand All @@ -66,7 +66,7 @@ impl CommandExecute for DeleteExpiredCommitLogCommand {

async fn delete_expired_commit_log(
default_mqadmin_ext: &DefaultMQAdminExt,
command: &DeleteExpiredCommitLogCommand,
command: &DeleteExpiredCommitLogSubCommand,
) -> RocketMQResult<()> {
let addr = command
.broker_addr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn build_message(topic: &str, message_size: usize) -> Message {
}

#[derive(Debug, Clone, Parser)]
pub struct SendMsgStatusCommand {
pub struct SendMsgStatusSubCommand {
#[arg(
short = 'b',
long = "brokerName",
Expand Down Expand Up @@ -71,7 +71,7 @@ pub struct SendMsgStatusCommand {
count: u32,
}

impl CommandExecute for SendMsgStatusCommand {
impl CommandExecute for SendMsgStatusSubCommand {
async fn execute(&self, rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
let instance_name = format!("PID_SMSC_{}", get_current_millis());
let mut client_config = ClientConfig::default();
Expand All @@ -96,7 +96,7 @@ impl CommandExecute for SendMsgStatusCommand {
if let Err(e) = warmup_result {
producer.shutdown().await;
return Err(RocketMQError::Internal(format!(
"SendMsgStatusCommand command failed: {}",
"SendMsgStatusSubCommand command failed: {}",
e
)));
}
Expand All @@ -115,7 +115,7 @@ impl CommandExecute for SendMsgStatusCommand {
Err(e) => {
producer.shutdown().await;
return Err(RocketMQError::Internal(format!(
"SendMsgStatusCommand command failed: {}",
"SendMsgStatusSubCommand command failed: {}",
e
)));
}
Expand Down
Loading
Loading