Skip to content

Commit 8a51270

Browse files
committed
implement single part task
1 parent 8f171b8 commit 8a51270

11 files changed

+435
-27
lines changed

src/Core/Settings.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6877,8 +6877,8 @@ Experimental timeSeries* aggregate functions for Prometheus-like timeseries resa
68776877
DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_partition, false, R"(
68786878
Experimental export merge tree partition.
68796879
)", EXPERIMENTAL, allow_experimental_export_merge_tree_partition) \
6880-
DECLARE_WITH_ALIAS(Bool, export_merge_tree_partition_background_execution, true, R"(
6881-
Process exports asynchronously in background threads
6880+
DECLARE_WITH_ALIAS(Bool, export_merge_tree_partition_individual_part_executor, true, R"(
6881+
Use the part task instead of the partition task
68826882
)", EXPERIMENTAL, export_merge_tree_partition_background_execution) \
68836883
\
68846884
/* ####################################################### */ \

src/Storages/IStorage.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,13 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
462462
ContextPtr /*context*/,
463463
std::function<void(MergeTreePartImportStats)>) {}
464464

465+
/// Imports a single MergeTree data part into this storage.
/// The base implementation has an empty body, i.e. it silently does nothing;
/// presumably storages that support part import override it (verify against the implementations).
/// Arguments, in order, as passed by ExportPartPlainMergeTreeTask::executeExport:
/// the source MergeTree storage, the data part to import, the context, and a callback
/// that receives the MergeTreePartImportStats for that part.
virtual void importMergeTreePart(
466+
const MergeTreeData &,
467+
const DataPartPtr &,
468+
ContextPtr,
469+
std::function<void(MergeTreePartImportStats)>
470+
) {}
471+
465472
/** Writes the data to a table in distributed manner.
466473
* It is supposed that implementation looks into SELECT part of the query and executes distributed
467474
* INSERT SELECT if it is possible with current storage as a receiver and query SELECT part as a producer.

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
12331233
friend class IPartMetadataManager;
12341234
friend class IMergedBlockOutputStream; // for access to log
12351235
friend class ExportPartitionPlainMergeTreeTask;
1236+
friend class ExportPartPlainMergeTreeTask;
12361237

12371238
bool require_part_metadata;
12381239

src/Storages/MergeTree/MergeTreeExportManifest.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ struct MergeTreeExportManifest
138138
out->sync();
139139
}
140140

141+
void deleteFile()
142+
{
143+
disk->removeFile(file_path);
144+
}
145+
141146
void updateRemotePathAndWrite(const String & part_name, const String & remote_path)
142147
{
143148
for (auto & i : items)
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
#include <Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.h>
2+
#include <Storages/MergeTree/MergeTreeData.h>
3+
#include <Storages/MergeTree/MergeTreePartInfo.h>
4+
#include <Interpreters/PartLog.h>
5+
#include <Common/logger_useful.h>
6+
#include <Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.h>
7+
#include <Storages/StorageMergeTree.h>
8+
9+
namespace DB
10+
{
11+
12+
/// Creates a task that exports the first part held by `exports_tagger_` from `storage_`
/// into `destination_storage_`, tracked by the shared export `manifest_`.
/// `task_result_callback_` is copied and invoked from onCompleted(); `max_retries_` bounds
/// how many times a failed step is re-attempted.
ExportPartPlainMergeTreeTask::ExportPartPlainMergeTreeTask(
    StorageMergeTree & storage_,
    const std::shared_ptr<CurrentlyExportingPartsTagger> & exports_tagger_,
    const StoragePtr & destination_storage_,
    ContextPtr context_,
    std::shared_ptr<MergeTreeExportManifest> manifest_,
    IExecutableTask::TaskResultCallback & task_result_callback_,
    size_t max_retries_)
    : storage(storage_)
    , exports_tagger(exports_tagger_)
    , destination_storage(destination_storage_)
    , context(std::move(context_))
    , manifest(std::move(manifest_))
    , task_result_callback(task_result_callback_)
    , max_retries(max_retries_)
{
    /// The transaction id is parsed as a number and reused as the task priority value.
    /// Note: `manifest` below is the already-initialised member, not the moved-from parameter.
    /// std::stoull throws if the id is not numeric.
    const UInt64 numeric_transaction_id = std::stoull(manifest->transaction_id);
    priority.value = numeric_transaction_id;
}
31+
32+
/// Returns the StorageID of the source table (`storage`), i.e. the table the part is exported from.
StorageID ExportPartPlainMergeTreeTask::getStorageID() const
33+
{
34+
return storage.getStorageID();
35+
}
36+
37+
/// Builds the query id for this task: "<short table name>::export_partition::<transaction id>".
/// NOTE(review): the id depends only on the table and the transaction, so every part task of the
/// same export transaction yields the same value — confirm this is intended.
String ExportPartPlainMergeTreeTask::getQueryId() const
{
    String query_id = getStorageID().getShortName();
    query_id += "::export_partition::";
    query_id += manifest->transaction_id;
    return query_id;
}
41+
42+
bool ExportPartPlainMergeTreeTask::executeStep()
43+
{
44+
if (cancelled)
45+
return false;
46+
47+
switch (state)
48+
{
49+
case State::NEED_PREPARE:
50+
{
51+
prepare();
52+
state = State::NEED_EXECUTE;
53+
return true;
54+
}
55+
case State::NEED_EXECUTE:
56+
{
57+
if (executeExport())
58+
{
59+
state = State::NEED_COMMIT;
60+
return true;
61+
}
62+
63+
if (retry_count < max_retries)
64+
{
65+
retry_count++;
66+
LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"),
67+
"Retrying export attempt {} for part {}",
68+
retry_count, exports_tagger->parts_to_export[0]->name);
69+
state = State::NEED_EXECUTE;
70+
71+
return true;
72+
}
73+
74+
return false;
75+
}
76+
case State::NEED_COMMIT:
77+
{
78+
if (commitExport())
79+
{
80+
state = State::SUCCESS;
81+
return true;
82+
}
83+
84+
if (retry_count < max_retries)
85+
{
86+
retry_count++;
87+
LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"),
88+
"Retrying export attempt {} for part {}",
89+
retry_count, exports_tagger->parts_to_export[0]->name);
90+
state = State::NEED_COMMIT;
91+
92+
return true;
93+
}
94+
95+
return false;
96+
}
97+
case State::SUCCESS:
98+
{
99+
return false;
100+
}
101+
}
102+
103+
return false;
104+
}
105+
106+
107+
/// Creates the stopwatch whose elapsed time is later reported to the part log by executeExport().
/// executeStep() calls this once, in the NEED_PREPARE state, so the measured time also spans retries.
void ExportPartPlainMergeTreeTask::prepare()
108+
{
109+
stopwatch_ptr = std::make_unique<Stopwatch>();
110+
}
111+
112+
bool ExportPartPlainMergeTreeTask::executeExport()
113+
{
114+
if (cancelled)
115+
return false;
116+
117+
std::function<void(MergeTreePartImportStats)> part_log_wrapper = [this](MergeTreePartImportStats stats) {
118+
119+
std::lock_guard lock(storage.export_partition_transaction_id_to_manifest_mutex);
120+
auto table_id = storage.getStorageID();
121+
122+
if (stats.status.code != 0)
123+
{
124+
LOG_INFO(getLogger("ExportMergeTreePartitionToObjectStorageTask"), "Error importing part {}: {}", stats.part->name, stats.status.message);
125+
return;
126+
}
127+
128+
storage.export_partition_transaction_id_to_manifest[manifest->transaction_id]->updateRemotePathAndWrite(
129+
stats.part->name,
130+
stats.file_path);
131+
132+
UInt64 elapsed_ns = stopwatch_ptr->elapsedNanoseconds();
133+
134+
storage.writePartLog(
135+
PartLogElement::Type::EXPORT_PART,
136+
stats.status,
137+
elapsed_ns,
138+
stats.part->name,
139+
stats.part,
140+
{stats.part},
141+
nullptr,
142+
nullptr);
143+
};
144+
145+
try
146+
{
147+
destination_storage->importMergeTreePart(
148+
storage,
149+
exports_tagger->parts_to_export[0],
150+
context,
151+
part_log_wrapper);
152+
153+
return true;
154+
}
155+
catch (...)
156+
{
157+
LOG_ERROR(getLogger("ExportPartPlainMergeTreeTask"), "Failed to export part {}", exports_tagger->parts_to_export[0]->name);
158+
159+
return false;
160+
}
161+
}
162+
163+
bool ExportPartPlainMergeTreeTask::commitExport()
164+
{
165+
std::lock_guard lock(storage.export_partition_transaction_id_to_manifest_mutex);
166+
167+
if (manifest->exportedPaths().size() == manifest->items.size())
168+
{
169+
destination_storage->commitExportPartitionTransaction(
170+
manifest->transaction_id,
171+
manifest->partition_id,
172+
manifest->exportedPaths(),
173+
context);
174+
manifest->completed = true;
175+
manifest->write();
176+
storage.export_partition_transaction_id_to_manifest.erase(manifest->transaction_id);
177+
LOG_INFO(getLogger("ExportMergeTreePartitionToObjectStorageTask"),
178+
"Successfully committed export transaction {} for partition {}",
179+
manifest->transaction_id, manifest->partition_id);
180+
}
181+
182+
LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"), "Not all parts have been exported yet for transaction id {}, not comitting for this part", manifest->transaction_id);
183+
184+
return true;
185+
}
186+
187+
void ExportPartPlainMergeTreeTask::onCompleted()
188+
{
189+
bool success = (state == State::SUCCESS);
190+
task_result_callback(success);
191+
}
192+
193+
/// Requests cancellation by raising the `cancelled` flag, which executeStep() and executeExport()
/// check before doing any work. Work that is already in progress is not interrupted here.
void ExportPartPlainMergeTreeTask::cancel() noexcept
194+
{
195+
cancelled = true;
196+
}
197+
198+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#pragma once
2+
3+
#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeExportManifest.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Interpreters/Context.h>
#include <Common/Stopwatch.h>
#include <Storages/MergeTree/MergeMutateSelectedEntry.h>

#include <atomic>
10+
11+
namespace DB
12+
{
13+
14+
class MergeTreeData;
15+
16+
class ExportPartPlainMergeTreeTask : public IExecutableTask
17+
{
18+
public:
19+
ExportPartPlainMergeTreeTask(
20+
StorageMergeTree & storage_,
21+
const std::shared_ptr<CurrentlyExportingPartsTagger> & exports_tagger_,
22+
const StoragePtr & destination_storage_,
23+
ContextPtr context_,
24+
std::shared_ptr<MergeTreeExportManifest> manifest_,
25+
IExecutableTask::TaskResultCallback & task_result_callback_,
26+
size_t max_retries_ = 3);
27+
28+
void onCompleted() override;
29+
bool executeStep() override;
30+
void cancel() noexcept override;
31+
StorageID getStorageID() const override;
32+
Priority getPriority() const override { return priority; }
33+
String getQueryId() const override;
34+
35+
private:
36+
void prepare();
37+
bool executeExport();
38+
bool commitExport();
39+
bool exportedAllIndividualParts() const;
40+
41+
enum class State : uint8_t
42+
{
43+
NEED_PREPARE,
44+
NEED_EXECUTE,
45+
NEED_COMMIT,
46+
SUCCESS
47+
};
48+
49+
State state{State::NEED_PREPARE};
50+
51+
StorageMergeTree & storage;
52+
std::shared_ptr<CurrentlyExportingPartsTagger> exports_tagger;
53+
StoragePtr destination_storage;
54+
ContextPtr context;
55+
std::shared_ptr<MergeTreeExportManifest> manifest;
56+
IExecutableTask::TaskResultCallback task_result_callback;
57+
58+
size_t max_retries;
59+
size_t retry_count = 0;
60+
Priority priority;
61+
std::unique_ptr<Stopwatch> stopwatch_ptr;
62+
63+
bool cancelled = false;
64+
std::exception_ptr current_exception;
65+
};
66+
67+
using ExportPartPlainMergeTreeTaskPtr = std::shared_ptr<ExportPartPlainMergeTreeTask>;
68+
69+
}

src/Storages/ObjectStorage/MergeTree/ExportPartitionPlainMergeTreeTask.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,13 @@ void ExportPartitionPlainMergeTreeTask::executeExport()
168168
{
169169
LOG_ERROR(getLogger("ExportMergeTreePartitionToObjectStorageTask"),
170170
"Export attempt failed completely: {}", getCurrentExceptionMessage(true));
171-
171+
172+
std::lock_guard lock(storage.export_partition_transaction_id_to_manifest_mutex);
173+
manifest->deleteFile();
174+
storage.already_exported_partition_ids.erase(manifest->partition_id);
175+
storage.export_partition_transaction_id_to_manifest.erase(manifest->transaction_id);
176+
177+
172178
throw;
173179
}
174180
}

0 commit comments

Comments
 (0)