Commit 685cd4f

refactor(metactl): when restoring without --initial-cluster, add a default single-node cluster config (#17969)

* refactor(metactl): when bendsave restores, add a default single-node cluster config. When using `bendsave` to restore a cluster, it should fill in a default databend-meta cluster config that has only one node, so that it immediately restores a working single-node cluster. Other changes: rewrite the meta new-cluster test in Python.
* chore: add a test to ensure a single-node cluster can be restored from 3-node cluster data.
1 parent b32ab7c commit 685cd4f
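
For illustration, below is a minimal Python sketch of a `databend-metactl import` invocation equivalent to the defaults the restore path now fills in when no --initial-cluster is given. The binary path, directory, and file names are placeholders, not from this commit; the 127.0.0.1:28004 placeholder address is replaced by the node's own config when `databend-meta` starts.

import subprocess

def import_as_single_node(metactl_bin: str, raft_dir: str, backup_db: str, node_id: int) -> None:
    # Build a default single-node cluster so the restored node can serve at once.
    cmd = [
        metactl_bin, "import",
        "--raft-dir", raft_dir,
        "--db", backup_db,
        "--id", str(node_id),
        "--initial-cluster", f"{node_id}=127.0.0.1:28004",
    ]
    subprocess.run(cmd, check=True)

# e.g. import_as_single_node("./databend-metactl", "./meta_dir", "meta.db", 1)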

9 files changed, +294 -126 lines changed
.github/actions/test_metactl/action.yml

Lines changed: 2 additions & 1 deletion

@@ -15,4 +15,5 @@ runs:
     - name: Test metactl restore new cluster
       shell: bash
       run: |
-        bash ./tests/metactl/test-metactl-restore-new-cluster.sh
+        python -m pip install -r ./tests/metactl/requirements.txt
+        python ./tests/metactl/test-metactl-restore-new-cluster.py

Makefile

Lines changed: 1 addition & 1 deletion

@@ -96,7 +96,7 @@ stateless-cluster-test-tls: build
 
 metactl-test:
 	bash ./tests/metactl/test-metactl.sh
-	bash ./tests/metactl/test-metactl-restore-new-cluster.sh
+	python ./tests/metactl/test-metactl-restore-new-cluster.py
 
 meta-kvapi-test:
 	bash ./tests/meta-kvapi/test-meta-kvapi.sh

src/bendsave/src/restore.rs

Lines changed: 7 additions & 2 deletions

@@ -58,11 +58,16 @@ pub async fn restore_meta(efs: Operator, meta_cfg: &databend_meta::configs::Conf
     local_file.sync_all().await?;
     local_file.shutdown().await?;
 
+    let id = meta_cfg.raft_config.id;
+
     let import_args = ImportArgs {
         raft_dir: Some(meta_cfg.raft_config.raft_dir.to_string()),
         db: DATABEND_META_BACKUP_PATH.to_string(),
-        initial_cluster: vec![],
-        id: meta_cfg.raft_config.id,
+
+        // When restoring, create a single-node cluster so that it serves at once.
+        // The address below is replaced with the one in the config when `databend-meta` starts up.
+        initial_cluster: vec![format!("{id}=127.0.0.1:28004")],
+        id,
     };
     import_data(&import_args).await?;

src/meta/control/src/args.rs

Lines changed: 18 additions & 1 deletion

@@ -148,7 +148,24 @@ pub struct ImportArgs {
     #[clap(long, default_value = "")]
     pub db: String,
 
-    /// initial_cluster format: node_id=endpoint,grpc_api_addr
+    /// initial_cluster format: <node_id>=<raft_api_host>:<raft_api_port>
+    ///
+    /// For example, the following command restores node 1 of a two-node cluster:
+    /// ```text
+    /// $0 \
+    ///     --raft-dir ./meta_dir \
+    ///     --db meta.db \
+    ///     --id=1 \
+    ///     --initial-cluster 1=localhost:29103 \
+    ///     --initial-cluster 2=localhost:29203
+    /// ```
+    ///
+    /// If it is empty, the cluster information in the imported data is kept.
+    /// For example:
+    /// ```text
+    /// ["state_machine/0",{"StateMachineMeta":{"key":"LastMembership","value":{"Membership":{"log_id":null,"membership":{"configs":[[4]],"nodes":{"4":{}}}}}}}]
+    /// ["state_machine/0",{"Nodes":{"key":4,"value":{"name":"4","endpoint":{"addr":"127.0.0.1","port":28004},"grpc_api_advertise_address":null}}}]
+    /// ```
     #[clap(long)]
     pub initial_cluster: Vec<String>,
src/meta/control/src/import.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,8 @@ fn build_nodes(initial_cluster: Vec<String>, id: u64) -> anyhow::Result<BTreeMap
284284
return Ok(nodes);
285285
}
286286

287+
eprintln!("new cluster: {:?}", nodes);
288+
287289
if !nodes.contains_key(&id) {
288290
return Err(anyhow::anyhow!(
289291
"node id ({}) has to be one of cluster member({:?})",

tests/metactl/requirements.txt

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+requests

tests/metactl/test-metactl-restore-new-cluster.py

Lines changed: 261 additions & 0 deletions
@@ -0,0 +1,261 @@
#!/usr/bin/env python3

import json
import os
import shutil
import subprocess
import time
import sys
import re
import signal
from typing import Dict
import requests
from pathlib import Path
import socket

# Configuration
BUILD_PROFILE = os.environ.get("BUILD_PROFILE", "debug")
SCRIPT_PATH = Path(__file__).parent.absolute()
metactl_bin = f"./target/{BUILD_PROFILE}/databend-metactl"


# Helper functions
def run_command(cmd, check=True, shell=False):
    """Run a command and return its output"""
    if isinstance(cmd, str) and not shell:
        cmd = cmd.split()

    print(f"Running: {cmd}")
    result = subprocess.run(cmd,
                            check=check, shell=shell,
                            text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return result.stdout


def wait_for_port(port, timeout=10):
    now = time.time()

    while time.time() - now < timeout:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.connect(("0.0.0.0", port))
                print("OK :{} is listening".format(port))
                sys.stdout.flush()
                return
        except Exception:
            print("... connecting to :{}".format(port))
            sys.stdout.flush()
            time.sleep(1)

    raise Exception("fail to connect to :{}".format(port))


def kill_databend_meta():
    print_step("Kill databend-meta processes")
    try:
        run_command("killall databend-meta", check=False)
        time.sleep(2)
    except subprocess.CalledProcessError:
        pass  # It's okay if there are no processes to kill


def start_meta_node(node_id, is_new: bool):
    meta_bin = f"./target/{BUILD_PROFILE}/databend-meta"

    ports = {
        1: (9191, 19191),
        2: (28202, 29191),
        3: (28302, 39191),
    }

    if is_new:
        config_fn = f"new-databend-meta-node-{node_id}.toml"
        port = ports[node_id][1]
    else:
        config_fn = f"databend-meta-node-{node_id}.toml"
        port = ports[node_id][0]

    config_file = f"./tests/metactl/config/{config_fn}"

    subprocess.Popen([meta_bin, "--config-file", config_file],
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    wait_for_port(port)
    time.sleep(1)


# return the stdout of metactl export
def metactl_export(meta_dir: str, output_path: str) -> str:
    print_step(f"Start: Export meta data from {meta_dir} to {output_path}")

    cmd = [metactl_bin, "export", "--raft-dir", meta_dir]

    if output_path:
        cmd.append("--db")
        cmd.append(output_path)

    result = subprocess.Popen(cmd,
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    result.wait()

    print_step(f"Done: Exported meta data to {output_path}")
    return result.stdout.read().decode()


def metactl_import(meta_dir: str, id: int, db_path: str, initial_cluster: Dict[int, str]):
    cmd = [metactl_bin, "import", "--raft-dir", meta_dir,
           "--id", str(id),
           "--db", db_path, ]

    for id, addr in initial_cluster.items():
        cmd.append("--initial-cluster")
        cmd.append(f"{id}={addr}")

    result = subprocess.Popen(cmd,
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    result.wait()
    print(result.stdout.read().decode())
    print(result.stderr.read().decode())


def cluster_status(msg: str) -> str:
    print_step(f"Check /v1/cluster/status {msg} start")

    response = requests.get("http://127.0.0.1:28101/v1/cluster/status")
    status = response.json()

    print_step(f"Check /v1/cluster/status {msg} end")

    return status


def print_title(title):
    print()
    print(" ===")
    print(f" === {title}")
    print(" ===")


def print_step(step: str):
    print(f" === {step}")


def main():
    kill_databend_meta()

    shutil.rmtree(".databend", ignore_errors=True)

    print_title("1. Start 3 meta node cluster to generate raft logs")

    start_meta_node(1, False)
    start_meta_node(2, False)
    start_meta_node(3, False)

    old_status = cluster_status("old cluster")
    print(json.dumps(old_status, indent=2))

    kill_databend_meta()

    print_title("2. Export meta node data")

    metactl_export("./.databend/meta1", "meta.db")

    exported = metactl_export("./.databend/meta1", None)
    print_step("Exported meta data. start")
    print(exported)
    print_step("Exported meta data. end")

    print_step("Clear meta service data dirs")
    shutil.rmtree(".databend")

    print_title("3. Import old meta node data to new cluster")

    cluster = {
        4: "localhost:29103",
        5: "localhost:29203",
        6: "localhost:29303",
    }

    metactl_import("./.databend/new_meta1", 4, "meta.db", cluster)
    metactl_import("./.databend/new_meta2", 5, "meta.db", cluster)
    metactl_import("./.databend/new_meta3", 6, "meta.db", cluster)

    print_step("3.1 Check if state machine is complete by checking key 'LastMembership'")

    meta1_data = metactl_export("./.databend/new_meta1", None)
    print(meta1_data)

    meta2_data = metactl_export("./.databend/new_meta2", None)
    print(meta2_data)

    meta3_data = metactl_export("./.databend/new_meta3", None)
    print(meta3_data)

    assert "LastMembership" in meta1_data
    assert "LastMembership" in meta2_data
    assert "LastMembership" in meta3_data

    print_title("3.2 Start 3 new meta node cluster")

    start_meta_node(1, True)
    start_meta_node(2, True)
    start_meta_node(3, True)

    print_step("sleep 5 sec to wait for membership to commit")
    time.sleep(5)

    print_title("3.3 Check membership in new cluster")

    new_status = cluster_status("new cluster")
    print(json.dumps(new_status, indent=2))

    voters = new_status["voters"]

    names = dict([(voter["name"], voter) for voter in voters])

    assert names["4"]["endpoint"] == {"addr": "localhost", "port": 29103}
    assert names["5"]["endpoint"] == {"addr": "localhost", "port": 29203}
    assert names["6"]["endpoint"] == {"addr": "localhost", "port": 29303}

    kill_databend_meta()
    shutil.rmtree(".databend")


    print_title("4. Import with --initial-cluster of one node")

    cluster = {
        # This will be replaced when meta node starts
        4: "127.0.0.1:12345",
    }

    metactl_import("./.databend/new_meta1", 4, "meta.db", cluster)

    print_step("4.1 Check if state machine is complete by checking key 'LastMembership'")
    meta1_data = metactl_export("./.databend/new_meta1", None)
    print(meta1_data)
    assert "LastMembership" in meta1_data

    print_title("4.2 Start new meta node cluster")

    start_meta_node(1, True)

    print_step("sleep 3 sec to wait for membership to commit")
    time.sleep(3)

    print_title("4.3 Check membership in new cluster")

    new_status = cluster_status("new single node cluster")
    print(json.dumps(new_status, indent=2))

    voters = new_status["voters"]

    names = dict([(voter["name"], voter) for voter in voters])

    # The address is replaced with the content in config.
    assert names["4"]["endpoint"] == {"addr": "localhost", "port": 29103}

    kill_databend_meta()
    shutil.rmtree(".databend")

if __name__ == "__main__":
    main()
