@@ -188,6 +188,7 @@ namespace Setting
188188 extern const SettingsInt64 replication_wait_for_inactive_replica_timeout;
189189 extern const SettingsUInt64 select_sequential_consistency;
190190 extern const SettingsBool allow_experimental_export_merge_tree_part;
191+ extern const SettingsBool export_merge_tree_partition_force_export;
191192}
192193
193194namespace MergeTreeSetting
@@ -4374,9 +4375,6 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionUpdatingTask()
43744375 /// If we have the cleanup lock, also remove stale entries from zk and local
43754376 for (const auto & key : zk_children)
43764377 {
4377- if (!cleanup_lock_acquired && export_merge_tree_partition_task_entries.contains(key))
4378- continue;
4379-
43804378 const std::string entry_path = fs::path(exports_path) / key;
43814379
43824380 std::string metadata_json;
@@ -4385,6 +4383,19 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionUpdatingTask()
43854383 LOG_INFO(log, "Skipping {}: missing metadata.json", key);
43864384 continue;
43874385 }
4386+
4387+ const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json);
4388+
4389+ const auto local_entry = export_merge_tree_partition_task_entries.find(key);
4390+
4391+ /// If the zk entry has been replaced via export_merge_tree_partition_force_export, checking only for the export key is not enough:
4392+ /// we also need to make sure the local entry has the same transaction id. If it does not, the local entry needs to be replaced.
4393+ bool has_local_entry_and_is_up_to_date = local_entry != export_merge_tree_partition_task_entries.end()
4394+ && local_entry->second.manifest.transaction_id == metadata.transaction_id;
4395+
4396+ /// If the entry is up to date and we don't have the cleanup lock, early exit, nothing to be done.
4397+ if (!cleanup_lock_acquired && has_local_entry_and_is_up_to_date)
4398+ continue;
43884399
43894400 std::string status;
43904401 if (!zk->tryGet(fs::path(entry_path) / "status", status))
@@ -4395,8 +4406,6 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionUpdatingTask()
43954406
43964407 bool is_not_pending = status != "PENDING";
43974408
4398- const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json);
4399-
44004409 if (cleanup_lock_acquired)
44014410 {
44024411 bool has_expired = metadata.create_time < now - 90;
@@ -4416,7 +4425,7 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionUpdatingTask()
44164425 continue;
44174426 }
44184427
4419- if (export_merge_tree_partition_task_entries.contains(key) )
4428+ if (has_local_entry_and_is_up_to_date )
44204429 {
44214430 LOG_INFO(log, "Skipping {}: already exists", key);
44224431 continue;
@@ -4456,9 +4465,8 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionUpdatingTask()
44564465 }
44574466 }
44584467
4459- export_merge_tree_partition_task_entries.emplace(
4460- key,
4461- ExportReplicatedMergeTreePartitionTaskEntry {metadata, parts_to_do, std::move(part_references)});
4468+ /// It is important to use operator[] here: unlike emplace, which leaves an existing entry untouched, it overwrites the entry if the key already exists.
4469+ export_merge_tree_partition_task_entries[key] = ExportReplicatedMergeTreePartitionTaskEntry {metadata, parts_to_do, std::move(part_references)};
44624470 }
44634471
44644472 /// Remove entries that were deleted by someone else
@@ -8694,20 +8702,20 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
86948702 start_time: 123445
86958703 */
86968704
8697- /// maybe this should go in initialization somewhere else
8698- if (!zookeeper->exists(exports_path))
8699- {
8700- ops.emplace_back(zkutil::makeCreateRequest(exports_path, "", zkutil::CreateMode::Persistent));
8701- }
8702-
87038705 const auto export_key = partition_id + "_" + dest_storage_id.getNameForLogs();
87048706
87058707 const auto partition_exports_path = fs::path(exports_path) / export_key;
87068708
87078709 /// check if entry already exists
87088710 if (zookeeper->exists(partition_exports_path))
87098711 {
8710- throw Exception(ErrorCodes::BAD_ARGUMENTS, "Partition {} already exported or it is being exported", partition_id);
8712+ if (!query_context->getSettingsRef()[Setting::export_merge_tree_partition_force_export])
8713+ {
8714+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Partition {} already exported or it is being exported", partition_id);
8715+ }
8716+
8717+ /// The check for existence and entry removal are not atomic, so this actually might fail.
8718+ ops.emplace_back(zkutil::makeRemoveRecursiveRequest(partition_exports_path, -1));
87118719 }
87128720
87138721 ops.emplace_back(zkutil::makeCreateRequest(partition_exports_path, "", zkutil::CreateMode::Persistent));
0 commit comments