Skip to content

Commit 7f5031a

Browse files
committed
fix
1 parent d09fb0b commit 7f5031a

File tree

3 files changed

+17
-14
lines changed

3 files changed

+17
-14
lines changed

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo
4747

4848
bool ExportPartTask::executeStep()
4949
{
50-
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
50+
const auto & metadata_snapshot = manifest.storage_snapshot->metadata;
51+
5152
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
52-
StorageSnapshotPtr storage_snapshot = storage.getStorageSnapshot(metadata_snapshot, local_context);
5353

5454
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
5555

@@ -142,13 +142,8 @@ bool ExportPartTask::executeStep()
142142
bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
143143
bool prefetch = false;
144144

145-
MergeTreeData::IMutationsSnapshot::Params params
146-
{
147-
.metadata_version = metadata_snapshot->getMetadataVersion(),
148-
.min_part_metadata_version = manifest.data_part->getMetadataVersion(),
149-
};
150-
151-
auto mutations_snapshot = storage.getMutationsSnapshot(params);
145+
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*manifest.storage_snapshot->data);
146+
auto mutations_snapshot = snapshot_data.mutations_snapshot;
152147

153148
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
154149
manifest.data_part,
@@ -161,7 +156,7 @@ bool ExportPartTask::executeStep()
161156
read_type,
162157
plan_for_part,
163158
storage,
164-
storage_snapshot,
159+
manifest.storage_snapshot,
165160
RangesInDataPart(manifest.data_part),
166161
alter_conversions,
167162
nullptr,

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6239,13 +6239,13 @@ void MergeTreeData::exportPartToTable(
62396239
return ast ? ast->formatWithSecretsOneLine() : "";
62406240
};
62416241

6242-
auto src_snapshot = getInMemoryMetadataPtr();
6243-
auto destination_snapshot = dest_storage->getInMemoryMetadataPtr();
6242+
auto source_metadata_ptr = getInMemoryMetadataPtr();
6243+
auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();
62446244

6245-
if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
6245+
if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
62466246
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
62476247

6248-
if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST()))
6248+
if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))
62496249
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");
62506250

62516251
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
@@ -6262,6 +6262,7 @@ void MergeTreeData::exportPartToTable(
62626262
transaction_id,
62636263
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
62646264
format_settings,
6265+
getStorageSnapshot(source_metadata_ptr, query_context),
62656266
completion_callback);
62666267

62676268
std::lock_guard lock(export_manifests_mutex);

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <Interpreters/StorageID.h>
44
#include <Storages/MergeTree/IMergeTreeDataPart.h>
5+
#include <Storages/StorageSnapshot.h>
56
#include <QueryPipeline/QueryPipeline.h>
67
#include <optional>
78

@@ -46,12 +47,14 @@ struct MergeTreePartExportManifest
4647
const String & transaction_id_,
4748
FileAlreadyExistsPolicy file_already_exists_policy_,
4849
const FormatSettings & format_settings_,
50+
const StorageSnapshotPtr & storage_snapshot_,
4951
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
5052
: destination_storage_id(destination_storage_id_),
5153
data_part(data_part_),
5254
transaction_id(transaction_id_),
5355
file_already_exists_policy(file_already_exists_policy_),
5456
format_settings(format_settings_),
57+
storage_snapshot(storage_snapshot_),
5558
completion_callback(completion_callback_),
5659
create_time(time(nullptr)) {}
5760

@@ -62,6 +65,10 @@ struct MergeTreePartExportManifest
6265
FileAlreadyExistsPolicy file_already_exists_policy;
6366
FormatSettings format_settings;
6467

68+
/// Storage snapshot captured at the time of query validation to prevent race conditions with mutations
69+
/// Otherwise the export could fail if the schema changes between validation and execution
70+
StorageSnapshotPtr storage_snapshot;
71+
6572
std::function<void(CompletionCallbackResult)> completion_callback;
6673

6774
time_t create_time;

0 commit comments

Comments
 (0)