Skip to content

Commit b4d8a02

Browse files
watchgoujon
andauthored
[ISSUE #4059]Notify broker role changed(RequstCode:1008) (#4058)
* add notify min broker id change * add notify min broker id change * Broker id Change function external logic implementation * Broker id Change function external logic implementation * Broker id Change function external logic implementation * Implementation Notification Broker Id Change Logic * Implementation Notification Broker Id Change Logic * Implementation Notification Broker Id Change Logic * Broker id Change function external logic implementation * Broker id Change function external logic implementation * Broker id Change function external logic implementation * Broker id Change function external logic implementation * update broker ha info * Broker id Change function external logic implementation * update broker ha info * update logic * update logic * reset master flush offset * broker epoch cache handler * broker epoch cache handler * back ffi * Comment out entry encode * Comment out entry encode * add object encode * update unwrap function * back ffi push * notify broker role changed * Delete the imported module * Optimization logic --------- Co-authored-by: jon <zjon234@gmail.cm>
1 parent abcff44 commit b4d8a02

File tree

8 files changed

+213
-1
lines changed

8 files changed

+213
-1
lines changed

rocketmq-broker/src/broker_runtime.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1848,6 +1848,11 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
18481848
&self.replicas_manager
18491849
}
18501850

1851+
#[inline]
1852+
pub fn replicas_manager_mut(&mut self) -> &mut Option<ReplicasManager> {
1853+
&mut self.replicas_manager
1854+
}
1855+
18511856
#[inline]
18521857
pub fn store_host(&self) -> SocketAddr {
18531858
self.store_host

rocketmq-broker/src/controller/replicas_manager.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry;
21
/*
32
* Licensed to the Apache Software Foundation (ASF) under one or more
43
* contributor license agreements. See the NOTICE file distributed with
@@ -15,7 +14,13 @@ use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry;
1514
* See the License for the specific language governing permissions and
1615
* limitations under the License.
1716
*/
17+
18+
use std::collections::HashSet;
19+
20+
use cheetah_string::CheetahString;
21+
use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry;
1822
use tracing::warn;
23+
1924
#[derive(Default)]
2025
pub struct ReplicasManager {}
2126

@@ -31,4 +36,15 @@ impl ReplicasManager {
3136
pub fn get_epoch_entries(&self) -> Vec<EpochEntry> {
3237
unimplemented!("")
3338
}
39+
40+
pub async fn change_broker_role(
41+
&mut self,
42+
_new_master_broker_id: Option<u64>,
43+
_new_master_address: Option<CheetahString>,
44+
_new_master_epoch: Option<i32>,
45+
_sync_state_set_epoch: Option<i32>,
46+
_sync_state_set: Option<&HashSet<i64>>,
47+
) -> rocketmq_error::RocketMQResult<()> {
48+
Ok(())
49+
}
3450
}

rocketmq-broker/src/processor/admin_broker_processor.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::processor::admin_broker_processor::batch_mq_handler::BatchMqHandler;
3030
use crate::processor::admin_broker_processor::broker_config_request_handler::BrokerConfigRequestHandler;
3131
use crate::processor::admin_broker_processor::broker_epoch_cache_handler::BrokerEpochCacheHandler;
3232
use crate::processor::admin_broker_processor::consumer_request_handler::ConsumerRequestHandler;
33+
use crate::processor::admin_broker_processor::notify_broker_role_change_handler::NotifyBrokerRoleChangeHandler;
3334
use crate::processor::admin_broker_processor::notify_min_broker_id_handler::NotifyMinBrokerChangeIdHandler;
3435
use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler;
3536
use crate::processor::admin_broker_processor::reset_master_flusg_offset_handler::ResetMasterFlushOffsetHandler;
@@ -41,6 +42,7 @@ mod batch_mq_handler;
4142
mod broker_config_request_handler;
4243
mod broker_epoch_cache_handler;
4344
mod consumer_request_handler;
45+
mod notify_broker_role_change_handler;
4446
mod notify_min_broker_id_handler;
4547
mod offset_request_handler;
4648
mod reset_master_flusg_offset_handler;
@@ -62,6 +64,7 @@ pub struct AdminBrokerProcessor<MS: MessageStore> {
6264
update_broker_ha_handler: UpdateBrokerHaHandler<MS>,
6365
reset_master_flusg_offset_handler: ResetMasterFlushOffsetHandler<MS>,
6466
broker_epoch_cache_handler: BrokerEpochCacheHandler<MS>,
67+
notify_broker_role_change_handler: NotifyBrokerRoleChangeHandler<MS>,
6568
}
6669

6770
impl<MS> RequestProcessor for AdminBrokerProcessor<MS>
@@ -102,6 +105,9 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
102105

103106
let broker_epoch_cache_handler = BrokerEpochCacheHandler::new(broker_runtime_inner.clone());
104107

108+
let notify_broker_role_change_handler =
109+
NotifyBrokerRoleChangeHandler::new(broker_runtime_inner.clone());
110+
105111
AdminBrokerProcessor {
106112
topic_request_handler,
107113
broker_config_request_handler,
@@ -114,6 +120,7 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
114120
update_broker_ha_handler,
115121
reset_master_flusg_offset_handler,
116122
broker_epoch_cache_handler,
123+
notify_broker_role_change_handler,
117124
}
118125
}
119126
}
@@ -260,6 +267,11 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
260267
.get_broker_epoch_cache(channel, ctx, request_code, request)
261268
.await
262269
}
270+
RequestCode::NotifyBrokerRoleChanged => {
271+
self.notify_broker_role_change_handler
272+
.notify_broker_role_changed(channel, ctx, request_code, request)
273+
.await
274+
}
263275
_ => Some(get_unknown_cmd_response(request_code)),
264276
}
265277
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use rocketmq_remoting::code::request_code::RequestCode;
19+
use rocketmq_remoting::code::response_code::ResponseCode;
20+
use rocketmq_remoting::net::channel::Channel;
21+
use rocketmq_remoting::protocol::body::sync_state_set_body::SyncStateSet;
22+
use rocketmq_remoting::protocol::header::notify_broker_role_change_request_header::NotifyBrokerRoleChangedRequestHeader;
23+
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
24+
use rocketmq_remoting::protocol::RemotingDeserializable;
25+
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
26+
use rocketmq_rust::ArcMut;
27+
use rocketmq_store::base::message_store::MessageStore;
28+
use tracing::info;
29+
30+
use crate::broker_runtime::BrokerRuntimeInner;
31+
32+
#[derive(Clone)]
33+
pub struct NotifyBrokerRoleChangeHandler<MS: MessageStore> {
34+
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
35+
}
36+
37+
impl<MS: MessageStore> NotifyBrokerRoleChangeHandler<MS> {
38+
pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self {
39+
Self {
40+
broker_runtime_inner,
41+
}
42+
}
43+
44+
pub async fn notify_broker_role_changed(
45+
&mut self,
46+
_channel: Channel,
47+
_ctx: ConnectionHandlerContext,
48+
_request_code: RequestCode,
49+
request: &mut RemotingCommand,
50+
) -> Option<RemotingCommand> {
51+
let request_header =
52+
request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>();
53+
54+
let sync_state_set_info =
55+
SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();
56+
57+
let response = RemotingCommand::create_response_command();
58+
59+
info!(
60+
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
61+
request_header.as_ref().expect("null")
62+
);
63+
64+
if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() {
65+
if let Ok(request_header) = request_header {
66+
match replicas_mangesr
67+
.change_broker_role(
68+
request_header.master_broker_id,
69+
request_header.master_address,
70+
request_header.master_epoch,
71+
request_header.sync_state_set_epoch,
72+
sync_state_set_info.get_sync_state_set(),
73+
)
74+
.await
75+
{
76+
Ok(_) => {}
77+
Err(e) => {
78+
panic!("Failed to call method change_broker_role: {}", e);
79+
}
80+
}
81+
}
82+
}
83+
84+
Some(response.set_code(ResponseCode::Success))
85+
}
86+
}

rocketmq-remoting/src/protocol/body.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pub mod request;
5656
pub mod response;
5757
pub mod set_message_request_mode_request_body;
5858
pub mod subscription_group_wrapper;
59+
pub mod sync_state_set_body;
5960
pub mod timer_metrics_serialize_wrapper;
6061
pub mod topic;
6162
pub mod topic_info_wrapper;
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::collections::HashSet;
19+
20+
use serde::Deserialize;
21+
use serde::Serialize;
22+
23+
#[derive(Debug, Serialize, Deserialize, Default)]
24+
#[serde(rename_all = "camelCase")]
25+
pub struct SyncStateSet {
26+
sync_state_set: Option<HashSet<i64>>,
27+
sync_state_set_epoch: i32,
28+
}
29+
30+
impl SyncStateSet {
31+
pub fn get_sync_state_set(&self) -> Option<&HashSet<i64>> {
32+
self.sync_state_set.as_ref()
33+
}
34+
35+
pub fn get_sync_state_set_epoch(&self) -> i32 {
36+
self.sync_state_set_epoch
37+
}
38+
}

rocketmq-remoting/src/protocol/header.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub mod message_operation_header;
5252
pub mod namesrv;
5353
pub mod notification_request_header;
5454
pub mod notification_response_header;
55+
pub mod notify_broker_role_change_request_header;
5556
pub mod notify_consumer_ids_changed_request_header;
5657
pub mod pop_message_request_header;
5758
pub mod pop_message_response_header;
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::fmt::Display;
19+
20+
use cheetah_string::CheetahString;
21+
use rocketmq_macros::RequestHeaderCodec;
22+
use serde::Deserialize;
23+
use serde::Serialize;
24+
25+
#[derive(Serialize, Deserialize, Debug, RequestHeaderCodec)]
26+
#[serde(rename_all = "camelCase")]
27+
pub struct NotifyBrokerRoleChangedRequestHeader {
28+
#[required]
29+
pub master_address: Option<CheetahString>,
30+
31+
#[required]
32+
pub master_epoch: Option<i32>,
33+
34+
#[required]
35+
pub sync_state_set_epoch: Option<i32>,
36+
37+
#[required]
38+
pub master_broker_id: Option<u64>,
39+
}
40+
41+
impl Display for NotifyBrokerRoleChangedRequestHeader {
42+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43+
write!(
44+
f,
45+
"(master_address={:?}, master_epoch={:?}, sync_state_set_epoch={:?}, \
46+
master_broker_id={:?})",
47+
self.master_address,
48+
self.master_epoch,
49+
self.sync_state_set_epoch,
50+
self.master_broker_id
51+
)
52+
}
53+
}

0 commit comments

Comments
 (0)