Skip to content

Commit 543ff36

Browse files
committed
use the background scheduler instead of scheduling upon every request. very hackish for now, need to improve part selection and blocking
1 parent 46b1724 commit 543ff36

File tree

7 files changed

+119
-64
lines changed

7 files changed

+119
-64
lines changed

src/Storages/MergeTree/MergeMutateSelectedEntry.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ struct CurrentlyExportingPartsTagger
5555
{
5656
DataPartPtr part_to_export;
5757
StorageMergeTree & storage;
58-
CurrentlyExportingPartsTagger(DataPartPtr part_to_export_, StorageMergeTree & storage_);
5958
~CurrentlyExportingPartsTagger();
59+
CurrentlyExportingPartsTagger(DataPartPtr part_to_export_, StorageMergeTree & storage_);
6060
};
6161

6262
using CurrentlyExportingPartsTaggerPtr = std::shared_ptr<CurrentlyExportingPartsTagger>;

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
11531153
/// Schedules background job to like merge/mutate/fetch an executor
11541154
virtual bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) = 0;
11551155
/// Schedules job to move parts between disks/volumes and so on.
1156-
bool scheduleDataMovingJob(BackgroundJobsAssignee & assignee);
1156+
virtual bool scheduleDataMovingJob(BackgroundJobsAssignee & assignee);
11571157
bool areBackgroundMovesNeeded() const;
11581158

11591159

src/Storages/MergeTree/MergeTreeExportManifest.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ struct MergeTreeExportManifest
5151
{
5252
String part_name;
5353
String remote_path; // empty until uploaded
54+
bool in_progress = false; /// this is just a hackish workaround for now
5455
};
5556

5657

@@ -174,6 +175,15 @@ struct MergeTreeExportManifest
174175
write();
175176
}
176177

178+
void setInProgress(const String & part_name)
179+
{
180+
for (auto & i : items)
181+
{
182+
if (i.part_name == part_name)
183+
i.in_progress = true;
184+
}
185+
}
186+
177187
std::vector<String> pendingParts() const
178188
{
179189
std::vector<String> res;

src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.cpp

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ namespace DB
1111

1212
ExportPartPlainMergeTreeTask::ExportPartPlainMergeTreeTask(
1313
StorageMergeTree & storage_,
14-
const std::shared_ptr<CurrentlyExportingPartsTagger> & exports_tagger_,
14+
const DataPartPtr & part_to_export_,
1515
const StoragePtr & destination_storage_,
1616
ContextPtr context_,
1717
std::shared_ptr<MergeTreeExportManifest> manifest_,
1818
IExecutableTask::TaskResultCallback & task_result_callback_,
1919
size_t max_retries_)
2020
: storage(storage_)
21-
, exports_tagger(exports_tagger_)
21+
, part_to_export(part_to_export_)
2222
, destination_storage(destination_storage_)
2323
, context(std::move(context_))
2424
, manifest(std::move(manifest_))
@@ -63,7 +63,7 @@ bool ExportPartPlainMergeTreeTask::executeStep()
6363
retry_count++;
6464
LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"),
6565
"Retrying export attempt {} for part {}",
66-
retry_count, exports_tagger->part_to_export->name);
66+
retry_count, part_to_export->name);
6767
state = State::NEED_EXECUTE;
6868
}
6969
else
@@ -84,7 +84,7 @@ bool ExportPartPlainMergeTreeTask::executeStep()
8484
retry_count++;
8585
LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"),
8686
"Retrying export attempt {} for part {}",
87-
retry_count, exports_tagger->part_to_export->name);
87+
retry_count, part_to_export->name);
8888
state = State::NEED_COMMIT;
8989
}
9090
else
@@ -101,16 +101,23 @@ bool ExportPartPlainMergeTreeTask::executeStep()
101101
manifest->status = MergeTreeExportManifest::Status::failed;
102102
manifest->write();
103103

104-
storage.already_exported_partition_ids.erase(manifest->partition_id);
104+
/// this is a mess, what if several fail? I need to re-think the architecture
105+
/// I'll leave this commented out for now
106+
// storage.already_exported_partition_ids.erase(manifest->partition_id);
107+
108+
storage.currently_merging_mutating_parts.erase(part_to_export);
105109

106110
return false;
107111
}
108112
case State::SUCCESS:
109113
{
114+
storage.currently_merging_mutating_parts.erase(part_to_export);
115+
110116
return false;
111117
}
112118
}
113119

120+
storage.currently_merging_mutating_parts.erase(part_to_export);
114121
return false;
115122
}
116123

@@ -157,15 +164,15 @@ bool ExportPartPlainMergeTreeTask::executeExport()
157164
{
158165
destination_storage->importMergeTreePart(
159166
storage,
160-
exports_tagger->part_to_export,
167+
part_to_export,
161168
context,
162169
part_log_wrapper);
163170

164171
return true;
165172
}
166173
catch (...)
167174
{
168-
LOG_ERROR(getLogger("ExportPartPlainMergeTreeTask"), "Failed to export part {}", exports_tagger->part_to_export->name);
175+
LOG_ERROR(getLogger("ExportPartPlainMergeTreeTask"), "Failed to export part {}", part_to_export->name);
169176

170177
return false;
171178
}

src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class ExportPartPlainMergeTreeTask : public IExecutableTask
1818
public:
1919
ExportPartPlainMergeTreeTask(
2020
StorageMergeTree & storage_,
21-
const std::shared_ptr<CurrentlyExportingPartsTagger> & exports_tagger_,
21+
const DataPartPtr & part_to_export_,
2222
const StoragePtr & destination_storage_,
2323
ContextPtr context_,
2424
std::shared_ptr<MergeTreeExportManifest> manifest_,
@@ -50,7 +50,7 @@ class ExportPartPlainMergeTreeTask : public IExecutableTask
5050
State state{State::NEED_PREPARE};
5151

5252
StorageMergeTree & storage;
53-
std::shared_ptr<CurrentlyExportingPartsTagger> exports_tagger;
53+
DataPartPtr part_to_export;
5454
StoragePtr destination_storage;
5555
ContextPtr context;
5656
std::shared_ptr<MergeTreeExportManifest> manifest;

src/Storages/StorageMergeTree.cpp

Lines changed: 89 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -548,8 +548,6 @@ void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command,
548548
return;
549549
}
550550

551-
std::vector<std::shared_ptr<CurrentlyExportingPartsTagger>> taggers;
552-
553551
{
554552
/// Do not put this in a scope because `CurrentlyExportingPartsTagger` instantiated above relies on this already being locked
555553
/// shitty design I came up with huh
@@ -559,12 +557,11 @@ void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command,
559557
{
560558
throw Exception(ErrorCodes::PART_IS_LOCKED, "Partition {} has already been exported", partition_id);
561559
}
562-
563-
taggers.reserve(all_parts.size());
564560

565561
for (const auto & part : all_parts)
566562
{
567-
taggers.push_back(std::make_shared<CurrentlyExportingPartsTagger>(part, *this));
563+
if (!currently_merging_mutating_parts.emplace(part).second)
564+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged part {}. This is a bug.", part->name);
568565
}
569566
}
570567

@@ -584,14 +581,7 @@ void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command,
584581
export_partition_transaction_id_to_manifest[transaction_id] = manifest;
585582
}
586583

587-
for (const auto & tagger : taggers)
588-
{
589-
auto task = std::make_shared<ExportPartPlainMergeTreeTask>(*this, tagger, dest_storage, getContext(), manifest, moves_assignee_trigger);
590-
if (!background_moves_assignee.scheduleMoveTask(task))
591-
{
592-
LOG_ERROR(log, "Failed to schedule export task for part {}", tagger->part_to_export->name);
593-
}
594-
}
584+
background_moves_assignee.trigger();
595585
}
596586

597587
/// While exists, marks parts as 'currently_merging_mutating_parts' and reserves free space on filesystem.
@@ -1198,58 +1188,53 @@ void StorageMergeTree::readExportPartitionManifests()
11981188
}
11991189
}
12001190
}
1191+
1192+
background_moves_assignee.trigger();
12011193
}
12021194

12031195
void StorageMergeTree::resumeExportPartitionTasks()
12041196
{
12051197
/// Initially I opted for having two separate methods: read and resume because I wanted to schedule the tasks in order
12061198
/// but it turns out the background executor schedules tasks based on their priority, so it is likely this is not needed anymore.
1207-
for (const auto & [transaction_id, manifest] : export_partition_transaction_id_to_manifest)
1208-
{
1209-
if (manifest->status != MergeTreeExportManifest::Status::pending)
1210-
continue;
1199+
// for (const auto & [transaction_id, manifest] : export_partition_transaction_id_to_manifest)
1200+
// {
1201+
// if (manifest->status != MergeTreeExportManifest::Status::pending)
1202+
// continue;
12111203

1212-
auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest->destination_storage_id, getContext());
1213-
if (!destination_storage)
1214-
{
1215-
LOG_ERROR(log, "Failed to reconstruct destination storage: {}", manifest->destination_storage_id.getNameForLogs());
1216-
continue;
1217-
}
1204+
// auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest->destination_storage_id, getContext());
1205+
// if (!destination_storage)
1206+
// {
1207+
// LOG_ERROR(log, "Failed to reconstruct destination storage: {}", manifest->destination_storage_id.getNameForLogs());
1208+
// continue;
1209+
// }
12181210

1219-
auto pending_part_names = manifest->pendingParts();
1211+
// auto pending_part_names = manifest->pendingParts();
12201212

1221-
/// apparently, it is possible that pending parts are empty
1222-
/// if it is empty, I have to somehow commit and mark as completed..
1213+
// /// apparently, it is possible that pending parts are empty
1214+
// /// if it is empty, I have to somehow commit and mark as completed..
12231215

1224-
std::vector<DataPartPtr> parts_to_export;
1216+
// std::vector<DataPartPtr> parts_to_export;
12251217

1226-
for (const auto & part_name : pending_part_names)
1227-
{
1228-
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active});
1218+
// for (const auto & part_name : pending_part_names)
1219+
// {
1220+
// auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active});
12291221

1230-
if (!part)
1231-
{
1232-
LOG_ERROR(log, "Part {} is present in the manifest file {}, but not found in the storage {}",
1233-
part_name,
1234-
manifest->transaction_id,
1235-
getStorageID().getNameForLogs());
1236-
manifest->status = MergeTreeExportManifest::Status::failed;
1237-
manifest->write();
1238-
1239-
already_exported_partition_ids.erase(manifest->partition_id);
1240-
continue;
1241-
}
1222+
// if (!part)
1223+
// {
1224+
// LOG_ERROR(log, "Part {} is present in the manifest file {}, but not found in the storage {}",
1225+
// part_name,
1226+
// manifest->transaction_id,
1227+
// getStorageID().getNameForLogs());
1228+
// manifest->status = MergeTreeExportManifest::Status::failed;
1229+
// manifest->write();
12421230

1243-
parts_to_export.emplace_back(part);
1244-
}
1231+
// already_exported_partition_ids.erase(manifest->partition_id);
1232+
// continue;
1233+
// }
12451234

1246-
for (const auto & part : parts_to_export)
1247-
{
1248-
auto tagger = std::make_shared<CurrentlyExportingPartsTagger>(part, *this);
1249-
auto task = std::make_shared<ExportPartPlainMergeTreeTask>(*this, tagger, destination_storage, getContext(), manifest, moves_assignee_trigger);
1250-
background_moves_assignee.scheduleMoveTask(task);
1251-
}
1252-
}
1235+
// parts_to_export.emplace_back(part);
1236+
// }
1237+
// }
12531238
}
12541239

12551240
void StorageMergeTree::loadExportPartition()
@@ -1662,6 +1647,57 @@ UInt32 StorageMergeTree::getMaxLevelInBetween(const PartProperties & left, const
16621647
return level;
16631648
}
16641649

1650+
1651+
bool StorageMergeTree::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
1652+
{
1653+
if (MergeTreeData::scheduleDataMovingJob(assignee))
1654+
{
1655+
return true;
1656+
}
1657+
1658+
/// Try to schedule one export part task if any pending export exists
1659+
{
1660+
std::lock_guard lock(export_partition_transaction_id_to_manifest_mutex);
1661+
for (const auto & [transaction_id, manifest] : export_partition_transaction_id_to_manifest)
1662+
{
1663+
if (manifest->status != MergeTreeExportManifest::Status::pending)
1664+
continue;
1665+
1666+
auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest->destination_storage_id, getContext());
1667+
if (!destination_storage)
1668+
{
1669+
LOG_ERROR(log, "Failed to reconstruct destination storage: {}", manifest->destination_storage_id.getNameForLogs());
1670+
continue;
1671+
}
1672+
1673+
for (auto & item : manifest->items)
1674+
{
1675+
if (item.in_progress)
1676+
continue;
1677+
1678+
auto part = getPartIfExists(item.part_name, {MergeTreeDataPartState::Active});
1679+
if (!part)
1680+
{
1681+
LOG_ERROR(log, "Part {} is present in the manifest file {}, but not found in the storage {}",
1682+
item.part_name,
1683+
manifest->transaction_id,
1684+
getStorageID().getNameForLogs());
1685+
manifest->status = MergeTreeExportManifest::Status::failed;
1686+
manifest->write();
1687+
already_exported_partition_ids.erase(manifest->partition_id);
1688+
continue;
1689+
}
1690+
1691+
auto task = std::make_shared<ExportPartPlainMergeTreeTask>(*this, part, destination_storage, getContext(), manifest, moves_assignee_trigger);
1692+
item.in_progress = background_moves_assignee.scheduleMoveTask(task);
1693+
return true;
1694+
1695+
}
1696+
}
1697+
}
1698+
return false;
1699+
}
1700+
16651701
bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee)
16661702
{
16671703
if (shutdown_called)
@@ -2949,8 +2985,8 @@ MutationCounters StorageMergeTree::getMutationCounters() const
29492985

29502986
void StorageMergeTree::startBackgroundMovesIfNeeded()
29512987
{
2952-
if (areBackgroundMovesNeeded())
2953-
background_moves_assignee.start();
2988+
// if (areBackgroundMovesNeeded())
2989+
background_moves_assignee.start();
29542990
}
29552991

29562992
std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const

src/Storages/StorageMergeTree.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ class StorageMergeTree final : public MergeTreeData
119119

120120
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
121121

122+
bool scheduleDataMovingJob(BackgroundJobsAssignee & assignee) override;
123+
122124
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
123125

124126
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }

0 commit comments

Comments
 (0)