Skip to content

Commit 0134da1

Browse files
Concurrent merges, reorganize files
1 parent 9be28c0 commit 0134da1

File tree

5 files changed

+307
-154
lines changed

5 files changed

+307
-154
lines changed

s3/tests/export_part/concurrent_actions.py renamed to s3/tests/export_part/concurrent_alter.py

Lines changed: 14 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ def get_alter_functions():
8585
create_partitions_with_random_uint64,
8686
{"number_of_partitions": 5, "number_of_parts": 1},
8787
),
88+
(optimize, {"node": ""}),
8889
]
8990

9091

@@ -155,13 +156,15 @@ def alter_before_export(self, alter_function, kwargs):
155156
partitioned_replicated_merge_tree_table(
156157
table_name=source_table,
157158
partition_by="p",
159+
number_of_parts=2,
158160
columns=default_columns(simple=False),
159161
query_settings="storage_policy = 'tiered_storage'",
160162
)
161163
else:
162164
partitioned_merge_tree_table(
163165
table_name=source_table,
164166
partition_by="p",
167+
number_of_parts=2,
165168
columns=default_columns(simple=False),
166169
query_settings="storage_policy = 'tiered_storage'",
167170
)
@@ -182,6 +185,8 @@ def alter_before_export(self, alter_function, kwargs):
182185
kwargs["path_to_backup"] = (
183186
f"/clickhouse/tables/shard0/{source_table}_temp"
184187
)
188+
if alter_function == optimize:
189+
kwargs["node"] = self.context.node
185190

186191
with When(f"I {alter_function.__name__} on the source table"):
187192
if alter_function == alter_table_move_partition:
@@ -247,13 +252,15 @@ def alter_after_export(self, alter_function, kwargs):
247252
partitioned_replicated_merge_tree_table(
248253
table_name=source_table,
249254
partition_by="p",
255+
number_of_parts=2,
250256
columns=default_columns(simple=False),
251257
query_settings="storage_policy = 'tiered_storage'",
252258
)
253259
else:
254260
partitioned_merge_tree_table(
255261
table_name=source_table,
256262
partition_by="p",
263+
number_of_parts=2,
257264
columns=default_columns(simple=False),
258265
query_settings="storage_policy = 'tiered_storage'",
259266
)
@@ -286,6 +293,8 @@ def alter_after_export(self, alter_function, kwargs):
286293
kwargs["path_to_backup"] = (
287294
f"/clickhouse/tables/shard0/{source_table}_temp"
288295
)
296+
if alter_function == optimize:
297+
kwargs["node"] = self.context.node
289298

290299
with When("I read data on the S3 table"):
291300
initial_destination_data = select_all_ordered(
@@ -337,13 +346,15 @@ def alter_during_export(self, alter_function, kwargs):
337346
partitioned_replicated_merge_tree_table(
338347
table_name=source_table,
339348
partition_by="p",
349+
number_of_parts=2,
340350
columns=default_columns(simple=False),
341351
query_settings="storage_policy = 'tiered_storage'",
342352
)
343353
else:
344354
partitioned_merge_tree_table(
345355
table_name=source_table,
346356
partition_by="p",
357+
number_of_parts=2,
347358
columns=default_columns(simple=False),
348359
query_settings="storage_policy = 'tiered_storage'",
349360
)
@@ -384,6 +395,8 @@ def alter_during_export(self, alter_function, kwargs):
384395
kwargs["path_to_backup"] = (
385396
f"/clickhouse/tables/shard0/{source_table}_temp"
386397
)
398+
if alter_function == optimize:
399+
kwargs["node"] = self.context.node
387400

388401
with And(f"I {alter_function.__name__} on the source table"):
389402
if alter_function == alter_table_move_partition:
@@ -416,63 +429,8 @@ def alter_during_export(self, alter_function, kwargs):
416429
assert initial_source_data == destination_data, error()
417430

418431

419-
@TestScenario
420-
def select_and_export(self):
421-
"""Test selecting from the source table before, during, and after exports."""
422-
423-
with Given("I create a populated source table and empty S3 table"):
424-
source_table = "source_" + getuid()
425-
426-
partitioned_merge_tree_table(
427-
table_name=source_table,
428-
partition_by="p",
429-
columns=default_columns(simple=False),
430-
stop_merges=True,
431-
)
432-
s3_table_name = create_s3_table(
433-
table_name="s3",
434-
create_new_bucket=True,
435-
columns=default_columns(simple=False),
436-
)
437-
438-
with And("I select data from the source table before exporting parts"):
439-
before_export_data = select_all_ordered(
440-
table_name=source_table, node=self.context.node
441-
)
442-
443-
with When("I slow the network"):
444-
network_packet_rate_limit(node=self.context.node, rate_mbit=0.05)
445-
446-
with And("I export parts to the S3 table"):
447-
export_parts(
448-
source_table=source_table,
449-
destination_table=s3_table_name,
450-
node=self.context.node,
451-
)
452-
453-
with And("I select data from the source table during exporting parts"):
454-
during_export_data = select_all_ordered(
455-
table_name=source_table, node=self.context.node
456-
)
457-
458-
with And("I select data from the source and destination after exporting parts"):
459-
sleep(5)
460-
after_export_data = select_all_ordered(
461-
table_name=source_table, node=self.context.node
462-
)
463-
destination_data = select_all_ordered(
464-
table_name=s3_table_name, node=self.context.node
465-
)
466-
467-
with Then("Check data is consistent before, during, and after exports"):
468-
assert before_export_data == during_export_data, error()
469-
assert during_export_data == after_export_data, error()
470-
assert before_export_data == after_export_data, error()
471-
assert before_export_data == destination_data, error()
472-
473-
474432
@TestFeature
475-
@Name("concurrent actions")
433+
@Name("concurrent alter")
476434
def feature(self):
477435
"""Check concurrent actions on the source table during exporting parts to S3 storage."""
478436

@@ -482,4 +440,3 @@ def feature(self):
482440
Scenario(run=alter_before_export)
483441
Scenario(run=alter_during_export)
484442
Scenario(run=alter_after_export)
485-
Scenario(run=select_and_export)
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
from time import sleep
2+
from testflows.core import *
3+
from s3.tests.export_part.steps import *
4+
from helpers.create import *
5+
from helpers.queries import *
6+
from s3.requirements.export_part import *
7+
from alter.stress.tests.tc_netem import *
8+
9+
10+
@TestScenario
@Requirements(RQ_ClickHouse_ExportPart_Concurrency("1.0"))
def parallel_insert(self):
    """Check that exports work correctly with concurrent inserts of source data."""

    with Given("I create an empty source and S3 table"):
        src_table = "source_" + getuid()

        # Empty partitioned source table; merges stopped so parts stay stable.
        partitioned_merge_tree_table(
            table_name=src_table,
            partition_by="p",
            columns=default_columns(),
            stop_merges=True,
            populate=False,
        )
        dst_table = create_s3_table(table_name="s3", create_new_bucket=True)

    with When(
        "I insert data and export it in parallel",
        description="""
        5 partitions with 1 part each are inserted.
        The export is queued in parallel and usually behaves by exporting
        a snapshot of the source data, often getting just the first partition
        which means the export happens right after the first INSERT query completes.
        """,
    ):
        # Launch the inserts and the export concurrently, then wait for both.
        Step(test=create_partitions_with_random_uint64, parallel=True)(
            table_name=src_table,
            number_of_partitions=5,
            number_of_parts=1,
        )
        Step(test=export_parts, parallel=True)(
            source_table=src_table,
            destination_table=dst_table,
            node=self.context.node,
        )
        join()

    with Then("Destination data should be a subset of source data"):
        src_rows = select_all_ordered(table_name=src_table, node=self.context.node)
        dst_rows = select_all_ordered(table_name=dst_table, node=self.context.node)
        # The export sees a snapshot, so it may hold fewer rows than the source.
        assert set(dst_rows).issubset(set(src_rows)), error()

    with And("Inserts should have completed successfully"):
        # 5 partitions x 3 rows each (per the insert helper's defaults) = 15.
        assert len(src_rows) == 15, error()
@TestScenario
@Requirements(RQ_ClickHouse_ExportPart_Concurrency("1.0"))
def multiple_sources_same_destination(self, num_tables):
    """Check concurrent exports from different sources to the same S3 table.

    Args:
        num_tables: number of source tables created and exported concurrently.
    """

    with Given(f"I create {num_tables} populated source tables and an empty S3 table"):
        source_tables, destination_tables = concurrent_export_tables(
            num_tables=num_tables
        )

    with And("I read data from all tables"):
        source_data = []
        destination_data = []
        # Iterate the source/destination pairs directly instead of indexing
        # both lists by position.
        for source_table, destination_table in zip(source_tables, destination_tables):
            source_data.extend(
                select_all_ordered(table_name=source_table, node=self.context.node)
            )
            destination_data.extend(
                select_all_ordered(table_name=destination_table, node=self.context.node)
            )

    with Then("All data should be present in the S3 table"):
        # Compare as sets: rows from the different sources interleave in the
        # shared destination, so ordering across tables is not deterministic.
        assert set(source_data) == set(destination_data), error()

    with And("Exports should have run concurrently"):
        verify_export_concurrency(node=self.context.node, source_tables=source_tables)
@TestScenario
def select_parts(self):
    """Test selecting from the source table before, during, and after exports."""

    with Given("I create a populated source table and empty S3 table"):
        source_table = "source_" + getuid()

        # Merges are stopped so the set of source parts cannot change while
        # the export is in flight.
        partitioned_merge_tree_table(
            table_name=source_table,
            partition_by="p",
            columns=default_columns(simple=False),
            stop_merges=True,
        )
        s3_table_name = create_s3_table(
            table_name="s3",
            create_new_bucket=True,
            columns=default_columns(simple=False),
        )

    with And("I select data from the source table before exporting parts"):
        before_export_data = select_all_ordered(
            table_name=source_table, node=self.context.node
        )

    with When("I slow the network"):
        # Rate-limit traffic so the export takes long enough that the next
        # SELECT runs while the export is still in progress.
        network_packet_rate_limit(node=self.context.node, rate_mbit=0.05)

    with And("I export parts to the S3 table"):
        export_parts(
            source_table=source_table,
            destination_table=s3_table_name,
            node=self.context.node,
        )

    with And("I select data from the source table during exporting parts"):
        during_export_data = select_all_ordered(
            table_name=source_table, node=self.context.node
        )

    with And("I select data from the source and destination after exporting parts"):
        # NOTE(review): fixed sleep presumably lets the rate-limited export
        # finish before the "after" reads — confirm 5s is always enough.
        sleep(5)
        after_export_data = select_all_ordered(
            table_name=source_table, node=self.context.node
        )
        destination_data = select_all_ordered(
            table_name=s3_table_name, node=self.context.node
        )

    with Then("Check data is consistent before, during, and after exports"):
        # Exporting must not mutate the source, and the destination must end
        # up with exactly the source's rows.
        assert before_export_data == during_export_data, error()
        assert during_export_data == after_export_data, error()
        assert before_export_data == after_export_data, error()
        assert before_export_data == destination_data, error()
@TestScenario
def merge_parts(self):
    """Test merging parts from the source table before, during, and after exporting parts."""

    with Given("I create a populated source table and empty S3 table"):
        source_table = "source_" + getuid()
        # Two parts per partition so each OPTIMIZE has something to merge.
        partitioned_merge_tree_table(
            table_name=source_table,
            partition_by="p",
            number_of_parts=2,
            columns=default_columns(simple=False),
        )
        s3_table_name = create_s3_table(
            table_name="s3",
            create_new_bucket=True,
            columns=default_columns(simple=False),
        )

    with And("I optimize partition 1 before export"):
        # Merges partition 1's two parts into one before the export begins.
        optimize_partition(
            table_name=source_table,
            partition="1",
        )

    with And("I read source parts before export"):
        # Snapshot of parts-per-partition that the export is expected to copy.
        source_parts_before_export = get_parts_per_partition(table_name=source_table)

    with When("I slow the network to make export take longer"):
        network_packet_rate_limit(node=self.context.node, rate_mbit=0.05)

    with And("I export parts to the S3 table"):
        export_parts(
            source_table=source_table,
            destination_table=s3_table_name,
            node=self.context.node,
        )

    with And("I optimize partition 2 during export"):
        # Runs while the rate-limited export is still in flight; must not
        # affect the already-queued export snapshot.
        optimize_partition(
            table_name=source_table,
            partition="2",
        )

    with And("I optimize partition 3 after export"):
        # NOTE(review): fixed sleep presumably lets the slowed export finish
        # before this optimize — confirm 5s is always enough.
        sleep(5)
        optimize_partition(
            table_name=source_table,
            partition="3",
        )

    with Then("I verify destination partition structure is correct"):
        # Destination should mirror the source's part layout as it was
        # before the export started (partition 1 merged, the rest with 2 parts).
        destination_parts_after_export = get_s3_parts_per_partition(table_name=s3_table_name)
        assert destination_parts_after_export == source_parts_before_export, error()

    with And("Source matches destination"):
        source_matches_destination(
            source_table=source_table,
            destination_table=s3_table_name,
        )

    with And("Final partition structure is correct"):
        # Source: partitions 1-3 merged to 1 part each; 4 and 5 untouched.
        # Destination: only partition 1's merge predates the export snapshot.
        source_parts_after_export = get_parts_per_partition(table_name=source_table)
        assert source_parts_after_export == {"1": 1, "2": 1, "3": 1, "4": 2, "5": 2}, error()
        assert destination_parts_after_export == {"1": 1, "2": 2, "3": 2, "4": 2, "5": 2}, error()
@TestFeature
@Name("concurrent other")
def feature(self):
    """Check that exports work correctly with explicitly parallel tests."""

    # Runs five source tables exporting concurrently into one destination.
    Scenario(test=multiple_sources_same_destination)(num_tables=5)
    Scenario(run=parallel_insert)
    Scenario(run=select_parts)
    Scenario(run=merge_parts)

s3/tests/export_part/feature.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@ def minio(self, uri, bucket_prefix):
2727
Feature(run=load("s3.tests.export_part.engines_volumes", "feature"))
2828
Feature(run=load("s3.tests.export_part.datatypes", "feature"))
2929
Feature(run=load("s3.tests.export_part.network", "feature"))
30-
Feature(run=load("s3.tests.export_part.parallel_actions", "feature"))
3130
Feature(run=load("s3.tests.export_part.system_monitoring", "feature"))
32-
Feature(run=load("s3.tests.export_part.concurrent_actions", "feature"))
31+
Feature(run=load("s3.tests.export_part.concurrent_alter", "feature"))
32+
Feature(run=load("s3.tests.export_part.concurrent_other", "feature"))

0 commit comments

Comments
 (0)