Skip to content

Commit 61e43cf

Browse files
committed
do not lock parts, only hold references so they are not deleted from disk. first attempt
1 parent 543ff36 commit 61e43cf

File tree

4 files changed

+35
-49
lines changed

4 files changed

+35
-49
lines changed

src/Storages/MergeTree/MergeMutateSelectedEntry.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,4 @@ struct MergeMutateSelectedEntry
5151

5252
using MergeMutateSelectedEntryPtr = std::shared_ptr<MergeMutateSelectedEntry>;
5353

54-
struct CurrentlyExportingPartsTagger
55-
{
56-
DataPartPtr part_to_export;
57-
StorageMergeTree & storage;
58-
~CurrentlyExportingPartsTagger();
59-
CurrentlyExportingPartsTagger(DataPartPtr part_to_export_, StorageMergeTree & storage_);
60-
};
61-
62-
using CurrentlyExportingPartsTaggerPtr = std::shared_ptr<CurrentlyExportingPartsTagger>;
63-
6454
}

src/Storages/MergeTree/MergeTreeExportManifest.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ struct MergeTreeExportManifest
5252
String part_name;
5353
String remote_path; // empty until uploaded
5454
bool in_progress = false; /// this is just a hackish workaround for now
55+
DataPartPtr part; // hold reference to part so it does not get deleted from disk even if it is outdated. Should be null once we are done with it
5556
};
5657

5758

@@ -82,12 +83,15 @@ struct MergeTreeExportManifest
8283
manifest->file_path = std::filesystem::path(path_prefix) / ("export_partition_" + partition_id_ + "_transaction_" + transaction_id_ + ".json");
8384
manifest->items.reserve(data_parts.size());
8485
for (const auto & data_part : data_parts)
85-
manifest->items.push_back({data_part->name, ""});
86+
manifest->items.emplace_back(data_part->name, "", false, data_part);
8687
manifest->write();
8788
return manifest;
8889
}
8990

90-
static std::shared_ptr<MergeTreeExportManifest> read(const DiskPtr & disk_, const String & file_path_)
91+
/// will not fill parts ref, maybe I should.
92+
static std::shared_ptr<MergeTreeExportManifest> read(
93+
const DiskPtr & disk_,
94+
const String & file_path_)
9195
{
9296
auto manifest = std::make_shared<MergeTreeExportManifest>();
9397
manifest->disk = disk_;

src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.cpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,23 +101,18 @@ bool ExportPartPlainMergeTreeTask::executeStep()
101101
manifest->status = MergeTreeExportManifest::Status::failed;
102102
manifest->write();
103103

104-
/// this is a mess, what if several fail? I need to re-think the architecture
105-
/// I'll leave this commented out for now
106-
// storage.already_exported_partition_ids.erase(manifest->partition_id);
107-
108-
storage.currently_merging_mutating_parts.erase(part_to_export);
104+
/// doesn't sound ideal, but it is actually ok to allow this partition to be re-exported as soon as a single part fails
105+
/// this is because the ongoing export will never commit, so it won't cause duplicates
106+
storage.already_exported_partition_ids.erase(manifest->partition_id);
109107

110108
return false;
111109
}
112110
case State::SUCCESS:
113111
{
114-
storage.currently_merging_mutating_parts.erase(part_to_export);
115-
116112
return false;
117113
}
118114
}
119115

120-
storage.currently_merging_mutating_parts.erase(part_to_export);
121116
return false;
122117
}
123118

src/Storages/StorageMergeTree.cpp

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include <Common/ProfileEventsScope.h>
4848
#include <Common/escapeForFileName.h>
4949
#include "Core/BackgroundSchedulePool.h"
50+
#include "Storages/MergeTree/MergeTreeDataPartState.h"
5051
#include "Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.h"
5152
#include <Core/Names.h>
5253
#include <Storages/ObjectStorage/StorageObjectStorage.h>
@@ -549,20 +550,12 @@ void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command,
549550
}
550551

551552
{
552-
/// Do not put this in a scope because `CurrentlyExportingPartsTagger` instantiated above relies on this already being locked
553-
/// shitty design I came up with huh
554553
std::lock_guard lock_background_mutex(currently_processing_in_background_mutex);
555554

556555
if (!already_exported_partition_ids.emplace(partition_id).second)
557556
{
558557
throw Exception(ErrorCodes::PART_IS_LOCKED, "Partition {} has already been exported", partition_id);
559558
}
560-
561-
for (const auto & part : all_parts)
562-
{
563-
if (!currently_merging_mutating_parts.emplace(part).second)
564-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged part {}. This is a bug.", part->name);
565-
}
566559
}
567560

568561
const auto transaction_id = std::to_string(generateSnowflakeID());
@@ -658,26 +651,6 @@ CurrentlyMergingPartsTagger::~CurrentlyMergingPartsTagger()
658651
storage.currently_processing_in_background_condition.notify_all();
659652
}
660653

661-
CurrentlyExportingPartsTagger::CurrentlyExportingPartsTagger(DataPartPtr part_to_export_, StorageMergeTree & storage_)
662-
: part_to_export(std::move(part_to_export_)), storage(storage_)
663-
{
664-
/// assume it is already locked
665-
if (!storage.currently_merging_mutating_parts.emplace(part_to_export).second)
666-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged part {}. This is a bug.", part_to_export->name);
667-
}
668-
669-
CurrentlyExportingPartsTagger::~CurrentlyExportingPartsTagger()
670-
{
671-
std::lock_guard lock(storage.currently_processing_in_background_mutex);
672-
673-
if (!storage.currently_merging_mutating_parts.contains(part_to_export))
674-
std::terminate();
675-
storage.currently_merging_mutating_parts.erase(part_to_export);
676-
677-
storage.currently_processing_in_background_condition.notify_all();
678-
}
679-
680-
681654
Int64 StorageMergeTree::startMutation(const MutationCommands & commands, ContextPtr query_context)
682655
{
683656
/// Choose any disk, because when we load mutations we search them at each disk
@@ -1150,6 +1123,7 @@ void StorageMergeTree::loadMutations()
11501123

11511124
void StorageMergeTree::readExportPartitionManifests()
11521125
{
1126+
static const auto states = {MergeTreeDataPartState::Active, MergeTreeDataPartState::Deleting, MergeTreeDataPartState::Outdated, MergeTreeDataPartState::DeleteOnDestroy};
11531127
for (const auto & disk : getDisks())
11541128
{
11551129
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
@@ -1177,6 +1151,28 @@ void StorageMergeTree::readExportPartitionManifests()
11771151
}
11781152
}
11791153

1154+
for (auto & item : manifest->items)
1155+
{
1156+
/// if this part has not been pushed yet
1157+
if (item.remote_path.empty())
1158+
{
1159+
item.part = getPartIfExists(item.part_name, states);
1160+
1161+
if (!item.part)
1162+
{
1163+
LOG_ERROR(log, "Part {} is present in the manifest file {}, but not found in the storage {}",
1164+
item.part_name,
1165+
manifest->transaction_id,
1166+
getStorageID().getNameForLogs());
1167+
1168+
manifest->status = MergeTreeExportManifest::Status::failed;
1169+
manifest->write();
1170+
already_exported_partition_ids.erase(manifest->partition_id);
1171+
continue;
1172+
}
1173+
}
1174+
}
1175+
11801176
export_partition_transaction_id_to_manifest.emplace(manifest->transaction_id, manifest);
11811177

11821178
LOG_DEBUG(log, "Loaded export transaction manifest: {} (transaction_id: {})", name, manifest->transaction_id);
@@ -1670,12 +1666,13 @@ bool StorageMergeTree::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
16701666
continue;
16711667
}
16721668

1669+
static const auto states = {MergeTreeDataPartState::Active, MergeTreeDataPartState::Deleting, MergeTreeDataPartState::Outdated, MergeTreeDataPartState::DeleteOnDestroy};
16731670
for (auto & item : manifest->items)
16741671
{
16751672
if (item.in_progress)
16761673
continue;
16771674

1678-
auto part = getPartIfExists(item.part_name, {MergeTreeDataPartState::Active});
1675+
auto part = getPartIfExists(item.part_name, states);
16791676
if (!part)
16801677
{
16811678
LOG_ERROR(log, "Part {} is present in the manifest file {}, but not found in the storage {}",

0 commit comments

Comments
 (0)