Skip to content

Commit 8c02b95

Browse files
committed
almost done with kill export partition
1 parent 8948a90 commit 8c02b95

12 files changed

+212
-34
lines changed

src/Interpreters/InterpreterKillQueryQuery.cpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,77 @@ BlockIO InterpreterKillQueryQuery::execute()
250250

251251
break;
252252
}
253+
case ASTKillQueryQuery::Type::ExportPartition:
254+
{
255+
Block exports_block = getSelectResult(
256+
"source_database, source_table, transaction_id, destination_database, destination_table, partition_id",
257+
"system.replicated_partition_exports");
258+
if (!exports_block)
259+
return res_io;
260+
261+
const ColumnString & src_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_database").column);
262+
const ColumnString & src_tbl_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_table").column);
263+
const ColumnString & dst_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_database").column);
264+
const ColumnString & dst_tbl_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_table").column);
265+
const ColumnString & tx_col = typeid_cast<const ColumnString &>(*exports_block.getByName("transaction_id").column);
266+
267+
auto header = exports_block.cloneEmpty();
268+
header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});
269+
270+
MutableColumns res_columns = header.cloneEmptyColumns();
271+
auto table_id = StorageID::createEmpty();
272+
AccessRightsElements required_access_rights;
273+
auto access = getContext()->getAccess();
274+
bool access_denied = false;
275+
276+
for (size_t i = 0; i < exports_block.rows(); ++i)
277+
{
278+
const auto src_database = src_db_col.getDataAt(i).toString();
279+
const auto src_table = src_tbl_col.getDataAt(i).toString();
280+
const auto dst_database = dst_db_col.getDataAt(i).toString();
281+
const auto dst_table = dst_tbl_col.getDataAt(i).toString();
282+
283+
table_id = StorageID{src_database, src_table};
284+
auto transaction_id = tx_col.getDataAt(i).toString();
285+
286+
CancellationCode code = CancellationCode::Unknown;
287+
if (!query.test)
288+
{
289+
auto storage = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
290+
if (!storage)
291+
code = CancellationCode::NotFound;
292+
else
293+
{
294+
ASTAlterCommand alter_command{};
295+
alter_command.type = ASTAlterCommand::EXPORT_PARTITION;
296+
alter_command.move_destination_type = DataDestinationType::TABLE;
297+
alter_command.from_database = src_database;
298+
alter_command.from_table = src_table;
299+
alter_command.to_database = dst_database;
300+
alter_command.to_table = dst_table;
301+
302+
required_access_rights = InterpreterAlterQuery::getRequiredAccessForCommand(
303+
alter_command, table_id.database_name, table_id.table_name);
304+
if (!access->isGranted(required_access_rights))
305+
{
306+
access_denied = true;
307+
continue;
308+
}
309+
code = storage->killExportPartition(transaction_id);
310+
}
311+
}
312+
313+
insertResultRow(i, code, exports_block, header, res_columns);
314+
}
315+
316+
if (res_columns[0]->empty() && access_denied)
317+
throw Exception(ErrorCodes::ACCESS_DENIED, "Not allowed to kill export partition. "
318+
"To execute this query, it's necessary to have the grant {}", required_access_rights.toString());
319+
320+
res_io.pipeline = QueryPipeline(Pipe(std::make_shared<SourceFromSingleChunk>(header.cloneWithColumns(std::move(res_columns)))));
321+
322+
break;
323+
}
253324
case ASTKillQueryQuery::Type::Mutation:
254325
{
255326
Block mutations_block = getSelectResult("database, table, mutation_id, command", "system.mutations");
@@ -462,6 +533,9 @@ AccessRightsElements InterpreterKillQueryQuery::getRequiredAccessForDDLOnCluster
462533
| AccessType::ALTER_MATERIALIZE_COLUMN
463534
| AccessType::ALTER_MATERIALIZE_TTL
464535
);
536+
/// todo arthur think about this
537+
else if (query.type == ASTKillQueryQuery::Type::ExportPartition)
538+
required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION);
465539
return required_access;
466540
}
467541

src/Parsers/ASTKillQueryQuery.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ void ASTKillQueryQuery::formatQueryImpl(WriteBuffer & ostr, const FormatSettings
2727
case Type::Transaction:
2828
ostr << "TRANSACTION";
2929
break;
30+
case Type::ExportPartition:
31+
ostr << "EXPORT PARTITION";
32+
break;
3033
}
3134

3235
ostr << (settings.hilite ? hilite_none : "");

src/Parsers/ASTKillQueryQuery.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class ASTKillQueryQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluste
1313
{
1414
Query, /// KILL QUERY
1515
Mutation, /// KILL MUTATION
16+
ExportPartition, /// KILL EXPORT_PARTITION
1617
PartMoveToShard, /// KILL PART_MOVE_TO_SHARD
1718
Transaction, /// KILL TRANSACTION
1819
};

src/Parsers/ParserKillQueryQuery.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
1717
ParserKeyword p_kill{Keyword::KILL};
1818
ParserKeyword p_query{Keyword::QUERY};
1919
ParserKeyword p_mutation{Keyword::MUTATION};
20+
ParserKeyword p_export_partition{Keyword::EXPORT_PARTITION};
2021
ParserKeyword p_part_move_to_shard{Keyword::PART_MOVE_TO_SHARD};
2122
ParserKeyword p_transaction{Keyword::TRANSACTION};
2223
ParserKeyword p_on{Keyword::ON};
@@ -33,6 +34,8 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
3334
query->type = ASTKillQueryQuery::Type::Query;
3435
else if (p_mutation.ignore(pos, expected))
3536
query->type = ASTKillQueryQuery::Type::Mutation;
37+
else if (p_export_partition.ignore(pos, expected))
38+
query->type = ASTKillQueryQuery::Type::ExportPartition;
3639
else if (p_part_move_to_shard.ignore(pos, expected))
3740
query->type = ASTKillQueryQuery::Type::PartMoveToShard;
3841
else if (p_transaction.ignore(pos, expected))

src/Storages/IStorage.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,11 @@ CancellationCode IStorage::killPartMoveToShard(const UUID & /*task_uuid*/)
300300
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part moves between shards are not supported by storage {}", getName());
301301
}
302302

303+
CancellationCode IStorage::killExportPartition(const String & /*transaction_id*/)
304+
{
305+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Export partition is not supported by storage {}", getName());
306+
}
307+
303308
StorageID IStorage::getStorageID() const
304309
{
305310
std::lock_guard lock(id_mutex);

src/Storages/IStorage.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,9 @@ It is currently only implemented in StorageObjectStorage.
574574

575575
virtual void setMutationCSN(const String & /*mutation_id*/, UInt64 /*csn*/);
576576

577+
/// Cancel a replicated partition export by transaction id.
578+
virtual CancellationCode killExportPartition(const String & /*transaction_id*/);
579+
577580
/// Cancel a part move to shard.
578581
virtual CancellationCode killPartMoveToShard(const UUID & /*task_uuid*/);
579582

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5910,14 +5910,15 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
59105910

59115911
const auto database_name = query_context->resolveDatabase(command.to_database);
59125912

5913-
exportPartToTable(part_name, StorageID{database_name, command.to_table}, query_context);
5913+
exportPartToTable(part_name, StorageID{database_name, command.to_table}, query_context->getCurrentQueryId(), query_context);
59145914
}
59155915

59165916
void MergeTreeData::exportPartToTable(
59175917
const std::string & part_name,
59185918
const StorageID & destination_storage_id,
5919+
const String & transaction_id,
59195920
ContextPtr query_context,
5920-
std::function<void(MergeTreeExportManifest::CompletionCallbackResult)> completion_callback)
5921+
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback)
59215922
{
59225923
auto dest_storage = DatabaseCatalog::instance().getTable(destination_storage_id, query_context);
59235924

@@ -5950,9 +5951,10 @@ void MergeTreeData::exportPartToTable(
59505951
part_name, getStorageID().getFullTableName());
59515952

59525953
{
5953-
MergeTreeExportManifest manifest(
5954+
MergeTreePartExportManifest manifest(
59545955
dest_storage->getStorageID(),
59555956
part,
5957+
transaction_id,
59565958
query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists],
59575959
query_context->getSettingsRef()[Setting::output_format_parallel_formatting],
59585960
completion_callback);
@@ -5970,7 +5972,7 @@ void MergeTreeData::exportPartToTable(
59705972
}
59715973

59725974
void MergeTreeData::exportPartToTableImpl(
5973-
const MergeTreeExportManifest & manifest,
5975+
const MergeTreePartExportManifest & manifest,
59745976
ContextPtr local_context)
59755977
{
59765978
auto metadata_snapshot = getInMemoryMetadataPtr();
@@ -6027,7 +6029,7 @@ void MergeTreeData::exportPartToTableImpl(
60276029
export_manifests.erase(manifest);
60286030

60296031
if (manifest.completion_callback)
6030-
manifest.completion_callback(MergeTreeExportManifest::CompletionCallbackResult::createFailure(e.message()));
6032+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createFailure(e.message()));
60316033
return;
60326034
}
60336035

@@ -6096,6 +6098,9 @@ void MergeTreeData::exportPartToTableImpl(
60966098

60976099
pipeline.complete(sink);
60986100

6101+
/// oh boy, is there another way?
6102+
manifest.pipeline = &pipeline;
6103+
60996104
try
61006105
{
61016106
CompletedPipelineExecutor exec(pipeline);
@@ -6119,7 +6124,7 @@ void MergeTreeData::exportPartToTableImpl(
61196124
ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast<UInt64>((*exports_list_entry)->elapsed * 1000));
61206125

61216126
if (manifest.completion_callback)
6122-
manifest.completion_callback(MergeTreeExportManifest::CompletionCallbackResult::createSuccess(destination_file_path));
6127+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess(destination_file_path));
61236128
}
61246129
catch (const Exception & e)
61256130
{
@@ -6142,12 +6147,30 @@ void MergeTreeData::exportPartToTableImpl(
61426147
export_manifests.erase(manifest);
61436148

61446149
if (manifest.completion_callback)
6145-
manifest.completion_callback(MergeTreeExportManifest::CompletionCallbackResult::createFailure(e.message()));
6150+
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createFailure(e.message()));
61466151

61476152
throw;
61486153
}
61496154
}
61506155

6156+
void MergeTreeData::killExportPart(const String & query_id)
6157+
{
6158+
std::lock_guard lock(export_manifests_mutex);
6159+
6160+
const auto it = std::find_if(export_manifests.begin(), export_manifests.end(), [&](const auto & manifest)
6161+
{
6162+
return manifest.query_id == query_id;
6163+
});
6164+
6165+
if (it == export_manifests.end())
6166+
return;
6167+
6168+
if (it->pipeline)
6169+
it->pipeline->cancel();
6170+
6171+
export_manifests.erase(it);
6172+
}
6173+
61516174
void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, ContextPtr /*query_context*/)
61526175
{
61536176
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MOVE PARTITION TO SHARD is not supported by storage {}", getName());
@@ -8881,7 +8904,9 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
88818904
}
88828905

88838906
manifest.in_progress = assignee.scheduleMoveTask(std::make_shared<ExecutableLambdaAdapter>(
8884-
[this, manifest] () mutable {
8907+
[this, &manifest] () mutable {
8908+
/// TODO arthur fix this: I need to be able to modify the real manifest
8909+
/// but grabbing it by reference is causing problems
88858910
exportPartToTableImpl(manifest, getContext());
88868911
return true;
88878912
},

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
#include <Interpreters/PartLog.h>
3636
#include <Poco/Timestamp.h>
3737
#include <Common/threadPoolCallbackRunner.h>
38-
#include <Storages/MergeTree/MergeTreeExportStatus.h>
39-
#include <Storages/MergeTree/MergeTreeExportManifest.h>
38+
#include <Storages/MergeTree/MergeTreePartExportStatus.h>
39+
#include <Storages/MergeTree/MergeTreePartExportManifest.h>
4040

4141
#include <boost/multi_index_container.hpp>
4242
#include <boost/multi_index/ordered_index.hpp>
@@ -908,16 +908,20 @@ class MergeTreeData : public IStorage, public WithMutableContext
908908

909909
void exportPartToTable(
910910
const std::string & part_name,
911-
const StorageID & destination_storage_id, ContextPtr query_context,
912-
std::function<void(MergeTreeExportManifest::CompletionCallbackResult)> completion_callback = {});
911+
const StorageID & destination_storage_id,
912+
const String & transaction_id,
913+
ContextPtr query_context,
914+
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback = {});
915+
916+
void killExportPart(const String & query_id);
913917

914918
virtual void exportPartitionToTable(const PartitionCommand &, ContextPtr)
915919
{
916920
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "EXPORT PARTITION is not implemented");
917921
}
918922

919923
void exportPartToTableImpl(
920-
const MergeTreeExportManifest & manifest,
924+
const MergeTreePartExportManifest & manifest,
921925
ContextPtr local_context);
922926

923927
/// Checks that Partition could be dropped right now
@@ -1166,7 +1170,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
11661170

11671171
mutable std::mutex export_manifests_mutex;
11681172

1169-
std::set<MergeTreeExportManifest> export_manifests;
1173+
std::set<MergeTreePartExportManifest> export_manifests;
11701174

11711175
PinnedPartUUIDsPtr getPinnedPartUUIDs() const;
11721176

src/Storages/MergeTree/MergeTreeExportManifest.h renamed to src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
#include <Interpreters/StorageID.h>
22
#include <Storages/MergeTree/IMergeTreeDataPart.h>
3+
#include "QueryPipeline/QueryPipeline.h"
34

45
namespace DB
56
{
67

7-
struct MergeTreeExportManifest
8+
struct MergeTreePartExportManifest
89
{
910
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
1011

@@ -30,21 +31,25 @@ struct MergeTreeExportManifest
3031
String exception;
3132
};
3233

33-
MergeTreeExportManifest(
34+
MergeTreePartExportManifest(
3435
const StorageID & destination_storage_id_,
3536
const DataPartPtr & data_part_,
37+
const String & query_id_,
3638
bool overwrite_file_if_exists_,
3739
bool parallel_formatting_,
3840
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
3941
: destination_storage_id(destination_storage_id_),
4042
data_part(data_part_),
43+
query_id(query_id_),
4144
overwrite_file_if_exists(overwrite_file_if_exists_),
4245
parallel_formatting(parallel_formatting_),
4346
completion_callback(completion_callback_),
4447
create_time(time(nullptr)) {}
4548

4649
StorageID destination_storage_id;
4750
DataPartPtr data_part;
51+
/// Used for killing the export.
52+
String query_id;
4853
bool overwrite_file_if_exists;
4954
bool parallel_formatting;
5055

@@ -53,8 +58,10 @@ struct MergeTreeExportManifest
5358

5459
time_t create_time;
5560
mutable bool in_progress = false;
61+
/// Used for killing the export
62+
mutable QueryPipeline * pipeline = nullptr;
5663

57-
bool operator<(const MergeTreeExportManifest & rhs) const
64+
bool operator<(const MergeTreePartExportManifest & rhs) const
5865
{
5966
// Lexicographic comparison: first compare destination storage, then part name
6067
auto lhs_storage = destination_storage_id.getQualifiedName();
@@ -66,7 +73,7 @@ struct MergeTreeExportManifest
6673
return data_part->name < rhs.data_part->name;
6774
}
6875

69-
bool operator==(const MergeTreeExportManifest & rhs) const
76+
bool operator==(const MergeTreePartExportManifest & rhs) const
7077
{
7178
return destination_storage_id.getQualifiedName() == rhs.destination_storage_id.getQualifiedName()
7279
&& data_part->name == rhs.data_part->name;
File renamed without changes.

0 commit comments

Comments
 (0)