Skip to content

Commit 502b501

Browse files
committed
improve system.exports, show failed exports
1 parent af3352b commit 502b501

11 files changed

+231
-56
lines changed

src/Core/Settings.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6877,9 +6877,9 @@ Experimental timeSeries* aggregate functions for Prometheus-like timeseries resa
68776877
DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_partition, false, R"(
68786878
Experimental export merge tree partition.
68796879
)", EXPERIMENTAL, allow_experimental_export_merge_tree_partition) \
6880-
DECLARE_WITH_ALIAS(Bool, export_merge_tree_partition_individual_part_executor, true, R"(
6880+
DECLARE_WITH_ALIAS(Bool, export_merge_tree_partition_executor, false, R"(
68816881
Use the part task instead of the partition task
6882-
)", EXPERIMENTAL, export_merge_tree_partition_background_execution) \
6882+
)", EXPERIMENTAL, export_merge_tree_partition_executor) \
68836883
\
68846884
/* ####################################################### */ \
68856885
/* ############ END OF EXPERIMENTAL FEATURES ############# */ \

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <Storages/MergeTree/MergeTreeIndices.h>
1919
#include <Storages/MergeTree/MergeTreePartInfo.h>
2020
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
21+
#include <Storages/MergeTree/MergeTreeExportStatus.h>
2122
#include <Storages/MergeTree/MergeList.h>
2223
#include <Storages/MergeTree/IMergeTreeDataPart.h>
2324
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
@@ -966,6 +967,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
966967
bool must_on_same_disk);
967968

968969
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
970+
virtual std::vector<MergeTreeExportStatus> getExportsStatus() const { return {}; }
969971

970972
/// Returns true if table can create new parts with adaptive granularity
971973
/// Has additional constraint in replicated version

src/Storages/MergeTree/MergeTreeExportManifest.h

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
#include <Poco/JSON/Stringifier.h>
1515
#include <Poco/Dynamic/Var.h>
1616
#include <Storages/MergeTree/IMergeTreeDataPart.h>
17+
#include <ctime>
18+
#include <magic_enum.hpp>
1719

1820
namespace DB
1921
{
@@ -25,29 +27,39 @@ namespace DB
2527
* "transaction_id": "<id>",
2628
* "partition_id": "<partition_id>",
2729
* "destination": "<database>.<table>",
28-
* "completed": <true/false>,
30+
* "create_time": <unix_timestamp>,
31+
* "status": "<pending|completed|failed>",
2932
* "parts": [ {"part_name": "name", "remote_path": "path-or-empty"}, ... ]
3033
* }
3134
*/
3235
struct MergeTreeExportManifest
3336
{
3437
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
38+
39+
enum class Status {
40+
pending,
41+
completed,
42+
failed
43+
};
3544

3645
MergeTreeExportManifest()
3746
: destination_storage_id(StorageID::createEmpty())
47+
, status(Status::pending)
3848
{}
3949

4050
struct Item
4151
{
4252
String part_name;
4353
String remote_path; // empty until uploaded
4454
};
55+
4556

4657
String transaction_id;
4758
String partition_id;
4859
StorageID destination_storage_id;
60+
time_t create_time = 0;
4961
std::vector<Item> items;
50-
bool completed = false;
62+
Status status = Status::pending;
5163

5264
std::filesystem::path file_path;
5365
DiskPtr disk;
@@ -65,6 +77,7 @@ struct MergeTreeExportManifest
6577
manifest->transaction_id = transaction_id_;
6678
manifest->partition_id = partition_id_;
6779
manifest->destination_storage_id = destination_storage_id_;
80+
manifest->create_time = std::time(nullptr);
6881
manifest->file_path = std::filesystem::path(path_prefix) / ("export_partition_" + partition_id_ + "_transaction_" + transaction_id_ + ".json");
6982
manifest->items.reserve(data_parts.size());
7083
for (const auto & data_part : data_parts)
@@ -94,7 +107,11 @@ struct MergeTreeExportManifest
94107
manifest->partition_id = root->getValue<String>("partition_id");
95108
const auto destination = root->getValue<String>("destination");
96109
manifest->destination_storage_id = StorageID(QualifiedTableName::parseFromString(destination));
97-
manifest->completed = root->getValue<bool>("completed");
110+
111+
manifest->create_time = root->getValue<UInt64>("create_time");
112+
113+
String status_str = root->getValue<String>("status");
114+
manifest->status = magic_enum::enum_cast<Status>(status_str).value();
98115

99116
manifest->items.clear();
100117
auto parts = root->get("parts").extract<Poco::JSON::Array::Ptr>();
@@ -118,7 +135,8 @@ struct MergeTreeExportManifest
118135
root->set("transaction_id", transaction_id);
119136
root->set("partition_id", partition_id);
120137
root->set("destination", destination_storage_id.getQualifiedName().getFullName());
121-
root->set("completed", completed);
138+
root->set("create_time", static_cast<UInt64>(create_time));
139+
root->set("status", String(magic_enum::enum_name(status)));
122140

123141
Poco::JSON::Array::Ptr parts(new Poco::JSON::Array());
124142
for (const auto & i : items)
@@ -138,7 +156,7 @@ struct MergeTreeExportManifest
138156
out->sync();
139157
}
140158

141-
void deleteFile()
159+
void deleteFile() const
142160
{
143161
disk->removeFile(file_path);
144162
}

src/Storages/MergeTree/MergeTreeExportStatus.cpp

Whitespace-only changes.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <base/types.h>
4+
#include <vector>
5+
#include <ctime>
6+
#include <Storages/MergeTree/MergeTreeExportManifest.h>
7+
8+
9+
namespace DB
10+
{
11+
12+
struct MergeTreeExportStatus
13+
{
14+
String source_database;
15+
String source_table;
16+
String destination_database;
17+
String destination_table;
18+
String transaction_id;
19+
time_t create_time = 0;
20+
std::vector<String> parts_to_do_names;
21+
MergeTreeExportManifest::Status status;
22+
};
23+
24+
}
25+

src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.cpp

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,40 +57,51 @@ bool ExportPartPlainMergeTreeTask::executeStep()
5757
if (executeExport())
5858
{
5959
state = State::NEED_COMMIT;
60-
return true;
6160
}
62-
63-
if (retry_count < max_retries)
61+
else if (retry_count < max_retries)
6462
{
6563
retry_count++;
6664
LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"),
6765
"Retrying export attempt {} for part {}",
6866
retry_count, exports_tagger->parts_to_export[0]->name);
6967
state = State::NEED_EXECUTE;
70-
71-
return true;
68+
}
69+
else
70+
{
71+
state = State::FAILED;
7272
}
7373

74-
return false;
74+
return true;
7575
}
7676
case State::NEED_COMMIT:
7777
{
7878
if (commitExport())
7979
{
8080
state = State::SUCCESS;
81-
return true;
8281
}
83-
84-
if (retry_count < max_retries)
82+
else if (retry_count < max_retries)
8583
{
8684
retry_count++;
8785
LOG_INFO(getLogger("ExportPartPlainMergeTreeTask"),
8886
"Retrying export attempt {} for part {}",
8987
retry_count, exports_tagger->parts_to_export[0]->name);
9088
state = State::NEED_COMMIT;
91-
92-
return true;
9389
}
90+
else
91+
{
92+
state = State::FAILED;
93+
}
94+
95+
return true;
96+
}
97+
case State::FAILED:
98+
{
99+
std::lock_guard lock(storage.export_partition_transaction_id_to_manifest_mutex);
100+
101+
manifest->status = MergeTreeExportManifest::Status::failed;
102+
manifest->write();
103+
104+
storage.already_exported_partition_ids.erase(manifest->partition_id);
94105

95106
return false;
96107
}
@@ -171,7 +182,7 @@ bool ExportPartPlainMergeTreeTask::commitExport()
171182
manifest->partition_id,
172183
manifest->exportedPaths(),
173184
context);
174-
manifest->completed = true;
185+
manifest->status = MergeTreeExportManifest::Status::completed;
175186
manifest->write();
176187
storage.export_partition_transaction_id_to_manifest.erase(manifest->transaction_id);
177188
LOG_INFO(getLogger("ExportMergeTreePartitionToObjectStorageTask"),

src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class ExportPartPlainMergeTreeTask : public IExecutableTask
4343
NEED_PREPARE,
4444
NEED_EXECUTE,
4545
NEED_COMMIT,
46+
FAILED,
4647
SUCCESS
4748
};
4849

src/Storages/ObjectStorage/MergeTree/ExportPartitionPlainMergeTreeTask.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ bool ExportPartitionPlainMergeTreeTask::commitExport()
189189
manifest->exportedPaths(),
190190
context);
191191

192-
manifest->completed = true;
192+
manifest->status = MergeTreeExportManifest::Status::completed;
193193
manifest->write();
194194

195195
storage.export_partition_transaction_id_to_manifest.erase(manifest->transaction_id);
@@ -204,7 +204,7 @@ bool ExportPartitionPlainMergeTreeTask::commitExport()
204204
void ExportPartitionPlainMergeTreeTask::onCompleted()
205205
{
206206
std::lock_guard lock(storage.export_partition_transaction_id_to_manifest_mutex);
207-
task_result_callback(manifest->completed);
207+
task_result_callback(manifest->status == MergeTreeExportManifest::Status::completed);
208208
}
209209

210210
void ExportPartitionPlainMergeTreeTask::cancel() noexcept

src/Storages/StorageMergeTree.cpp

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ namespace Setting
8282
extern const SettingsBool throw_on_unsupported_query_inside_transaction;
8383
extern const SettingsUInt64 max_parts_to_move;
8484
extern const SettingsBool allow_experimental_export_merge_tree_partition;
85-
extern const SettingsBool export_merge_tree_partition_individual_part_executor;
85+
extern const SettingsBool export_merge_tree_partition_executor;
8686
}
8787

8888
namespace MergeTreeSetting
@@ -559,8 +559,6 @@ void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command,
559559
throw Exception(ErrorCodes::PART_IS_LOCKED, "Partition {} has already been exported", partition_id);
560560
}
561561

562-
563-
564562
const auto transaction_id = std::to_string(generateSnowflakeID());
565563

566564
/// TODO missing parts lock here with tagger
@@ -578,7 +576,7 @@ void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command,
578576
export_partition_transaction_id_to_manifest[transaction_id] = manifest;
579577
}
580578

581-
if (getContext()->getSettingsRef()[Setting::export_merge_tree_partition_individual_part_executor])
579+
if (!getContext()->getSettingsRef()[Setting::export_merge_tree_partition_executor])
582580
{
583581
for (const auto & part : all_parts)
584582
{
@@ -981,6 +979,33 @@ std::map<std::string, MutationCommands> StorageMergeTree::getUnfinishedMutationC
981979
return result;
982980
}
983981

982+
983+
std::vector<MergeTreeExportStatus> StorageMergeTree::getExportsStatus() const
984+
{
985+
std::lock_guard lock(export_partition_transaction_id_to_manifest_mutex);
986+
std::vector<MergeTreeExportStatus> result;
987+
988+
auto source_database = getStorageID().database_name;
989+
auto source_table = getStorageID().table_name;
990+
991+
for (const auto & [transaction_id, manifest] : export_partition_transaction_id_to_manifest)
992+
{
993+
MergeTreeExportStatus status;
994+
995+
status.transaction_id = transaction_id;
996+
status.source_database = source_database;
997+
status.source_table = source_table;
998+
status.destination_database = manifest->destination_storage_id.database_name;
999+
status.destination_table = manifest->destination_storage_id.table_name;
1000+
status.create_time = 0;
1001+
status.parts_to_do_names = manifest->pendingParts();
1002+
status.status = manifest->status;
1003+
1004+
result.emplace_back(std::move(status));
1005+
}
1006+
return result;
1007+
}
1008+
9841009
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
9851010
{
9861011
std::lock_guard lock(currently_processing_in_background_mutex);
@@ -1157,17 +1182,20 @@ void StorageMergeTree::readExportPartitionManifests()
11571182
{
11581183
auto manifest = MergeTreeExportManifest::read(disk, fs::path(relative_data_path) / name);
11591184

1160-
already_exported_partition_ids.insert(manifest->partition_id);
1161-
1162-
if (manifest->completed)
1185+
if (manifest->status != MergeTreeExportManifest::Status::failed)
11631186
{
1164-
LOG_INFO(
1165-
log,
1166-
"Export transaction {} of partition {} to destination storage {} already completed, skipping",
1167-
manifest->transaction_id,
1168-
manifest->partition_id,
1169-
manifest->destination_storage_id.getNameForLogs());
1170-
continue;
1187+
already_exported_partition_ids.insert(manifest->partition_id);
1188+
1189+
if (manifest->status == MergeTreeExportManifest::Status::completed)
1190+
{
1191+
LOG_INFO(
1192+
log,
1193+
"Export transaction {} of partition {} to destination storage {} already completed, skipping",
1194+
manifest->transaction_id,
1195+
manifest->partition_id,
1196+
manifest->destination_storage_id.getNameForLogs());
1197+
continue;
1198+
}
11711199
}
11721200

11731201
export_partition_transaction_id_to_manifest.emplace(manifest->transaction_id, manifest);
@@ -1189,6 +1217,9 @@ void StorageMergeTree::resumeExportPartitionTasks()
11891217
/// but it turns out the background executor schedules tasks based on their priority, so it is likely this is not needed anymore.
11901218
for (const auto & [transaction_id, manifest] : export_partition_transaction_id_to_manifest)
11911219
{
1220+
if (manifest->status != MergeTreeExportManifest::Status::pending)
1221+
continue;
1222+
11921223
auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest->destination_storage_id, getContext());
11931224
if (!destination_storage)
11941225
{
@@ -1216,7 +1247,7 @@ void StorageMergeTree::resumeExportPartitionTasks()
12161247
parts_to_export.emplace_back(part);
12171248
}
12181249

1219-
if (getContext()->getSettingsRef()[Setting::export_merge_tree_partition_individual_part_executor])
1250+
if (!getContext()->getSettingsRef()[Setting::export_merge_tree_partition_executor])
12201251
{
12211252
for (const auto & part : parts_to_export)
12221253
{

src/Storages/StorageMergeTree.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ class StorageMergeTree final : public MergeTreeData
9696
/// Return introspection information about currently processing or recently processed mutations.
9797
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
9898

99+
std::vector<MergeTreeExportStatus> getExportsStatus() const override;
100+
99101
CancellationCode killMutation(const String & mutation_id) override;
100102

101103
/// Makes backup entries to backup the data of the storage.
@@ -160,7 +162,7 @@ class StorageMergeTree final : public MergeTreeData
160162
std::map<UInt64, MergeTreeMutationEntry> current_mutations_by_version;
161163

162164
std::map<String, std::shared_ptr<MergeTreeExportManifest>> export_partition_transaction_id_to_manifest;
163-
std::mutex export_partition_transaction_id_to_manifest_mutex;
165+
mutable std::mutex export_partition_transaction_id_to_manifest_mutex; /// mutable because of getExportsStatus
164166

165167
/// Unfinished mutations that are required for AlterConversions.
166168
MutationCounters mutation_counters;

0 commit comments

Comments
 (0)