Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion rocketmq-broker/src/processor/admin_broker_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::processor::admin_broker_processor::reset_master_flusg_offset_handler:
use crate::processor::admin_broker_processor::subscription_group_handler::SubscriptionGroupHandler;
use crate::processor::admin_broker_processor::topic_request_handler::TopicRequestHandler;
use crate::processor::admin_broker_processor::update_broker_ha_handler::UpdateBrokerHaHandler;
use crate::processor::admin_broker_processor::update_cold_data_flow_ctr_group_config::UpdateColdDataFlowCtrGroupConfigRequestHandler;
use crate::processor::admin_broker_processor::update_user_request_handler::UpdateUserRequestHandler;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
Expand All @@ -54,6 +55,7 @@ mod reset_master_flusg_offset_handler;
mod subscription_group_handler;
mod topic_request_handler;
mod update_broker_ha_handler;
mod update_cold_data_flow_ctr_group_config;
mod update_user_request_handler;

pub struct AdminBrokerProcessor<MS: MessageStore> {
Expand All @@ -76,6 +78,7 @@ pub struct AdminBrokerProcessor<MS: MessageStore> {
update_user_request_handler: UpdateUserRequestHandler<MS>,
list_users_request_handler: ListUsersRequestHandler<MS>,
get_user_request_handler: GetUserRequestHandler<MS>,
update_cold_data_flow_ctr_group_config_request_handler: UpdateColdDataFlowCtrGroupConfigRequestHandler<MS>,
}

impl<MS> RequestProcessor for AdminBrokerProcessor<MS>
Expand Down Expand Up @@ -117,6 +120,8 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
let update_user_request_handler = UpdateUserRequestHandler::new(broker_runtime_inner.clone());
let list_users_request_handler = ListUsersRequestHandler::new(broker_runtime_inner.clone());
let get_user_request_handler = GetUserRequestHandler::new(broker_runtime_inner.clone());
let update_cold_data_flow_ctr_group_config_request_handler =
UpdateColdDataFlowCtrGroupConfigRequestHandler::new(broker_runtime_inner.clone());

AdminBrokerProcessor {
topic_request_handler,
Expand All @@ -136,6 +141,7 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
update_user_request_handler,
list_users_request_handler,
get_user_request_handler,
update_cold_data_flow_ctr_group_config_request_handler,
}
}
}
Expand Down Expand Up @@ -181,7 +187,11 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
.get_broker_config(channel, ctx, request_code, request)
.await
}
RequestCode::UpdateColdDataFlowCtrConfig => Ok(get_unknown_cmd_response(request_code)),
RequestCode::UpdateColdDataFlowCtrConfig => {
self.update_cold_data_flow_ctr_group_config_request_handler
.update_cold_data_flow_ctr_group_config(channel, ctx, request_code, request)
.await
}
RequestCode::RemoveColdDataFlowCtrConfig => Ok(get_unknown_cmd_response(request_code)),
RequestCode::GetColdDataFlowCtrInfo => Ok(get_unknown_cmd_response(request_code)),
RequestCode::SetCommitlogReadMode => Ok(get_unknown_cmd_response(request_code)),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::broker_runtime::BrokerRuntimeInner;
use rocketmq_common::common::mix_all;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_store::MessageStore;

#[derive(Clone)]
pub struct UpdateColdDataFlowCtrGroupConfigRequestHandler<MS: MessageStore> {
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
}

impl<MS: MessageStore> UpdateColdDataFlowCtrGroupConfigRequestHandler<MS> {
pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self {
Self { broker_runtime_inner }
}

pub async fn update_cold_data_flow_ctr_group_config(
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
_request_code: RequestCode,
request: &mut RemotingCommand,
) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
let response = RemotingCommand::default();

let body = match request.get_body() {
Some(body) => body,
None => {
return Ok(Some(
response
.set_code(ResponseCode::InvalidParameter)
.set_remark("Request body is empty"),
));
}
};

let body = mix_all::string_to_properties(&String::from_utf8_lossy(body));
match body {
Some(_body) => {
// TODO get broker controller and do operations
Ok(Some(response.set_code(ResponseCode::SystemError).set_remark(
"UpdateColdDataFlowCtrGroupConfig not implemented: ColdDataCgCtrService not configured in \
broker_runtime",
)))
Comment on lines +55 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handler is a stub: valid requests always return SystemError. Line 55–62 returns “not implemented” for any well-formed body, so the new admin command can never succeed. Please wire this to the ColdDataCgCtrService (or equivalent) and apply the updates, or gate the request with a clear unsupported response until implementation is ready.

🤖 Prompt for AI Agents
In
`@rocketmq-broker/src/processor/admin_broker_processor/update_cold_data_flow_ctr_group_config.rs`
around lines 55 - 62, The handler currently parses the request body but always
returns a SystemError stub (response.set_code(ResponseCode::SystemError))
instead of performing the update; replace the stub by retrieving the broker
controller/runtime, fetch the ColdDataCgCtrService from broker_runtime, call the
appropriate update method on ColdDataCgCtrService using the parsed properties
(from mix_all::string_to_properties) and set a successful response code/remark
on success, and on missing service return a clear unsupported response (not
SystemError) explaining ColdDataCgCtrService is not configured; ensure you
update the code paths in update_cold_data_flow_ctr_group_config.rs to use the
real service rather than the placeholder.

}
None => Ok(Some(
response
.set_code(ResponseCode::InvalidParameter)
.set_remark("Request body is empty"),
)),
}
}
}
7 changes: 6 additions & 1 deletion rocketmq-client/src/admin/default_mq_admin_ext_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,12 @@ impl MQAdminExt for DefaultMQAdminExtImpl {
broker_addr: CheetahString,
properties: HashMap<CheetahString, CheetahString>,
) -> rocketmq_error::RocketMQResult<()> {
todo!()
self.client_instance
.as_ref()
.unwrap()
.get_mq_client_api_impl()
.update_cold_data_flow_ctr_group_config(broker_addr, properties, self.timeout_millis.as_millis() as u64)
.await
}

async fn remove_cold_data_flow_ctr_group_config(
Expand Down
29 changes: 29 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2391,6 +2391,35 @@ impl MQClientAPIImpl {
)),
}
}

pub async fn update_cold_data_flow_ctr_group_config(
&self,
broker_addr: CheetahString,
properties: HashMap<CheetahString, CheetahString>,
timeout_millis: u64,
) -> RocketMQResult<()> {
let body = mix_all::properties_to_string(&properties);
if body.is_empty() {
return Ok(());
}

let request = RemotingCommand::create_remoting_command(RequestCode::UpdateColdDataFlowCtrConfig);
let request = request.set_body(body.to_string());
let broker_addr =
mix_all::broker_vip_channel(self.client_config.is_vip_channel_enabled(), broker_addr.as_str());
let response = self
.remoting_client
.invoke_request(Some(broker_addr).as_ref(), request, timeout_millis)
.await?;

match ResponseCode::from(response.code()) {
ResponseCode::Success => Ok(()),
_ => Err(mq_client_err!(
response.code(),
response.remark().map(|s| s.to_string()).unwrap_or_default()
)),
}
}
}

fn build_queue_offset_sorted_map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,9 @@ impl MQAdminExt for DefaultMQAdminExt {
broker_addr: CheetahString,
properties: HashMap<CheetahString, CheetahString>,
) -> rocketmq_error::RocketMQResult<()> {
todo!()
self.default_mqadmin_ext_impl
.update_cold_data_flow_ctr_group_config(broker_addr, properties)
.await
}

async fn remove_cold_data_flow_ctr_group_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod clean_unused_topic_command;
mod get_broker_config_sub_command;
mod send_msg_status_command;
mod switch_timer_engine_sub_command;
mod update_cold_data_flow_ctr_group_config_sub_command;

use std::sync::Arc;

Expand All @@ -27,6 +28,7 @@ use crate::commands::broker_commands::clean_unused_topic_command::CleanUnusedTop
use crate::commands::broker_commands::get_broker_config_sub_command::GetBrokerConfigSubCommand;
use crate::commands::broker_commands::send_msg_status_command::SendMsgStatusCommand;
use crate::commands::broker_commands::switch_timer_engine_sub_command::SwitchTimerEngineSubCommand;
use crate::commands::broker_commands::update_cold_data_flow_ctr_group_config_sub_command::UpdateColdDataFlowCtrGroupConfigSubCommand;
use crate::commands::CommandExecute;

#[derive(Subcommand)]
Expand Down Expand Up @@ -58,6 +60,13 @@ pub enum BrokerCommands {
long_about = None,
)]
SwitchTimerEngine(SwitchTimerEngineSubCommand),

#[command(
name = "updateColdDataFlowCtrGroupConfig",
about = "Add or update cold data flow ctr group config.",
long_about = None,
)]
UpdateColdDataFlowCtrGroupConfig(UpdateColdDataFlowCtrGroupConfigSubCommand),
}

impl CommandExecute for BrokerCommands {
Expand All @@ -67,6 +76,7 @@ impl CommandExecute for BrokerCommands {
BrokerCommands::GetBrokerConfigSubCommand(cmd) => cmd.execute(rpc_hook).await,
BrokerCommands::SendMsgStatus(value) => value.execute(rpc_hook).await,
BrokerCommands::SwitchTimerEngine(value) => value.execute(rpc_hook).await,
BrokerCommands::UpdateColdDataFlowCtrGroupConfig(value) => value.execute(rpc_hook).await,
}
}
}
Loading
Loading