Skip to content

Commit 031a50e

Browse files
[ISSUE #6391]🚀Implement ExportMetadataInRocksDB Command in rocketmq-admin-core
1 parent 7616f22 commit 031a50e

File tree

5 files changed

+172
-0
lines changed

5 files changed

+172
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rocketmq-tools/rocketmq-admin/rocketmq-admin-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ tracing = { workspace = true }
4949
cheetah-string = { workspace = true }
5050
chrono = "0.4"
5151
clap = { version = "4.5.60", features = ["derive"] }
52+
rocksdb = "0.24"
5253
clap_complete = "4.5"
5354
tabled = { version = "0.20.0", features = ["derive"] }
5455
futures = "0.3.32"

rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,12 @@ impl CommandExecute for ClassificationTablePrint {
359359
command: "exportMetadata",
360360
remark: "Export metadata.",
361361
},
362+
Command {
363+
category: "Export",
364+
command: "exportMetadataInRocksDB",
365+
remark: "Export RocksDB kv config (topics/subscriptionGroups). Recommend to use [mqadmin \
366+
rocksDBConfigToJson]",
367+
},
362368
Command {
363369
category: "HA",
364370
command: "getSyncStateSet",

rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod export_metadata_in_rocks_db_sub_command;
1516
mod export_metadata_sub_command;
1617

1718
use std::sync::Arc;
@@ -20,6 +21,7 @@ use clap::Subcommand;
2021
use rocketmq_error::RocketMQResult;
2122
use rocketmq_remoting::runtime::RPCHook;
2223

24+
use crate::commands::export::export_metadata_in_rocks_db_sub_command::ExportMetadataInRocksDBSubCommand;
2325
use crate::commands::export::export_metadata_sub_command::ExportMetadataSubCommand;
2426
use crate::commands::CommandExecute;
2527

@@ -31,12 +33,20 @@ pub enum ExportCommands {
3133
long_about = None,
3234
)]
3335
ExportMetadata(ExportMetadataSubCommand),
36+
37+
#[command(
38+
name = "exportMetadataInRocksDB",
39+
about = "Export RocksDB kv config (topics/subscriptionGroups). Recommend to use [mqadmin rocksDBConfigToJson]",
40+
long_about = None,
41+
)]
42+
ExportMetadataInRocksDB(ExportMetadataInRocksDBSubCommand),
3443
}
3544

3645
impl CommandExecute for ExportCommands {
3746
async fn execute(&self, rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
3847
match self {
3948
ExportCommands::ExportMetadata(cmd) => cmd.execute(rpc_hook).await,
49+
ExportCommands::ExportMetadataInRocksDB(cmd) => cmd.execute(rpc_hook).await,
4050
}
4151
}
4252
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright 2023 The RocketMQ Rust Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use clap::Parser;
18+
use rocketmq_error::RocketMQError;
19+
use rocketmq_error::RocketMQResult;
20+
use rocketmq_remoting::runtime::RPCHook;
21+
use rocksdb::Options;
22+
use rocksdb::DB;
23+
24+
use crate::commands::CommandExecute;
25+
26+
const TOPICS_JSON_CONFIG: &str = "topics";
27+
const SUBSCRIPTION_GROUP_JSON_CONFIG: &str = "subscriptionGroups";
28+
29+
#[derive(Debug, Clone, Parser)]
30+
pub struct ExportMetadataInRocksDBSubCommand {
31+
#[arg(
32+
short = 'p',
33+
long = "path",
34+
required = true,
35+
help = "Absolute path for the metadata directory"
36+
)]
37+
path: String,
38+
39+
#[arg(
40+
short = 't',
41+
long = "configType",
42+
required = true,
43+
help = "Name of kv config, e.g. topics/subscriptionGroups"
44+
)]
45+
config_type: String,
46+
47+
#[arg(
48+
short = 'j',
49+
long = "jsonEnable",
50+
required = false,
51+
help = "Json format enable, Default: false"
52+
)]
53+
json_enable: bool,
54+
}
55+
56+
impl ExportMetadataInRocksDBSubCommand {
57+
fn handle_export_metadata(db: &DB, config_type: &str, json_enable: bool) -> RocketMQResult<()> {
58+
if json_enable {
59+
let mut config_table = serde_json::Map::new();
60+
61+
Self::iterate_kv_store(db, |key, value| {
62+
let config_key = String::from_utf8_lossy(key).to_string();
63+
let config_value = String::from_utf8_lossy(value).to_string();
64+
match serde_json::from_str::<serde_json::Value>(&config_value) {
65+
Ok(json_object) => {
66+
config_table.insert(config_key, json_object);
67+
}
68+
Err(_) => {
69+
config_table.insert(config_key, serde_json::Value::String(config_value));
70+
}
71+
}
72+
})?;
73+
74+
let table_key = if config_type.eq_ignore_ascii_case(TOPICS_JSON_CONFIG) {
75+
"topicConfigTable"
76+
} else {
77+
"subscriptionGroupTable"
78+
};
79+
80+
let mut json_config = serde_json::Map::new();
81+
json_config.insert(table_key.to_string(), serde_json::Value::Object(config_table));
82+
83+
let json_config_str = serde_json::to_string_pretty(&json_config)
84+
.map_err(|e| RocketMQError::Internal(format!("Failed to serialize JSON: {}", e)))?;
85+
println!("{}", json_config_str);
86+
} else {
87+
let mut count: u64 = 0;
88+
Self::iterate_kv_store(db, |key, value| {
89+
count += 1;
90+
let config_key = String::from_utf8_lossy(key);
91+
let config_value = String::from_utf8_lossy(value);
92+
println!("{}, Key: {}, Value: {}", count, config_key, config_value);
93+
})?;
94+
}
95+
Ok(())
96+
}
97+
98+
fn iterate_kv_store<F>(db: &DB, mut consumer: F) -> RocketMQResult<()>
99+
where
100+
F: FnMut(&[u8], &[u8]),
101+
{
102+
let iter = db.iterator(rocksdb::IteratorMode::Start);
103+
for item in iter {
104+
match item {
105+
Ok((key, value)) => {
106+
consumer(&key, &value);
107+
}
108+
Err(e) => {
109+
return Err(RocketMQError::Internal(format!("RocksDB iterator error: {}", e)));
110+
}
111+
}
112+
}
113+
Ok(())
114+
}
115+
}
116+
117+
impl CommandExecute for ExportMetadataInRocksDBSubCommand {
118+
async fn execute(&self, _rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
119+
let path = self.path.trim();
120+
if path.is_empty() || !std::path::Path::new(path).exists() {
121+
println!("RocksDB path is invalid.");
122+
return Ok(());
123+
}
124+
125+
let config_type = self.config_type.trim();
126+
let full_path = if path.ends_with('/') {
127+
format!("{}{}", path, config_type)
128+
} else {
129+
format!("{}/{}", path, config_type)
130+
};
131+
132+
if !config_type.eq_ignore_ascii_case(TOPICS_JSON_CONFIG)
133+
&& !config_type.eq_ignore_ascii_case(SUBSCRIPTION_GROUP_JSON_CONFIG)
134+
{
135+
println!(
136+
"Invalid config type={}, Options: topics,subscriptionGroups",
137+
config_type
138+
);
139+
return Ok(());
140+
}
141+
142+
let mut opts = Options::default();
143+
opts.create_if_missing(false);
144+
145+
let db = DB::open_for_read_only(&opts, &full_path, false).map_err(|e| {
146+
println!("RocksDB load error, path={}", full_path);
147+
RocketMQError::Internal(format!("Failed to open RocksDB: {}", e))
148+
})?;
149+
150+
let result = Self::handle_export_metadata(&db, config_type, self.json_enable);
151+
drop(db);
152+
result
153+
}
154+
}

0 commit comments

Comments
 (0)