Skip to content

Commit 8bc5cdb

Browse files
authored
Merge pull request #1150 from Altinity/fix_crash_on_mutation_x_export
Fix crash due to incompatible headers when a mutation changes the schema between scheduling and executing a part export
2 parents 35b06fe + 7312bb5 commit 8bc5cdb

File tree

6 files changed

+157
-14
lines changed

6 files changed

+157
-14
lines changed

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo
4747

4848
bool ExportPartTask::executeStep()
4949
{
50-
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
50+
const auto & metadata_snapshot = manifest.storage_snapshot->metadata;
51+
5152
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
52-
StorageSnapshotPtr storage_snapshot = storage.getStorageSnapshot(metadata_snapshot, local_context);
5353

5454
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
5555

@@ -142,13 +142,8 @@ bool ExportPartTask::executeStep()
142142
bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
143143
bool prefetch = false;
144144

145-
MergeTreeData::IMutationsSnapshot::Params params
146-
{
147-
.metadata_version = metadata_snapshot->getMetadataVersion(),
148-
.min_part_metadata_version = manifest.data_part->getMetadataVersion(),
149-
};
150-
151-
auto mutations_snapshot = storage.getMutationsSnapshot(params);
145+
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*manifest.storage_snapshot->data);
146+
auto mutations_snapshot = snapshot_data.mutations_snapshot;
152147

153148
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
154149
manifest.data_part,
@@ -161,7 +156,7 @@ bool ExportPartTask::executeStep()
161156
read_type,
162157
plan_for_part,
163158
storage,
164-
storage_snapshot,
159+
manifest.storage_snapshot,
165160
RangesInDataPart(manifest.data_part),
166161
alter_conversions,
167162
nullptr,

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6239,13 +6239,13 @@ void MergeTreeData::exportPartToTable(
62396239
return ast ? ast->formatWithSecretsOneLine() : "";
62406240
};
62416241

6242-
auto src_snapshot = getInMemoryMetadataPtr();
6243-
auto destination_snapshot = dest_storage->getInMemoryMetadataPtr();
6242+
auto source_metadata_ptr = getInMemoryMetadataPtr();
6243+
auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();
62446244

6245-
if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
6245+
if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
62466246
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
62476247

6248-
if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST()))
6248+
if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))
62496249
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");
62506250

62516251
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
@@ -6262,6 +6262,7 @@ void MergeTreeData::exportPartToTable(
62626262
transaction_id,
62636263
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
62646264
format_settings,
6265+
getStorageSnapshot(source_metadata_ptr, query_context),
62656266
completion_callback);
62666267

62676268
std::lock_guard lock(export_manifests_mutex);

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <Interpreters/StorageID.h>
44
#include <Storages/MergeTree/IMergeTreeDataPart.h>
5+
#include <Storages/StorageSnapshot.h>
56
#include <QueryPipeline/QueryPipeline.h>
67
#include <optional>
78

@@ -46,12 +47,14 @@ struct MergeTreePartExportManifest
4647
const String & transaction_id_,
4748
FileAlreadyExistsPolicy file_already_exists_policy_,
4849
const FormatSettings & format_settings_,
50+
const StorageSnapshotPtr & storage_snapshot_,
4951
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
5052
: destination_storage_id(destination_storage_id_),
5153
data_part(data_part_),
5254
transaction_id(transaction_id_),
5355
file_already_exists_policy(file_already_exists_policy_),
5456
format_settings(format_settings_),
57+
storage_snapshot(storage_snapshot_),
5558
completion_callback(completion_callback_),
5659
create_time(time(nullptr)) {}
5760

@@ -62,6 +65,10 @@ struct MergeTreePartExportManifest
6265
FileAlreadyExistsPolicy file_already_exists_policy;
6366
FormatSettings format_settings;
6467

68+
/// Storage snapshot captured at the time of query validation to prevent race conditions with mutations.
69+
/// Otherwise, the export could fail if the schema changes between validation and execution.
70+
StorageSnapshotPtr storage_snapshot;
71+
6572
std::function<void(CompletionCallbackResult)> completion_callback;
6673

6774
time_t create_time;

tests/integration/test_export_merge_tree_part_to_object_storage/__init__.py

Whitespace-only changes.
tests/integration/test_export_merge_tree_part_to_object_storage/configs/named_collections.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<clickhouse>
2+
<named_collections>
3+
<s3_conn>
4+
<url>http://minio1:9001/root/data</url>
5+
<access_key_id>minio</access_key_id>
6+
<secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key>
7+
</s3_conn>
8+
</named_collections>
9+
</clickhouse>
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import logging
2+
import pytest
3+
import random
4+
import string
5+
import time
6+
from typing import Optional
7+
import uuid
8+
9+
from helpers.cluster import ClickHouseCluster
10+
from helpers.network import PartitionManager
11+
12+
@pytest.fixture(scope="module")
13+
def cluster():
14+
try:
15+
cluster = ClickHouseCluster(__file__)
16+
cluster.add_instance(
17+
"node1",
18+
main_configs=["configs/named_collections.xml"],
19+
with_minio=True,
20+
)
21+
logging.info("Starting cluster...")
22+
cluster.start()
23+
yield cluster
24+
finally:
25+
cluster.shutdown()
26+
27+
28+
def create_s3_table(node, s3_table):
29+
node.query(f"CREATE TABLE {s3_table} (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='{s3_table}', format=Parquet, partition_strategy='hive') PARTITION BY year")
30+
31+
32+
def create_tables_and_insert_data(node, mt_table, s3_table):
33+
node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()")
34+
node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)")
35+
36+
create_s3_table(node, s3_table)
37+
38+
39+
def test_drop_column_during_export_snapshot(cluster):
40+
node = cluster.instances["node1"]
41+
42+
mt_table = "mutations_snapshot_mt_table"
43+
s3_table = "mutations_snapshot_s3_table"
44+
45+
create_tables_and_insert_data(node, mt_table, s3_table)
46+
47+
# Block traffic to/from MinIO to force upload errors and retries, following existing S3 tests style
48+
minio_ip = cluster.minio_ip
49+
minio_port = cluster.minio_port
50+
51+
# Ensure export sees a consistent snapshot at start time even if we mutate the source later
52+
with PartitionManager() as pm:
53+
# Block responses from MinIO (source_port matches MinIO service)
54+
pm_rule_reject_responses = {
55+
"destination": node.ip_address,
56+
"source_port": minio_port,
57+
"action": "REJECT --reject-with tcp-reset",
58+
}
59+
pm._add_rule(pm_rule_reject_responses)
60+
61+
# Block requests to MinIO (destination: MinIO, destination_port: minio_port)
62+
pm_rule_reject_requests = {
63+
"destination": minio_ip,
64+
"destination_port": minio_port,
65+
"action": "REJECT --reject-with tcp-reset",
66+
}
67+
pm._add_rule(pm_rule_reject_requests)
68+
69+
# Start export of 2020
70+
node.query(
71+
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table};"
72+
)
73+
74+
# Drop a column that is required for the export
75+
node.query(f"ALTER TABLE {mt_table} DROP COLUMN id")
76+
77+
time.sleep(3)
78+
# assert the mutation has been applied AND the data has not been exported yet
79+
assert "Unknown expression identifier `id`" in node.query_and_get_error(f"SELECT id FROM {mt_table}"), "Column id is not removed"
80+
81+
# Wait for export to finish and then verify destination still reflects the original snapshot (3 rows)
82+
time.sleep(5)
83+
assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation"
84+
85+
86+
def test_add_column_during_export(cluster):
87+
node = cluster.instances["node1"]
88+
89+
mt_table = "add_column_during_export_mt_table"
90+
s3_table = "add_column_during_export_s3_table"
91+
92+
create_tables_and_insert_data(node, mt_table, s3_table)
93+
94+
# Block traffic to/from MinIO to force upload errors and retries, following existing S3 tests style
95+
minio_ip = cluster.minio_ip
96+
minio_port = cluster.minio_port
97+
98+
# Ensure export sees a consistent snapshot at start time even if we mutate the source later
99+
with PartitionManager() as pm:
100+
# Block responses from MinIO (source_port matches MinIO service)
101+
pm_rule_reject_responses = {
102+
"destination": node.ip_address,
103+
"source_port": minio_port,
104+
"action": "REJECT --reject-with tcp-reset",
105+
}
106+
pm._add_rule(pm_rule_reject_responses)
107+
108+
# Block requests to MinIO (destination: MinIO, destination_port: minio_port)
109+
pm_rule_reject_requests = {
110+
"destination": minio_ip,
111+
"destination_port": minio_port,
112+
"action": "REJECT --reject-with tcp-reset",
113+
}
114+
pm._add_rule(pm_rule_reject_requests)
115+
116+
# Start export of 2020
117+
node.query(
118+
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table};"
119+
)
120+
121+
node.query(f"ALTER TABLE {mt_table} ADD COLUMN id2 UInt64")
122+
123+
time.sleep(3)
124+
125+
# assert the column has been added AND the data has not been exported yet
126+
assert node.query(f"SELECT count(id2) FROM {mt_table}") == '4\n', "Column id2 is not added"
127+
128+
# Wait for export to finish and then verify destination still reflects the original snapshot (3 rows)
129+
time.sleep(5)
130+
assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation"
131+
assert "Unknown expression identifier `id2`" in node.query_and_get_error(f"SELECT id2 FROM {s3_table}"), "Column id2 is present in the exported data"

0 commit comments

Comments
 (0)