@@ -559,7 +559,14 @@ void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command,
559559
560560 const auto transaction_id = std::to_string (generateSnowflakeID ());
561561
562- // / TODO missing parts lock here with tagger
562+ std::vector<std::shared_ptr<CurrentlyExportingPartsTagger>> taggers;
563+ taggers.reserve (all_parts.size ());
564+
565+ for (const auto & part : all_parts)
566+ {
567+ taggers.push_back (std::make_shared<CurrentlyExportingPartsTagger>(part, *this ));
568+ }
569+
563570 const auto manifest = MergeTreeExportManifest::create (
564571 getStoragePolicy ()->getAnyDisk (),
565572 relative_data_path,
@@ -574,9 +581,8 @@ void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command,
574581 export_partition_transaction_id_to_manifest[transaction_id] = manifest;
575582 }
576583
577- for (const auto & part : all_parts )
584+ for (const auto & tagger : taggers )
578585 {
579- auto tagger = std::make_shared<CurrentlyExportingPartsTagger>(std::vector<DataPartPtr>{part}, *this );
580586 auto task = std::make_shared<ExportPartPlainMergeTreeTask>(*this , tagger, dest_storage, getContext (), manifest, moves_assignee_trigger);
581587 background_moves_assignee.scheduleMoveTask (task);
582588 }
@@ -656,25 +662,21 @@ CurrentlyMergingPartsTagger::~CurrentlyMergingPartsTagger()
656662 storage.currently_processing_in_background_condition .notify_all ();
657663}
658664
659- CurrentlyExportingPartsTagger::CurrentlyExportingPartsTagger (std::vector< DataPartPtr> && parts_to_export_ , StorageMergeTree & storage_)
660- : parts_to_export (std::move(parts_to_export_ )), storage(storage_)
665+ CurrentlyExportingPartsTagger::CurrentlyExportingPartsTagger (DataPartPtr part_to_export_ , StorageMergeTree & storage_)
666+ : part_to_export (std::move(part_to_export_ )), storage(storage_)
661667{
662668 // / assume it is already locked
663- for (const auto & part : parts_to_export)
664- if (!storage.currently_merging_mutating_parts .emplace (part).second )
665- throw Exception (ErrorCodes::LOGICAL_ERROR, " Tagging already tagged part {}. This is a bug." , part->name );
669+ if (!storage.currently_merging_mutating_parts .emplace (part_to_export).second )
670+ throw Exception (ErrorCodes::LOGICAL_ERROR, " Tagging already tagged part {}. This is a bug." , part_to_export->name );
666671}
667672
668673CurrentlyExportingPartsTagger::~CurrentlyExportingPartsTagger ()
669674{
670675 std::lock_guard lock (storage.currently_processing_in_background_mutex );
671676
672- for (const auto & part : parts_to_export)
673- {
674- if (!storage.currently_merging_mutating_parts .contains (part))
675- std::terminate ();
676- storage.currently_merging_mutating_parts .erase (part);
677- }
677+ if (!storage.currently_merging_mutating_parts .contains (part_to_export))
678+ std::terminate ();
679+ storage.currently_merging_mutating_parts .erase (part_to_export);
678680
679681 storage.currently_processing_in_background_condition .notify_all ();
680682}
@@ -1230,7 +1232,7 @@ void StorageMergeTree::resumeExportPartitionTasks()
12301232
12311233 for (const auto & part : parts_to_export)
12321234 {
1233- auto tagger = std::make_shared<CurrentlyExportingPartsTagger>(std::vector<DataPartPtr>{ part} , *this );
1235+ auto tagger = std::make_shared<CurrentlyExportingPartsTagger>(part, *this );
12341236 auto task = std::make_shared<ExportPartPlainMergeTreeTask>(*this , tagger, destination_storage, getContext (), manifest, moves_assignee_trigger);
12351237 background_moves_assignee.scheduleMoveTask (task);
12361238 }
0 commit comments