Skip to content

Commit 7312bb5

Browse files
committed
tests
1 parent 7f5031a commit 7312bb5

File tree

3 files changed

+140
-0
lines changed

3 files changed

+140
-0
lines changed

tests/integration/test_export_merge_tree_part_to_object_storage/__init__.py

Whitespace-only changes.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<clickhouse>
2+
<named_collections>
3+
<s3_conn>
4+
<url>http://minio1:9001/root/data</url>
5+
<access_key_id>minio</access_key_id>
6+
<secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key>
7+
</s3_conn>
8+
</named_collections>
9+
</clickhouse>
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import logging
2+
import pytest
3+
import random
4+
import string
5+
import time
6+
from typing import Optional
7+
import uuid
8+
9+
from helpers.cluster import ClickHouseCluster
10+
from helpers.network import PartitionManager
11+
12+
@pytest.fixture(scope="module")
13+
def cluster():
14+
try:
15+
cluster = ClickHouseCluster(__file__)
16+
cluster.add_instance(
17+
"node1",
18+
main_configs=["configs/named_collections.xml"],
19+
with_minio=True,
20+
)
21+
logging.info("Starting cluster...")
22+
cluster.start()
23+
yield cluster
24+
finally:
25+
cluster.shutdown()
26+
27+
28+
def create_s3_table(node, s3_table):
29+
node.query(f"CREATE TABLE {s3_table} (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='{s3_table}', format=Parquet, partition_strategy='hive') PARTITION BY year")
30+
31+
32+
def create_tables_and_insert_data(node, mt_table, s3_table):
33+
node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()")
34+
node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)")
35+
36+
create_s3_table(node, s3_table)
37+
38+
39+
def test_drop_column_during_export_snapshot(cluster):
40+
node = cluster.instances["node1"]
41+
42+
mt_table = "mutations_snapshot_mt_table"
43+
s3_table = "mutations_snapshot_s3_table"
44+
45+
create_tables_and_insert_data(node, mt_table, s3_table)
46+
47+
# Block traffic to/from MinIO to force upload errors and retries, following existing S3 tests style
48+
minio_ip = cluster.minio_ip
49+
minio_port = cluster.minio_port
50+
51+
# Ensure export sees a consistent snapshot at start time even if we mutate the source later
52+
with PartitionManager() as pm:
53+
# Block responses from MinIO (source_port matches MinIO service)
54+
pm_rule_reject_responses = {
55+
"destination": node.ip_address,
56+
"source_port": minio_port,
57+
"action": "REJECT --reject-with tcp-reset",
58+
}
59+
pm._add_rule(pm_rule_reject_responses)
60+
61+
# Block requests to MinIO (destination: MinIO, destination_port: minio_port)
62+
pm_rule_reject_requests = {
63+
"destination": minio_ip,
64+
"destination_port": minio_port,
65+
"action": "REJECT --reject-with tcp-reset",
66+
}
67+
pm._add_rule(pm_rule_reject_requests)
68+
69+
# Start export of 2020
70+
node.query(
71+
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table};"
72+
)
73+
74+
# Drop a column that is required for the export
75+
node.query(f"ALTER TABLE {mt_table} DROP COLUMN id")
76+
77+
time.sleep(3)
78+
# assert the mutation has been applied AND the data has not been exported yet
79+
assert "Unknown expression identifier `id`" in node.query_and_get_error(f"SELECT id FROM {mt_table}"), "Column id is not removed"
80+
81+
# Wait for export to finish and then verify destination still reflects the original snapshot (3 rows)
82+
time.sleep(5)
83+
assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation"
84+
85+
86+
def test_add_column_during_export(cluster):
87+
node = cluster.instances["node1"]
88+
89+
mt_table = "add_column_during_export_mt_table"
90+
s3_table = "add_column_during_export_s3_table"
91+
92+
create_tables_and_insert_data(node, mt_table, s3_table)
93+
94+
# Block traffic to/from MinIO to force upload errors and retries, following existing S3 tests style
95+
minio_ip = cluster.minio_ip
96+
minio_port = cluster.minio_port
97+
98+
# Ensure export sees a consistent snapshot at start time even if we mutate the source later
99+
with PartitionManager() as pm:
100+
# Block responses from MinIO (source_port matches MinIO service)
101+
pm_rule_reject_responses = {
102+
"destination": node.ip_address,
103+
"source_port": minio_port,
104+
"action": "REJECT --reject-with tcp-reset",
105+
}
106+
pm._add_rule(pm_rule_reject_responses)
107+
108+
# Block requests to MinIO (destination: MinIO, destination_port: minio_port)
109+
pm_rule_reject_requests = {
110+
"destination": minio_ip,
111+
"destination_port": minio_port,
112+
"action": "REJECT --reject-with tcp-reset",
113+
}
114+
pm._add_rule(pm_rule_reject_requests)
115+
116+
# Start export of 2020
117+
node.query(
118+
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table};"
119+
)
120+
121+
node.query(f"ALTER TABLE {mt_table} ADD COLUMN id2 UInt64")
122+
123+
time.sleep(3)
124+
125+
# assert the mutation has been applied AND the data has not been exported yet
126+
assert node.query(f"SELECT count(id2) FROM {mt_table}") == '4\n', "Column id2 is not added"
127+
128+
# Wait for export to finish and then verify destination still reflects the original snapshot (3 rows)
129+
time.sleep(5)
130+
assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation"
131+
assert "Unknown expression identifier `id2`" in node.query_and_get_error(f"SELECT id2 FROM {s3_table}"), "Column id2 is present in the exported data"

0 commit comments

Comments
 (0)