
Commit dd40e50

Merge pull request ClickHouse#88649 from ClickHouse/backport/25.8/88484
Backport ClickHouse#88484 to 25.8: Dynamic `backups.max_attempts_after_bad_version` to work on big clusters
2 parents 17fafc3 + 976a559 commit dd40e50

File tree

5 files changed (+184 -110 lines)

src/Backups/BackupCoordinationStageSync.cpp

Lines changed: 2 additions & 1 deletion

@@ -115,7 +115,8 @@ BackupCoordinationStageSync::BackupCoordinationStageSync(
     , failure_after_host_disconnected_for_seconds(with_retries.getKeeperSettings().failure_after_host_disconnected_for_seconds)
     , finish_timeout_after_error(with_retries.getKeeperSettings().finish_timeout_after_error)
     , sync_period_ms(with_retries.getKeeperSettings().sync_period_ms)
-    , max_attempts_after_bad_version(with_retries.getKeeperSettings().max_attempts_after_bad_version)
+    // all_hosts.size() is added to max_attempts_after_bad_version since each host changes the num_hosts node once, and it's a valid case
+    , max_attempts_after_bad_version(with_retries.getKeeperSettings().max_attempts_after_bad_version + all_hosts.size())
     , zookeeper_path(zookeeper_path_)
     , root_zookeeper_path(zookeeper_path.parent_path().parent_path())
     , operation_zookeeper_path(zookeeper_path.parent_path())
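
Why the retry budget grows with the cluster: every host performs one legitimate compare-and-swap update of the shared num_hosts node, so with N hosts a given writer can lose the version race up to roughly N times even when nothing is wrong. Below is a minimal, self-contained Python sketch of that pattern (toy code under stated assumptions, not the ClickHouse implementation): a versioned node with Keeper-style ZBADVERSION semantics, and a writer whose attempt budget is base + num_hosts, mirroring the patched constructor.

import threading


class VersionedNode:
    """Toy stand-in for a Keeper znode with a version counter."""

    def __init__(self, value: int = 0):
        self.value = value
        self.version = 0
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self.value, self.version

    def cas_set(self, value: int, expected_version: int) -> bool:
        # Returning False models Keeper's ZBADVERSION error.
        with self._lock:
            if self.version != expected_version:
                return False
            self.value = value
            self.version += 1
            return True


def bump_num_hosts(node: VersionedNode, base_attempts: int, num_hosts: int) -> None:
    # Mirrors the patched constructor: budget = configured attempts + all_hosts.size().
    max_attempts = base_attempts + num_hosts
    for _ in range(max_attempts):
        value, version = node.get()
        if node.cas_set(value + 1, version):
            return
    raise RuntimeError("ran out of attempts after bad versions")


if __name__ == "__main__":
    num_hosts = 20
    node = VersionedNode()
    threads = [
        threading.Thread(target=bump_num_hosts, args=(node, 10, num_hosts))
        for _ in range(num_hosts)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert node.get() == (num_hosts, num_hosts)  # every host's single bump landed

Each of the 20 other writers can invalidate a host's cached version at most once, so a budget of base + N attempts is always enough in the conflict-only case; without the + N term, a big enough cluster would exhaust the configured attempts on healthy contention alone.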
tests/integration/test_backup_restore_on_cluster/concurrency_helper.py (new file; path inferred from the relative imports below)

Lines changed: 66 additions & 0 deletions

+from pathlib import Path
+from typing import List
+
+from helpers.cluster import ClickHouseCluster, ClickHouseInstance
+
+
+def generate_cluster_def(file: str, num_nodes: int) -> str:
+    # With multiple pytest workers, generating the config concurrently races
+    # and sometimes errors out, so we generate it once and reuse it
+    path = (
+        Path(__file__).parent / f"_gen/cluster_{Path(file).stem}_{num_nodes}_nodes.xml"
+    )
+    replicas = "\n".join(
+        f"""            <replica>
+                <host>node{i}</host>
+                <port>9000</port>
+            </replica>"""
+        for i in range(num_nodes)
+    )
+    config = f"""<clickhouse>
+    <remote_servers>
+        <cluster>
+            <shard>
+{replicas}
+            </shard>
+        </cluster>
+    </remote_servers>
+</clickhouse>"""
+    if path.is_file():
+        existing = path.read_text(encoding="utf-8")
+        if existing == config:
+            return str(path.absolute())
+    path.parent.mkdir(parents=True, exist_ok=True)
+    path.write_text(
+        encoding="utf-8",
+        data=config,
+    )
+    return str(path.absolute())
+
+
+def add_nodes_to_cluster(
+    cluster: ClickHouseCluster,
+    num_nodes: int,
+    main_configs: List[str],
+    user_configs: List[str],
+) -> List[ClickHouseInstance]:
+    nodes = [
+        cluster.add_instance(
+            f"node{i}",
+            main_configs=main_configs,
+            user_configs=user_configs,
+            external_dirs=["/backups/"],
+            macros={"replica": f"node{i}", "shard": "shard1"},
+            with_zookeeper=True,
+        )
+        for i in range(num_nodes)
+    ]
+    return nodes
+
+
+def create_test_table(node: ClickHouseInstance) -> None:
+    node.query(
+        """CREATE TABLE tbl ON CLUSTER 'cluster' ( x UInt64 )
+        ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')
+        ORDER BY tuple()"""
+    )
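
The helper caches the generated XML under a name that encodes both the calling module and the node count, and rewrites the file only when its content would actually change, which removes the write race the old per-test generators had under parallel pytest workers. A hypothetical wiring sketch (not part of the commit; it mirrors the module-level call sites in the two refactored tests below):

# Hypothetical usage of concurrency_helper from a test module living in
# tests/integration/test_backup_restore_on_cluster/ (illustration only).
from helpers.cluster import ClickHouseCluster

from .concurrency_helper import (
    add_nodes_to_cluster,
    create_test_table,
    generate_cluster_def,
)

cluster = ClickHouseCluster(__file__)
num_nodes = 4  # illustration only; the real tests use 10, 2, and 20 nodes

main_configs = [
    "configs/backups_disk.xml",  # same backup-disk config as test_concurrency.py
    generate_cluster_def(__file__, num_nodes),  # cached per (module, node count)
]
user_configs = ["configs/allow_database_types.xml"]

nodes = add_nodes_to_cluster(cluster, num_nodes, main_configs, user_configs)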

tests/integration/test_backup_restore_on_cluster/test_concurrency.py

Lines changed: 19 additions & 42 deletions

@@ -1,50 +1,32 @@
 import concurrent
-import os.path
 import time
 from random import randint, random
+from typing import List
 
 import pytest
 
-from helpers.cluster import ClickHouseCluster
+from helpers.cluster import ClickHouseCluster, ClickHouseInstance
 from helpers.test_tools import TSV, assert_eq_with_retry
 
+from .concurrency_helper import (
+    add_nodes_to_cluster,
+    create_test_table,
+    generate_cluster_def,
+)
+
 cluster = ClickHouseCluster(__file__)
 
 num_nodes = 10
 
 
-def generate_cluster_def():
-    path = os.path.join(
-        os.path.dirname(os.path.realpath(__file__)),
-        "./_gen/cluster_for_concurrency_test.xml",
-    )
-    os.makedirs(os.path.dirname(path), exist_ok=True)
-    with open(path, "w") as f:
-        f.write("<clickhouse>\n\t<remote_servers>\n\t\t<cluster>\n\t\t\t<shard>\n")
-        for i in range(num_nodes):
-            f.write(
-                f"\t\t\t\t<replica>\n\t\t\t\t\t<host>node{i}</host>\n\t\t\t\t\t<port>9000</port>\n\t\t\t\t</replica>\n"
-            )
-        f.write("\t\t\t</shard>\n\t\t</cluster>\n\t</remote_servers>\n</clickhouse>")
-    return path
-
-
-main_configs = ["configs/backups_disk.xml", generate_cluster_def()]
+main_configs = [
+    "configs/backups_disk.xml",
+    generate_cluster_def(__file__, num_nodes),
+]
 # No [Zoo]Keeper retries for tests with concurrency
 user_configs = ["configs/allow_database_types.xml"]
 
-nodes = []
-for i in range(num_nodes):
-    nodes.append(
-        cluster.add_instance(
-            f"node{i}",
-            main_configs=main_configs,
-            user_configs=user_configs,
-            external_dirs=["/backups/"],
-            macros={"replica": f"node{i}", "shard": "shard1"},
-            with_zookeeper=True,
-        )
-    )
+nodes = add_nodes_to_cluster(cluster, num_nodes, main_configs, user_configs)
 
 node0 = nodes[0]

@@ -70,23 +52,18 @@ def drop_after_test():
 backup_id_counter = 0
 
 
+def create_and_fill_table() -> None:
+    create_test_table(node0)
+    for i, node in enumerate(nodes):
+        node.query(f"INSERT INTO tbl VALUES ({i})")
+
+
 def new_backup_name():
     global backup_id_counter
     backup_id_counter += 1
     return f"Disk('backups', '{backup_id_counter}')"
 
 
-def create_and_fill_table():
-    node0.query(
-        "CREATE TABLE tbl ON CLUSTER 'cluster' ("
-        "x Int32"
-        ") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
-        "ORDER BY tuple()"
-    )
-    for i in range(num_nodes):
-        nodes[i].query(f"INSERT INTO tbl VALUES ({i})")
-
-
 expected_sum = num_nodes * (num_nodes - 1) // 2

tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py

Lines changed: 19 additions & 67 deletions

@@ -1,72 +1,29 @@
 import concurrent
-import os.path
-import re
-import time
-from random import randint
 
 import pytest
 
-from helpers.cluster import ClickHouseCluster
-from helpers.test_tools import TSV, assert_eq_with_retry
+from helpers.cluster import ClickHouseCluster, ClickHouseInstance
+from helpers.test_tools import assert_eq_with_retry
+
+from .concurrency_helper import (
+    add_nodes_to_cluster,
+    create_test_table,
+    generate_cluster_def,
+)
 
 cluster = ClickHouseCluster(__file__)
 
 num_nodes = 2
 
 
-def generate_cluster_def():
-    path = os.path.join(
-        os.path.dirname(os.path.realpath(__file__)),
-        "./_gen/cluster_for_test_disallow_concurrency.xml",
-    )
-    os.makedirs(os.path.dirname(path), exist_ok=True)
-    with open(path, "w") as f:
-        f.write(
-            """
-<clickhouse>
-    <remote_servers>
-        <cluster>
-            <shard>
-"""
-        )
-        for i in range(num_nodes):
-            f.write(
-                """
-                <replica>
-                    <host>node"""
-                + str(i)
-                + """</host>
-                    <port>9000</port>
-                </replica>
-"""
-            )
-        f.write(
-            """
-            </shard>
-        </cluster>
-    </remote_servers>
-</clickhouse>
-"""
-        )
-    return path
-
-
-main_configs = ["configs/disallow_concurrency.xml", generate_cluster_def()]
+main_configs = [
+    "configs/disallow_concurrency.xml",
+    generate_cluster_def(__file__, num_nodes),
+]
 # No [Zoo]Keeper retries for tests with concurrency
 user_configs = ["configs/allow_database_types.xml"]
 
-nodes = []
-for i in range(num_nodes):
-    nodes.append(
-        cluster.add_instance(
-            f"node{i}",
-            main_configs=main_configs,
-            user_configs=user_configs,
-            external_dirs=["/backups/"],
-            macros={"replica": f"node{i}", "shard": "shard1"},
-            with_zookeeper=True,
-        )
-    )
+nodes = add_nodes_to_cluster(cluster, num_nodes, main_configs, user_configs)
 
 node0 = nodes[0]

@@ -96,23 +53,18 @@ def drop_after_test():
 backup_id_counter = 0
 
 
+def create_and_fill_table() -> None:
+    create_test_table(node0)
+    for node in nodes:
+        node.query("INSERT INTO tbl SELECT number FROM numbers(40000000)")
+
+
 def new_backup_name():
     global backup_id_counter
     backup_id_counter += 1
     return f"Disk('backups', '{backup_id_counter}')"
 
 
-def create_and_fill_table():
-    node0.query(
-        "CREATE TABLE tbl ON CLUSTER 'cluster' ("
-        "x UInt64"
-        ") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
-        "ORDER BY x"
-    )
-    for i in range(num_nodes):
-        nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(40000000)")
-
-
 def get_status_and_error(node, backup_or_restore_id):
     return (
         node.query(
New test file in tests/integration/test_backup_restore_on_cluster/ (filename not shown in this view)

Lines changed: 78 additions & 0 deletions

+from typing import List
+
+import pytest
+
+from helpers.cluster import ClickHouseCluster, ClickHouseInstance
+from helpers.test_tools import TSV
+
+from .concurrency_helper import (
+    add_nodes_to_cluster,
+    create_test_table,
+    generate_cluster_def,
+)
+
+cluster = ClickHouseCluster(__file__)
+
+# Tests that backups.max_attempts_after_bad_version is dynamic and depends on num_nodes
+num_nodes = 20
+
+main_configs = [
+    "configs/backups_disk.xml",
+    generate_cluster_def(__file__, num_nodes),
+]
+# No [Zoo]Keeper retries for tests with concurrency
+user_configs = ["configs/allow_database_types.xml"]
+
+nodes = add_nodes_to_cluster(cluster, num_nodes, main_configs, user_configs)
+
+node0 = nodes[0]
+
+
+@pytest.fixture(scope="module", autouse=True)
+def start_cluster():
+    try:
+        cluster.start()
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+@pytest.fixture(autouse=True)
+def drop_after_test():
+    try:
+        yield
+    finally:
+        node0.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' SYNC")
+        node0.query("DROP DATABASE IF EXISTS mydb ON CLUSTER 'cluster' SYNC")
+
+
+backup_id_counter = 0
+
+
+def new_backup_name():
+    global backup_id_counter
+    backup_id_counter += 1
+    return f"Disk('backups', '{backup_id_counter}')"
+
+
+def create_and_fill_table() -> None:
+    create_test_table(node0)
+    for i, node in enumerate(nodes):
+        node.query(f"INSERT INTO tbl VALUES ({i})")
+
+
+expected_sum = num_nodes * (num_nodes - 1) // 2
+
+
+def test_backup_restore_huge_cluster():
+    create_and_fill_table()
+
+    backup_name = new_backup_name()
+    node0.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}")
+
+    node0.query("DROP TABLE tbl ON CLUSTER 'cluster' SYNC")
+    node0.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
+    node0.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")
+
+    for i in range(num_nodes):
+        assert nodes[i].query("SELECT sum(x) FROM tbl") == TSV([expected_sum])
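
A quick sanity check on the expected result: each node i inserts the single value i, so the cluster-wide sum over 20 nodes is the arithmetic series 0 + 1 + ... + 19. A one-line confirmation of the constant (illustration only, not part of the commit):

num_nodes = 20
expected_sum = num_nodes * (num_nodes - 1) // 2  # 20 * 19 / 2
assert expected_sum == sum(range(num_nodes)) == 190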
