Skip to content

Commit abcff44

Browse files
watchgoujon
andauthored
[ISSUE #4057]Broker epoch cache handler (#4056)
* add notify min broker id change * add notify min broker id change * Broker id Change function external logic implementation * Broker id Change function external logic implementation * Broker id Change function external logic implementation * Implementation Notification Broker Id Change Logic * Implementation Notification Broker Id Change Logic * Implementation Notification Broker Id Change Logic * Broker id Change function external logic implementation * Broker id Change function external logic implementation * Broker id Change function external logic implementation * Broker id Change function external logic implementation * update broker ha info * Broker id Change function external logic implementation * update broker ha info * update logic * update logic * reset master flush offset * broker epoch cache handler * broker epoch cache handler * back ffi * Comment out entry encode * Comment out entry encode * add object encode * update unwrap function * back ffi push --------- Co-authored-by: jon <zjon234@gmail.cm>
1 parent 8c4201c commit abcff44

File tree

6 files changed

+115
-9
lines changed

6 files changed

+115
-9
lines changed

rocketmq-broker/src/broker_runtime.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1843,6 +1843,11 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
18431843
&mut self.pop_inflight_message_counter
18441844
}
18451845

1846+
#[inline]
1847+
pub fn replicas_manager(&self) -> &Option<ReplicasManager> {
1848+
&self.replicas_manager
1849+
}
1850+
18461851
#[inline]
18471852
pub fn store_host(&self) -> SocketAddr {
18481853
self.store_host

rocketmq-broker/src/controller/replicas_manager.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry;
12
/*
23
* Licensed to the Apache Software Foundation (ASF) under one or more
34
* contributor license agreements. See the NOTICE file distributed with
@@ -15,7 +16,6 @@
1516
* limitations under the License.
1617
*/
1718
use tracing::warn;
18-
1919
#[derive(Default)]
2020
pub struct ReplicasManager {}
2121

@@ -27,4 +27,8 @@ impl ReplicasManager {
2727
pub fn shutdown(&mut self) {
2828
warn!("ReplicasManager shutdown not implemented");
2929
}
30+
31+
pub fn get_epoch_entries(&self) -> Vec<EpochEntry> {
32+
unimplemented!("")
33+
}
3034
}

rocketmq-broker/src/processor/admin_broker_processor.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use tracing::warn;
2828
use crate::broker_runtime::BrokerRuntimeInner;
2929
use crate::processor::admin_broker_processor::batch_mq_handler::BatchMqHandler;
3030
use crate::processor::admin_broker_processor::broker_config_request_handler::BrokerConfigRequestHandler;
31+
use crate::processor::admin_broker_processor::broker_epoch_cache_handler::BrokerEpochCacheHandler;
3132
use crate::processor::admin_broker_processor::consumer_request_handler::ConsumerRequestHandler;
3233
use crate::processor::admin_broker_processor::notify_min_broker_id_handler::NotifyMinBrokerChangeIdHandler;
3334
use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler;
@@ -38,6 +39,7 @@ use crate::processor::admin_broker_processor::update_broker_ha_handler::UpdateBr
3839

3940
mod batch_mq_handler;
4041
mod broker_config_request_handler;
42+
mod broker_epoch_cache_handler;
4143
mod consumer_request_handler;
4244
mod notify_min_broker_id_handler;
4345
mod offset_request_handler;
@@ -59,6 +61,7 @@ pub struct AdminBrokerProcessor<MS: MessageStore> {
5961
notify_min_broker_handler: NotifyMinBrokerChangeIdHandler<MS>,
6062
update_broker_ha_handler: UpdateBrokerHaHandler<MS>,
6163
reset_master_flusg_offset_handler: ResetMasterFlushOffsetHandler<MS>,
64+
broker_epoch_cache_handler: BrokerEpochCacheHandler<MS>,
6265
}
6366

6467
impl<MS> RequestProcessor for AdminBrokerProcessor<MS>
@@ -97,6 +100,8 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
97100
let reset_master_flusg_offset_handler =
98101
ResetMasterFlushOffsetHandler::new(broker_runtime_inner.clone());
99102

103+
let broker_epoch_cache_handler = BrokerEpochCacheHandler::new(broker_runtime_inner.clone());
104+
100105
AdminBrokerProcessor {
101106
topic_request_handler,
102107
broker_config_request_handler,
@@ -108,6 +113,7 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
108113
notify_min_broker_handler,
109114
update_broker_ha_handler,
110115
reset_master_flusg_offset_handler,
116+
broker_epoch_cache_handler,
111117
}
112118
}
113119
}
@@ -249,6 +255,11 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
249255
.reset_master_flush_offset(channel, ctx, request_code, request)
250256
.await
251257
}
258+
RequestCode::GetBrokerEpochCache => {
259+
self.broker_epoch_cache_handler
260+
.get_broker_epoch_cache(channel, ctx, request_code, request)
261+
.await
262+
}
252263
_ => Some(get_unknown_cmd_response(request_code)),
253264
}
254265
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use rocketmq_remoting::code::request_code::RequestCode;
19+
use rocketmq_remoting::code::response_code::ResponseCode;
20+
use rocketmq_remoting::net::channel::Channel;
21+
use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntryCache;
22+
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
23+
use rocketmq_remoting::protocol::RemotingSerializable;
24+
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
25+
use rocketmq_rust::ArcMut;
26+
use rocketmq_store::base::message_store::MessageStore;
27+
28+
use crate::broker_runtime::BrokerRuntimeInner;
29+
30+
#[derive(Clone)]
31+
pub struct BrokerEpochCacheHandler<MS: MessageStore> {
32+
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
33+
}
34+
35+
impl<MS: MessageStore> BrokerEpochCacheHandler<MS> {
36+
pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self {
37+
Self {
38+
broker_runtime_inner,
39+
}
40+
}
41+
42+
pub async fn get_broker_epoch_cache(
43+
&mut self,
44+
_channel: Channel,
45+
_ctx: ConnectionHandlerContext,
46+
_request_code: RequestCode,
47+
_request: &mut RemotingCommand,
48+
) -> Option<RemotingCommand> {
49+
let broker_runtime_inner = self.broker_runtime_inner.as_mut();
50+
51+
let replicas_manage = if let Some(replicas_manage) = broker_runtime_inner.replicas_manager()
52+
{
53+
replicas_manage
54+
} else {
55+
panic!("`replicas_manage` object is empty")
56+
};
57+
58+
let broker_config = broker_runtime_inner.broker_config();
59+
let response = RemotingCommand::create_response_command();
60+
61+
if !broker_config.enable_controller_mode {
62+
return Some(
63+
response
64+
.set_code(ResponseCode::SystemError)
65+
.set_remark("this request only for controllerMode"),
66+
);
67+
}
68+
69+
let broker_identity = &broker_config.broker_identity;
70+
let broker_cluster_name = &broker_identity.broker_cluster_name;
71+
let broker_name = broker_config.broker_name();
72+
let broker_id = broker_identity.broker_id;
73+
74+
let epoch_list = replicas_manage.get_epoch_entries();
75+
76+
let message_store = broker_runtime_inner.message_store().as_ref().unwrap();
77+
let max_offset = message_store.get_max_phy_offset() as u64;
78+
79+
let entry_code = EpochEntryCache::new(
80+
broker_cluster_name,
81+
broker_name,
82+
broker_id,
83+
epoch_list,
84+
max_offset,
85+
);
86+
87+
let cache = entry_code.encode().unwrap_or_default();
88+
Some(response.set_body(cache).set_code(ResponseCode::Success))
89+
}
90+
}

rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,8 @@ impl<MS: MessageStore> NotifyMinBrokerChangeIdHandler<MS> {
8989

9090
self.update_min_broker(change_header).await;
9191

92-
let mut response = RemotingCommand::default();
93-
response.set_code_ref(ResponseCode::Success);
94-
Some(response)
92+
let response = RemotingCommand::create_response_command();
93+
Some(response.set_code(ResponseCode::Success))
9594
}
9695

9796
async fn update_min_broker(&mut self, change_header: NotifyMinBrokerIdChangeRequestHeader) {

rocketmq-broker/src/processor/admin_broker_processor/reset_master_flusg_offset_handler.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
* limitations under the License.
1616
*/
1717

18-
use cheetah_string::CheetahString;
1918
use rocketmq_common::common::mix_all::MASTER_ID;
2019
use rocketmq_remoting::code::request_code::RequestCode;
2120
use rocketmq_remoting::code::response_code::ResponseCode;
@@ -47,7 +46,7 @@ impl<MS: MessageStore> ResetMasterFlushOffsetHandler<MS> {
4746
_request_code: RequestCode,
4847
request: &mut RemotingCommand,
4948
) -> Option<RemotingCommand> {
50-
let mut response = RemotingCommand::default();
49+
let response = RemotingCommand::create_response_command();
5150

5251
let broker_id = self
5352
.broker_runtime_inner
@@ -66,8 +65,6 @@ impl<MS: MessageStore> ResetMasterFlushOffsetHandler<MS> {
6665
}
6766
}
6867

69-
response.set_code_ref(ResponseCode::Success);
70-
response.set_remark_mut(CheetahString::empty());
71-
Some(response)
68+
Some(response.set_code(ResponseCode::Success))
7269
}
7370
}

0 commit comments

Comments
 (0)