Skip to content

Commit 34727e6

Browse files
authored
Reset master offset (#4043)
1 parent 92b4b89 commit 34727e6

File tree

4 files changed

+135
-0
lines changed

4 files changed

+135
-0
lines changed

rocketmq-broker/src/processor/admin_broker_processor.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::processor::admin_broker_processor::broker_config_request_handler::Bro
3131
use crate::processor::admin_broker_processor::consumer_request_handler::ConsumerRequestHandler;
3232
use crate::processor::admin_broker_processor::notify_min_broker_id_handler::NotifyMinBrokerChangeIdHandler;
3333
use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler;
34+
use crate::processor::admin_broker_processor::reset_master_flusg_offset_handler::ResetMasterFlushOffsetHandler;
3435
use crate::processor::admin_broker_processor::subscription_group_handler::SubscriptionGroupHandler;
3536
use crate::processor::admin_broker_processor::topic_request_handler::TopicRequestHandler;
3637
use crate::processor::admin_broker_processor::update_broker_ha_handler::UpdateBrokerHaHandler;
@@ -40,6 +41,7 @@ mod broker_config_request_handler;
4041
mod consumer_request_handler;
4142
mod notify_min_broker_id_handler;
4243
mod offset_request_handler;
44+
mod reset_master_flusg_offset_handler;
4345
mod subscription_group_handler;
4446
mod topic_request_handler;
4547
mod update_broker_ha_handler;
@@ -56,6 +58,7 @@ pub struct AdminBrokerProcessor<MS: MessageStore> {
5658

5759
notify_min_broker_handler: NotifyMinBrokerChangeIdHandler<MS>,
5860
update_broker_ha_handler: UpdateBrokerHaHandler<MS>,
61+
reset_master_flusg_offset_handler: ResetMasterFlushOffsetHandler<MS>,
5962
}
6063

6164
impl<MS> RequestProcessor for AdminBrokerProcessor<MS>
@@ -91,6 +94,9 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
9194

9295
let update_broker_ha_handler = UpdateBrokerHaHandler::new(broker_runtime_inner.clone());
9396

97+
let reset_master_flusg_offset_handler =
98+
ResetMasterFlushOffsetHandler::new(broker_runtime_inner.clone());
99+
94100
AdminBrokerProcessor {
95101
topic_request_handler,
96102
broker_config_request_handler,
@@ -101,6 +107,7 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
101107
broker_runtime_inner,
102108
notify_min_broker_handler,
103109
update_broker_ha_handler,
110+
reset_master_flusg_offset_handler,
104111
}
105112
}
106113
}
@@ -237,6 +244,11 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
237244
.update_broker_ha_info(channel, ctx, request_code, request)
238245
.await
239246
}
247+
RequestCode::ResetMasterFlushOffset => {
248+
self.reset_master_flusg_offset_handler
249+
.reset_master_flush_offset(channel, ctx, request_code, request)
250+
.await
251+
}
240252
_ => Some(get_unknown_cmd_response(request_code)),
241253
}
242254
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use cheetah_string::CheetahString;
19+
use rocketmq_common::common::mix_all::MASTER_ID;
20+
use rocketmq_remoting::code::request_code::RequestCode;
21+
use rocketmq_remoting::code::response_code::ResponseCode;
22+
use rocketmq_remoting::net::channel::Channel;
23+
use rocketmq_remoting::protocol::header::reset_master_flush_offset_header::ResetMasterFlushOffsetHeader;
24+
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
25+
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
26+
use rocketmq_rust::ArcMut;
27+
use rocketmq_store::base::message_store::MessageStore;
28+
29+
use crate::broker_runtime::BrokerRuntimeInner;
30+
31+
#[derive(Clone)]
32+
pub struct ResetMasterFlushOffsetHandler<MS: MessageStore> {
33+
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
34+
}
35+
36+
impl<MS: MessageStore> ResetMasterFlushOffsetHandler<MS> {
37+
pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self {
38+
Self {
39+
broker_runtime_inner,
40+
}
41+
}
42+
43+
pub async fn reset_master_flush_offset(
44+
&mut self,
45+
_channel: Channel,
46+
_ctx: ConnectionHandlerContext,
47+
_request_code: RequestCode,
48+
request: &mut RemotingCommand,
49+
) -> Option<RemotingCommand> {
50+
let mut response = RemotingCommand::default();
51+
52+
let broker_id = self
53+
.broker_runtime_inner
54+
.broker_config()
55+
.broker_identity
56+
.broker_id;
57+
if broker_id != MASTER_ID {
58+
let request_header = request
59+
.decode_command_custom_header::<ResetMasterFlushOffsetHeader>()
60+
.unwrap();
61+
62+
if let Some(maset_flush_offset) = request_header.master_flush_offset {
63+
if let Some(message_store) = self.broker_runtime_inner.message_store() {
64+
message_store.set_master_flushed_offset(maset_flush_offset);
65+
}
66+
}
67+
}
68+
69+
response.set_code_ref(ResponseCode::Success);
70+
response.set_remark_mut(CheetahString::empty());
71+
Some(response)
72+
}
73+
}

rocketmq-remoting/src/protocol/header.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ pub mod query_subscription_by_consumer_request_header;
6666
pub mod query_topic_consume_by_who_request_header;
6767
pub mod query_topics_by_consumer_request_header;
6868
pub mod reply_message_request_header;
69+
pub mod reset_master_flush_offset_header;
6970
pub mod reset_offset_request_header;
7071
pub mod search_offset_response_header;
7172
pub mod unlock_batch_mq_request_header;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use rocketmq_macros::RequestHeaderCodec;
19+
use serde::Deserialize;
20+
use serde::Serialize;
21+
22+
#[derive(Clone, Debug, Serialize, Deserialize, RequestHeaderCodec)]
23+
#[serde(rename_all = "camelCase")]
24+
pub struct ResetMasterFlushOffsetHeader {
25+
pub master_flush_offset: Option<i64>,
26+
}
27+
28+
#[cfg(test)]
29+
mod tests {
30+
use crate::protocol::header::reset_master_flush_offset_header::ResetMasterFlushOffsetHeader;
31+
32+
#[test]
33+
fn reset_master_flush_offset_header_serializes_correctly() {
34+
let header = ResetMasterFlushOffsetHeader {
35+
master_flush_offset: Some(4231),
36+
};
37+
38+
let serialized = serde_json::to_string(&header).unwrap();
39+
let expected = r#"{"masterFlushOffset":4231}"#;
40+
assert_eq!(serialized, expected);
41+
}
42+
43+
#[test]
44+
fn reset_master_flush_offset_header_deserializes_correctly() {
45+
let data = r#"{"masterFlushOffset":9527}"#;
46+
let header: ResetMasterFlushOffsetHeader = serde_json::from_str(data).unwrap();
47+
assert_eq!(header.master_flush_offset, Some(9527));
48+
}
49+
}

0 commit comments

Comments
 (0)