
Commit 4e6df7f

Backport ClickHouse#88484 to 25.8: Dynamic backups.max_attempts_after_bad_version to work on big clusters
1 parent dcc7561 commit 4e6df7f

5 files changed: 173 additions, 111 deletions

src/Backups/BackupCoordinationStageSync.cpp

Lines changed: 2 additions & 1 deletion

@@ -115,7 +115,8 @@ BackupCoordinationStageSync::BackupCoordinationStageSync(
     , failure_after_host_disconnected_for_seconds(with_retries.getKeeperSettings().failure_after_host_disconnected_for_seconds)
     , finish_timeout_after_error(with_retries.getKeeperSettings().finish_timeout_after_error)
     , sync_period_ms(with_retries.getKeeperSettings().sync_period_ms)
-    , max_attempts_after_bad_version(with_retries.getKeeperSettings().max_attempts_after_bad_version)
+    // all_hosts.size() is added to max_attempts_after_bad_version since each host changes the num_hosts node once, and it's a valid case
+    , max_attempts_after_bad_version(with_retries.getKeeperSettings().max_attempts_after_bad_version + all_hosts.size())
     , zookeeper_path(zookeeper_path_)
     , root_zookeeper_path(zookeeper_path.parent_path().parent_path())
     , operation_zookeeper_path(zookeeper_path.parent_path())
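The reasoning behind the + all_hosts.size() term is the classic optimistic-concurrency pattern: per the comment above, every host performs exactly one compare-and-set update of the shared num_hosts node, so a single writer can lose the version race up to N - 1 times on a perfectly healthy N-host cluster, and a fixed retry budget stops being enough once the cluster outgrows it. Below is a minimal, hypothetical Python sketch of that pattern using the kazoo ZooKeeper client — bump_num_hosts, path, and base_attempts are illustrative stand-ins, not ClickHouse's actual C++ Keeper retry machinery:

from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionError


def bump_num_hosts(zk: KazooClient, path: str, base_attempts: int, num_hosts: int) -> None:
    # Effective budget mirrors the C++ change above: each of the num_hosts
    # participants performs exactly one successful CAS on the shared node,
    # so up to num_hosts - 1 BadVersion failures per writer are expected
    # even when nothing is wrong.
    max_attempts = base_attempts + num_hosts
    zk.ensure_path(path)  # create the counter node if it does not exist yet
    for _ in range(max_attempts):
        data, stat = zk.get(path)
        new_value = str(int(data or b"0") + 1).encode()
        try:
            # set() succeeds only if the node is still at the version we read.
            zk.set(path, new_value, version=stat.version)
            return
        except BadVersionError:
            continue  # another host won the race; re-read and try again
    raise RuntimeError(f"num_hosts CAS still failing after {max_attempts} attempts")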
tests/integration/test_backup_restore_on_cluster/concurrency_helper.py (new file)

Lines changed: 59 additions & 0 deletions

from pathlib import Path
from typing import Callable, List

from helpers.cluster import ClickHouseCluster, ClickHouseInstance


def generate_cluster_def(file: str, num_nodes: int) -> str:
    path = (
        Path(__file__).parent / f"_gen/cluster_{Path(file).stem}_{num_nodes}_nodes.xml"
    )
    path.parent.mkdir(parents=True, exist_ok=True)
    replicas = "\n".join(
        f"""                <replica>
                    <host>node{i}</host>
                    <port>9000</port>
                </replica>"""
        for i in range(num_nodes)
    )
    path.write_text(
        encoding="utf-8",
        data=f"""<clickhouse>
    <remote_servers>
        <cluster>
            <shard>
{replicas}
            </shard>
        </cluster>
    </remote_servers>
</clickhouse>""",
    )
    return str(path.absolute())


def add_nodes_to_cluster(
    cluster: ClickHouseCluster,
    num_nodes: int,
    main_configs: List[str],
    user_configs: List[str],
) -> List[ClickHouseInstance]:
    nodes = [
        cluster.add_instance(
            f"node{i}",
            main_configs=main_configs,
            user_configs=user_configs,
            external_dirs=["/backups/"],
            macros={"replica": f"node{i}", "shard": "shard1"},
            with_zookeeper=True,
        )
        for i in range(num_nodes)
    ]
    return nodes


def create_test_table(node: ClickHouseInstance) -> None:
    node.query(
        """CREATE TABLE tbl ON CLUSTER 'cluster' ( x UInt64 )
        ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')
        ORDER BY tuple()"""
    )
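For orientation, a hypothetical usage snippet (not part of the commit) showing roughly what generate_cluster_def() writes for a two-node cluster; the XML is inferred from the f-string above, so the exact indentation is approximate:

from concurrency_helper import generate_cluster_def  # assuming the helper is importable

# Illustrative only: generate a 2-replica cluster definition and print it.
path = generate_cluster_def(__file__, 2)
with open(path, encoding="utf-8") as f:
    print(f.read())
# Expected output, roughly:
# <clickhouse>
#     <remote_servers>
#         <cluster>
#             <shard>
#                 <replica>
#                     <host>node0</host>
#                     <port>9000</port>
#                 </replica>
#                 <replica>
#                     <host>node1</host>
#                     <port>9000</port>
#                 </replica>
#             </shard>
#         </cluster>
#     </remote_servers>
# </clickhouse>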

tests/integration/test_backup_restore_on_cluster/test_concurrency.py

Lines changed: 19 additions & 42 deletions

@@ -1,50 +1,32 @@
 import concurrent
-import os.path
 import time
 from random import randint, random
+from typing import List

 import pytest

-from helpers.cluster import ClickHouseCluster
+from helpers.cluster import ClickHouseCluster, ClickHouseInstance
 from helpers.test_tools import TSV, assert_eq_with_retry

+from .concurrency_helper import (
+    add_nodes_to_cluster,
+    create_test_table,
+    generate_cluster_def,
+)
+
 cluster = ClickHouseCluster(__file__)

 num_nodes = 10


-def generate_cluster_def():
-    path = os.path.join(
-        os.path.dirname(os.path.realpath(__file__)),
-        "./_gen/cluster_for_concurrency_test.xml",
-    )
-    os.makedirs(os.path.dirname(path), exist_ok=True)
-    with open(path, "w") as f:
-        f.write("<clickhouse>\n\t<remote_servers>\n\t\t<cluster>\n\t\t\t<shard>\n")
-        for i in range(num_nodes):
-            f.write(
-                f"\t\t\t\t<replica>\n\t\t\t\t\t<host>node{i}</host>\n\t\t\t\t\t<port>9000</port>\n\t\t\t\t</replica>\n"
-            )
-        f.write("\t\t\t</shard>\n\t\t</cluster>\n\t</remote_servers>\n</clickhouse>")
-    return path
-
-
-main_configs = ["configs/backups_disk.xml", generate_cluster_def()]
+main_configs = [
+    "configs/backups_disk.xml",
+    generate_cluster_def(__file__, num_nodes),
+]
 # No [Zoo]Keeper retries for tests with concurrency
 user_configs = ["configs/allow_database_types.xml"]

-nodes = []
-for i in range(num_nodes):
-    nodes.append(
-        cluster.add_instance(
-            f"node{i}",
-            main_configs=main_configs,
-            user_configs=user_configs,
-            external_dirs=["/backups/"],
-            macros={"replica": f"node{i}", "shard": "shard1"},
-            with_zookeeper=True,
-        )
-    )
+nodes = add_nodes_to_cluster(cluster, num_nodes, main_configs, user_configs)

 node0 = nodes[0]

@@ -70,23 +52,18 @@ def drop_after_test():
 backup_id_counter = 0


+def create_and_fill_table() -> None:
+    create_test_table(node0)
+    for i, node in enumerate(nodes):
+        node.query(f"INSERT INTO tbl VALUES ({i})")
+
+
 def new_backup_name():
     global backup_id_counter
     backup_id_counter += 1
     return f"Disk('backups', '{backup_id_counter}')"


-def create_and_fill_table():
-    node0.query(
-        "CREATE TABLE tbl ON CLUSTER 'cluster' ("
-        "x Int32"
-        ") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
-        "ORDER BY tuple()"
-    )
-    for i in range(num_nodes):
-        nodes[i].query(f"INSERT INTO tbl VALUES ({i})")
-
-
 expected_sum = num_nodes * (num_nodes - 1) // 2
tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py

Lines changed: 15 additions & 68 deletions

@@ -1,72 +1,24 @@
 import concurrent
-import os.path
-import re
-import time
-from random import randint

 import pytest

-from helpers.cluster import ClickHouseCluster
-from helpers.test_tools import TSV, assert_eq_with_retry
+from helpers.cluster import ClickHouseCluster, ClickHouseInstance
+from helpers.test_tools import assert_eq_with_retry
+
+from .concurrency_helper import (
+    add_nodes_to_cluster,
+    create_test_table,
+    generate_cluster_def,
+)

 cluster = ClickHouseCluster(__file__)

 num_nodes = 2

-
-def generate_cluster_def():
-    path = os.path.join(
-        os.path.dirname(os.path.realpath(__file__)),
-        "./_gen/cluster_for_test_disallow_concurrency.xml",
-    )
-    os.makedirs(os.path.dirname(path), exist_ok=True)
-    with open(path, "w") as f:
-        f.write(
-            """
-        <clickhouse>
-            <remote_servers>
-                <cluster>
-                    <shard>
-        """
-        )
-        for i in range(num_nodes):
-            f.write(
-                """
-                        <replica>
-                            <host>node"""
-                + str(i)
-                + """</host>
-                            <port>9000</port>
-                        </replica>
-                """
-            )
-        f.write(
-            """
-                    </shard>
-                </cluster>
-            </remote_servers>
-        </clickhouse>
-        """
-        )
-    return path
-
-
-main_configs = ["configs/disallow_concurrency.xml", generate_cluster_def()]
 # No [Zoo]Keeper retries for tests with concurrency
 user_configs = ["configs/allow_database_types.xml"]

-nodes = []
-for i in range(num_nodes):
-    nodes.append(
-        cluster.add_instance(
-            f"node{i}",
-            main_configs=main_configs,
-            user_configs=user_configs,
-            external_dirs=["/backups/"],
-            macros={"replica": f"node{i}", "shard": "shard1"},
-            with_zookeeper=True,
-        )
-    )
+nodes = add_nodes_to_cluster(cluster, num_nodes, main_configs, user_configs)

 node0 = nodes[0]

@@ -96,23 +48,18 @@ def drop_after_test():
 backup_id_counter = 0


+def create_and_fill_table() -> None:
+    create_test_table(node0)
+    for node in nodes:
+        node.query("INSERT INTO tbl SELECT number FROM numbers(40000000)")
+
+
 def new_backup_name():
     global backup_id_counter
     backup_id_counter += 1
     return f"Disk('backups', '{backup_id_counter}')"


-def create_and_fill_table():
-    node0.query(
-        "CREATE TABLE tbl ON CLUSTER 'cluster' ("
-        "x UInt64"
-        ") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
-        "ORDER BY x"
-    )
-    for i in range(num_nodes):
-        nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(40000000)")
-
-
 def get_status_and_error(node, backup_or_restore_id):
     return (
         node.query(
Lines changed: 78 additions & 0 deletions

from typing import List

import pytest

from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.test_tools import TSV

from .concurrency_helper import (
    add_nodes_to_cluster,
    create_test_table,
    generate_cluster_def,
)

cluster = ClickHouseCluster(__file__)

# Testing backups.max_attempts_after_bad_version is dynamic, and depends on num_nodes
num_nodes = 20

main_configs = [
    "configs/backups_disk.xml",
    generate_cluster_def(__file__, num_nodes),
]
# No [Zoo]Keeper retries for tests with concurrency
user_configs = ["configs/allow_database_types.xml"]

nodes = add_nodes_to_cluster(cluster, num_nodes, main_configs, user_configs)

node0 = nodes[0]


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def drop_after_test():
    try:
        yield
    finally:
        node0.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' SYNC")
        node0.query("DROP DATABASE IF EXISTS mydb ON CLUSTER 'cluster' SYNC")


backup_id_counter = 0


def new_backup_name():
    global backup_id_counter
    backup_id_counter += 1
    return f"Disk('backups', '{backup_id_counter}')"


def create_and_fill_table() -> None:
    create_test_table(node0)
    for i, node in enumerate(nodes):
        node.query(f"INSERT INTO tbl VALUES ({i})")


expected_sum = num_nodes * (num_nodes - 1) // 2


def test_backup_restore_huge_cluster():
    create_and_fill_table()

    backup_name = new_backup_name()
    node0.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}")

    node0.query("DROP TABLE tbl ON CLUSTER 'cluster' SYNC")
    node0.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
    node0.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")

    for i in range(num_nodes):
        assert nodes[i].query("SELECT sum(x) FROM tbl") == TSV([expected_sum])
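As a quick sanity check of the test's arithmetic (this snippet is illustrative, not part of the commit): each node i inserts a single row with value i, so after the restore every one of the 20 replicas must report the same sum of 0 through 19.

# Worked check of expected_sum for this test's num_nodes = 20.
num_nodes = 20
expected_sum = num_nodes * (num_nodes - 1) // 2  # 0 + 1 + ... + 19
assert expected_sum == 190  # the value each replica's SELECT sum(x) FROM tbl must return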
