Skip to content

Commit 2fbf395

Browse files
committed
working kill export, update next idx upon lock and lock as many parts as one can at once
1 parent 8c02b95 commit 2fbf395

File tree

6 files changed

+348
-58
lines changed

6 files changed

+348
-58
lines changed
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
#include <Storages/MergeTree/ExportPartTask.h>
2+
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
3+
#include <Storages/MergeTree/MergeTreeData.h>
4+
#include <Interpreters/Context.h>
5+
#include <Interpreters/DatabaseCatalog.h>
6+
#include <Core/Settings.h>
7+
#include <Interpreters/ExpressionActions.h>
8+
#include <Processors/Executors/CompletedPipelineExecutor.h>
9+
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
10+
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
11+
#include <Processors/QueryPlan/QueryPlan.h>
12+
#include <QueryPipeline/QueryPipelineBuilder.h>
13+
#include <Common/ProfileEventsScope.h>
14+
15+
namespace ProfileEvents
16+
{
17+
extern const Event PartsExportDuplicated;
18+
extern const Event PartsExportFailures;
19+
extern const Event PartsExports;
20+
extern const Event PartsExportTotalMilliseconds;
21+
}
22+
23+
namespace DB
24+
{
25+
26+
namespace ErrorCodes
27+
{
28+
extern const int UNKNOWN_TABLE;
29+
extern const int FILE_ALREADY_EXISTS;
30+
extern const int LOGICAL_ERROR;
31+
}
32+
33+
namespace Setting
34+
{
35+
extern const SettingsUInt64 min_bytes_to_use_direct_io;
36+
}
37+
38+
ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_, ContextPtr context_)
39+
: storage(storage_),
40+
manifest(manifest_),
41+
local_context(context_)
42+
{
43+
}
44+
45+
bool ExportPartTask::executeStep()
46+
{
47+
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
48+
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
49+
StorageSnapshotPtr storage_snapshot = storage.getStorageSnapshot(metadata_snapshot, local_context);
50+
51+
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
52+
53+
NamesAndTypesList partition_columns;
54+
if (metadata_snapshot->hasPartitionKey())
55+
{
56+
const auto & partition_key = metadata_snapshot->getPartitionKey();
57+
if (!partition_key.column_names.empty())
58+
partition_columns = partition_key.expression->getRequiredColumnsWithTypes();
59+
}
60+
61+
auto block_with_partition_values = manifest.data_part->partition.getBlockWithPartitionValues(partition_columns);
62+
63+
auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, local_context);
64+
if (!destination_storage)
65+
{
66+
std::lock_guard inner_lock(storage.export_manifests_mutex);
67+
68+
const auto destination_storage_id_name = manifest.destination_storage_id.getNameForLogs();
69+
storage.export_manifests.erase(manifest);
70+
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name);
71+
}
72+
73+
SinkToStoragePtr sink;
74+
std::string destination_file_path;
75+
76+
try
77+
{
78+
auto context_copy = Context::createCopy(local_context);
79+
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
80+
81+
sink = destination_storage->import(
82+
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
83+
block_with_partition_values,
84+
destination_file_path,
85+
manifest.overwrite_file_if_exists,
86+
context_copy);
87+
}
88+
catch (const Exception & e)
89+
{
90+
if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS)
91+
{
92+
ProfileEvents::increment(ProfileEvents::PartsExportDuplicated);
93+
}
94+
95+
ProfileEvents::incrementNoTrace(ProfileEvents::PartsExportFailures);
96+
97+
std::lock_guard inner_lock(storage.export_manifests_mutex);
98+
storage.export_manifests.erase(manifest);
99+
100+
if (manifest.completion_callback)
101+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createFailure(e.message()));
102+
return false;
103+
}
104+
105+
bool apply_deleted_mask = true;
106+
bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
107+
bool prefetch = false;
108+
109+
MergeTreeData::IMutationsSnapshot::Params params
110+
{
111+
.metadata_version = metadata_snapshot->getMetadataVersion(),
112+
.min_part_metadata_version = manifest.data_part->getMetadataVersion(),
113+
};
114+
115+
auto mutations_snapshot = storage.getMutationsSnapshot(params);
116+
117+
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
118+
manifest.data_part,
119+
mutations_snapshot,
120+
local_context);
121+
122+
QueryPlan plan_for_part;
123+
124+
createReadFromPartStep(
125+
read_type,
126+
plan_for_part,
127+
storage,
128+
storage_snapshot,
129+
RangesInDataPart(manifest.data_part),
130+
alter_conversions,
131+
nullptr,
132+
columns_to_read,
133+
nullptr,
134+
apply_deleted_mask,
135+
std::nullopt,
136+
read_with_direct_io,
137+
prefetch,
138+
local_context,
139+
getLogger("ExportPartition"));
140+
141+
auto exports_list_entry = storage.getContext()->getExportsList().insert(
142+
getStorageID(),
143+
manifest.destination_storage_id,
144+
manifest.data_part->getBytesOnDisk(),
145+
manifest.data_part->name,
146+
destination_file_path,
147+
manifest.data_part->rows_count,
148+
manifest.data_part->getBytesOnDisk(),
149+
manifest.data_part->getBytesUncompressedOnDisk(),
150+
manifest.create_time,
151+
local_context);
152+
153+
ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, "");
154+
155+
QueryPlanOptimizationSettings optimization_settings(local_context);
156+
auto pipeline_settings = BuildQueryPipelineSettings(local_context);
157+
auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings);
158+
159+
builder->setProgressCallback([&exports_list_entry](const Progress & progress)
160+
{
161+
(*exports_list_entry)->bytes_read_uncompressed += progress.read_bytes;
162+
(*exports_list_entry)->rows_read += progress.read_rows;
163+
(*exports_list_entry)->elapsed = (*exports_list_entry)->watch.elapsedSeconds();
164+
});
165+
166+
pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
167+
168+
pipeline.complete(sink);
169+
170+
try
171+
{
172+
CompletedPipelineExecutor exec(pipeline);
173+
exec.execute();
174+
175+
std::lock_guard inner_lock(storage.export_manifests_mutex);
176+
storage.writePartLog(
177+
PartLogElement::Type::EXPORT_PART,
178+
{},
179+
static_cast<UInt64>((*exports_list_entry)->elapsed * 1000000000),
180+
manifest.data_part->name,
181+
manifest.data_part,
182+
{manifest.data_part},
183+
nullptr,
184+
nullptr,
185+
exports_list_entry.get());
186+
187+
storage.export_manifests.erase(manifest);
188+
189+
ProfileEvents::increment(ProfileEvents::PartsExports);
190+
ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast<UInt64>((*exports_list_entry)->elapsed * 1000));
191+
192+
if (manifest.completion_callback)
193+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess(destination_file_path));
194+
}
195+
catch (const Exception & e)
196+
{
197+
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while exporting the part {}. User should retry.", manifest.data_part->name));
198+
199+
ProfileEvents::increment(ProfileEvents::PartsExportFailures);
200+
201+
std::lock_guard inner_lock(storage.export_manifests_mutex);
202+
storage.writePartLog(
203+
PartLogElement::Type::EXPORT_PART,
204+
ExecutionStatus::fromCurrentException("", true),
205+
static_cast<UInt64>((*exports_list_entry)->elapsed * 1000000000),
206+
manifest.data_part->name,
207+
manifest.data_part,
208+
{manifest.data_part},
209+
nullptr,
210+
nullptr,
211+
exports_list_entry.get());
212+
213+
storage.export_manifests.erase(manifest);
214+
215+
if (manifest.completion_callback)
216+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createFailure(e.message()));
217+
218+
throw;
219+
}
220+
return false;
221+
}
222+
223+
void ExportPartTask::cancel() noexcept
224+
{
225+
pipeline.cancel();
226+
}
227+
228+
void ExportPartTask::onCompleted()
229+
{
230+
}
231+
232+
StorageID ExportPartTask::getStorageID() const
233+
{
234+
return manifest.destination_storage_id;
235+
}
236+
237+
Priority ExportPartTask::getPriority() const
238+
{
239+
return Priority{};
240+
}
241+
242+
String ExportPartTask::getQueryId() const
243+
{
244+
return manifest.query_id;
245+
}
246+
247+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#pragma once
2+
3+
#include <Storages/MergeTree/IExecutableTask.h>
4+
#include <Storages/MergeTree/MergeTreePartExportManifest.h>
5+
#include <Storages/MergeTree/MergeTreeData.h>
6+
7+
namespace DB
8+
{
9+
10+
class ExportPartTask : public IExecutableTask
11+
{
12+
public:
13+
explicit ExportPartTask(
14+
MergeTreeData & storage_,
15+
const MergeTreePartExportManifest & manifest_,
16+
ContextPtr context_);
17+
bool executeStep() override;
18+
void onCompleted() override;
19+
StorageID getStorageID() const override;
20+
Priority getPriority() const override;
21+
String getQueryId() const override;
22+
23+
void cancel() noexcept override;
24+
25+
private:
26+
MergeTreeData & storage;
27+
MergeTreePartExportManifest manifest;
28+
ContextPtr local_context;
29+
QueryPipeline pipeline;
30+
};
31+
32+
}

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <Common/scope_guard_safe.h>
2525
#include <Common/typeid_cast.h>
2626
#include <Common/thread_local_rng.h>
27+
#include "Storages/MergeTree/ExportPartTask.h"
2728
#include <Processors/Executors/CompletedPipelineExecutor.h>
2829
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
2930
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
@@ -6098,9 +6099,6 @@ void MergeTreeData::exportPartToTableImpl(
60986099

60996100
pipeline.complete(sink);
61006101

6101-
/// oh boy, is there another way?
6102-
manifest.pipeline = &pipeline;
6103-
61046102
try
61056103
{
61066104
CompletedPipelineExecutor exec(pipeline);
@@ -6157,18 +6155,16 @@ void MergeTreeData::killExportPart(const String & query_id)
61576155
{
61586156
std::lock_guard lock(export_manifests_mutex);
61596157

6160-
const auto it = std::find_if(export_manifests.begin(), export_manifests.end(), [&](const auto & manifest)
6158+
std::erase_if(export_manifests, [&](const auto & manifest)
61616159
{
6162-
return manifest.query_id == query_id;
6160+
if (manifest.query_id == query_id)
6161+
{
6162+
if (manifest.task)
6163+
manifest.task->cancel();
6164+
return true;
6165+
}
6166+
return false;
61636167
});
6164-
6165-
if (it == export_manifests.end())
6166-
return;
6167-
6168-
if (it->pipeline)
6169-
it->pipeline->cancel();
6170-
6171-
export_manifests.erase(it);
61726168
}
61736169

61746170
void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, ContextPtr /*query_context*/)
@@ -8903,17 +8899,18 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
89038899
continue;
89048900
}
89058901

8906-
manifest.in_progress = assignee.scheduleMoveTask(std::make_shared<ExecutableLambdaAdapter>(
8907-
[this, &manifest] () mutable {
8908-
/// TODO arthur fix this: I need to be able to modify the real manifest
8909-
/// but grabbing it by reference is causing problems
8910-
exportPartToTableImpl(manifest, getContext());
8911-
return true;
8912-
},
8913-
moves_assignee_trigger,
8914-
getStorageID()));
8902+
auto task = std::make_shared<ExportPartTask>(*this, manifest, getContext());
8903+
8904+
manifest.in_progress = assignee.scheduleMoveTask(task);
8905+
8906+
if (!manifest.in_progress)
8907+
{
8908+
continue;
8909+
}
89158910

8916-
return manifest.in_progress;
8911+
manifest.task = task;
8912+
8913+
return true;
89178914
}
89188915

89198916
return false;

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,6 +1258,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
12581258
friend class MergeTask;
12591259
friend class IPartMetadataManager;
12601260
friend class IMergedBlockOutputStream; // for access to log
1261+
friend class ExportPartTask;
12611262

12621263
bool require_part_metadata;
12631264

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
#pragma once
2+
13
#include <Interpreters/StorageID.h>
24
#include <Storages/MergeTree/IMergeTreeDataPart.h>
3-
#include "QueryPipeline/QueryPipeline.h"
5+
#include <QueryPipeline/QueryPipeline.h>
46

57
namespace DB
68
{
79

10+
class ExportPartTask;
11+
812
struct MergeTreePartExportManifest
913
{
1014
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
@@ -58,8 +62,7 @@ struct MergeTreePartExportManifest
5862

5963
time_t create_time;
6064
mutable bool in_progress = false;
61-
/// Used for killing the export
62-
mutable QueryPipeline * pipeline = nullptr;
65+
mutable std::shared_ptr<ExportPartTask> task = nullptr;
6366

6467
bool operator<(const MergeTreePartExportManifest & rhs) const
6568
{

0 commit comments

Comments
 (0)