Skip to content

Commit cf13ec2

Browse files
committed
use enum for statuses, remove status from completed part export, try to make tests a bit more stable, cache status
1 parent de9deb2 commit cf13ec2

File tree

9 files changed

+57
-26
lines changed

9 files changed

+57
-26
lines changed

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
5050
{"export_merge_tree_partition_force_export", false, false, "New setting."},
5151
{"export_merge_tree_partition_max_retries", 3, 3, "New setting."},
5252
{"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."},
53+
{"export_merge_tree_part_file_already_exists_policy", "NO_OP", "NO_OP", "New setting."},
5354
});
5455
addSettingsChanges(settings_changes_history, "25.8",
5556
{

src/Storages/ExportReplicatedMergeTreePartitionManifest.h

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,16 @@ namespace DB
1212

1313
struct ExportReplicatedMergeTreePartitionProcessingPartEntry
1414
{
15+
16+
enum class Status
17+
{
18+
PENDING,
19+
COMPLETED,
20+
FAILED
21+
};
22+
1523
String part_name;
16-
String status;
24+
Status status;
1725
size_t retry_count;
1826
String finished_by;
1927

@@ -22,7 +30,7 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry
2230
Poco::JSON::Object json;
2331

2432
json.set("part_name", part_name);
25-
json.set("status", status);
33+
json.set("status", String(magic_enum::enum_name(status)));
2634
json.set("retry_count", retry_count);
2735
json.set("finished_by", finished_by);
2836
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
@@ -41,7 +49,7 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry
4149
ExportReplicatedMergeTreePartitionProcessingPartEntry entry;
4250

4351
entry.part_name = json->getValue<String>("part_name");
44-
entry.status = json->getValue<String>("status");
52+
entry.status = magic_enum::enum_cast<Status>(json->getValue<String>("status")).value();
4553
entry.retry_count = json->getValue<size_t>("retry_count");
4654
if (json->has("finished_by"))
4755
{
@@ -55,15 +63,13 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry
5563
{
5664
String part_name;
5765
String path_in_destination;
58-
String status;
5966
String finished_by;
6067

6168
std::string toJsonString() const
6269
{
6370
Poco::JSON::Object json;
6471
json.set("part_name", part_name);
6572
json.set("path_in_destination", path_in_destination);
66-
json.set("status", status);
6773
json.set("finished_by", finished_by);
6874
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
6975
oss.exceptions(std::ios::failbit);
@@ -81,7 +87,6 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry
8187

8288
entry.part_name = json->getValue<String>("part_name");
8389
entry.path_in_destination = json->getValue<String>("path_in_destination");
84-
entry.status = json->getValue<String>("status");
8590
entry.finished_by = json->getValue<String>("finished_by");
8691

8792
return entry;

src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
1515
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
1616
ExportReplicatedMergeTreePartitionManifest manifest;
1717

18+
enum class Status
19+
{
20+
PENDING,
21+
COMPLETED,
22+
FAILED
23+
};
24+
25+
/// Allows us to skip completed / failed entries during scheduling
26+
mutable Status status;
27+
1828
/// References to the parts that should be exported
1929
/// This is used to prevent the parts from being deleted before finishing the export operation
2030
/// It does not mean this replica will export all the parts

src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ void ExportPartitionManifestUpdatingTask::addTask(
265265
}
266266

267267
/// Insert or update entry. The multi_index container automatically maintains both indexes.
268-
auto entry = ExportReplicatedMergeTreePartitionTaskEntry {metadata, std::move(part_references)};
268+
auto entry = ExportReplicatedMergeTreePartitionTaskEntry {metadata, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, std::move(part_references)};
269269
auto it = entries_by_key.find(key);
270270
if (it != entries_by_key.end())
271271
entries_by_key.replace(it, entry);

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,20 @@ void ExportPartitionTaskScheduler::run()
3535
auto zk = storage.getZooKeeper();
3636

3737
// Iterate sorted by create_time
38-
for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time)
38+
for (auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time)
3939
{
4040
const auto & manifest = entry.manifest;
4141
const auto key = entry.getCompositeKey();
4242
const auto & database = storage.getContext()->resolveDatabase(manifest.destination_database);
4343
const auto & table = manifest.destination_table;
4444

45+
/// No need to query zk for status if the local one is not PENDING
46+
if (entry.status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
47+
{
48+
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Local status is {}", magic_enum::enum_name(entry.status).data());
49+
continue;
50+
}
51+
4552
const auto destination_storage_id = StorageID(QualifiedTableName {database, table});
4653

4754
const auto destination_storage = DatabaseCatalog::instance().tryGetTable(destination_storage_id, storage.getContext());
@@ -52,16 +59,25 @@ void ExportPartitionTaskScheduler::run()
5259
continue;
5360
}
5461

55-
std::string status;
56-
if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", status))
62+
std::string status_in_zk_string;
63+
if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", status_in_zk_string))
5764
{
5865
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get status, skipping");
5966
continue;
6067
}
6168

62-
if (status != "PENDING")
69+
const auto status_in_zk = magic_enum::enum_cast<ExportReplicatedMergeTreePartitionTaskEntry::Status>(status_in_zk_string);
70+
71+
if (!status_in_zk)
72+
{
73+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get status from zk, skipping");
74+
continue;
75+
}
76+
77+
if (status_in_zk.value() != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
6378
{
64-
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Status is not PENDING");
79+
entry.status = status_in_zk.value();
80+
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Status from zk is {}", entry.status);
6581
continue;
6682
}
6783

@@ -238,10 +254,10 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
238254
if (processing_part_entry.retry_count >= max_retries)
239255
{
240256
/// just set status in processing_part_path and finished_by
241-
processing_part_entry.status = "FAILED";
257+
processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED;
242258
processing_part_entry.finished_by = storage.replica_name;
243259

244-
ops.emplace_back(zkutil::makeSetRequest(export_path / "status", "FAILED", -1));
260+
ops.emplace_back(zkutil::makeSetRequest(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1));
245261
LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name);
246262
}
247263

@@ -312,7 +328,6 @@ bool ExportPartitionTaskScheduler::tryToMovePartToProcessed(
312328
ExportReplicatedMergeTreePartitionProcessedPartEntry processed_part_entry;
313329
processed_part_entry.part_name = part_name;
314330
processed_part_entry.path_in_destination = relative_path_in_destination_storage;
315-
processed_part_entry.status = "SUCCESS";
316331
processed_part_entry.finished_by = storage.replica_name;
317332

318333
requests.emplace_back(zkutil::makeRemoveRequest(processing_parts_path / part_name, -1));

src/Storages/MergeTree/ExportPartitionUtils.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <Common/ZooKeeper/ZooKeeper.h>
33
#include <Common/logger_useful.h>
44
#include "Storages/ExportReplicatedMergeTreePartitionManifest.h"
5+
#include "Storages/ExportReplicatedMergeTreePartitionTaskEntry.h"
56
#include <filesystem>
67

78
namespace DB
@@ -80,7 +81,7 @@ namespace ExportPartitionUtils
8081
destination_storage->commitExportPartitionTransaction(manifest.transaction_id, manifest.partition_id, exported_paths, context);
8182

8283
LOG_INFO(log, "ExportPartition: Committed export, mark as completed");
83-
if (Coordination::Error::ZOK == zk->trySet(fs::path(entry_path) / "status", "COMPLETED", -1))
84+
if (Coordination::Error::ZOK == zk->trySet(fs::path(entry_path) / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(), -1))
8485
{
8586
LOG_INFO(log, "ExportPartition: Marked export as completed");
8687
}

src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ TEST_F(ExportPartitionOrderingTest, IterationOrderMatchesCreateTime)
4040
manifest3.destination_table = "table1";
4141
manifest3.create_time = base_time; // Oldest
4242

43-
ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, {}};
44-
ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, {}};
45-
ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, {}};
43+
ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}};
44+
ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}};
45+
ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}};
4646

4747
// Insert in reverse order
4848
by_key.insert(entry1);

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8191,7 +8191,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
81918191
for (const auto & part : part_names)
81928192
{
81938193
ExportReplicatedMergeTreePartitionProcessingPartEntry entry;
8194-
entry.status = "PENDING";
8194+
entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::PENDING;
81958195
entry.part_name = part;
81968196
entry.retry_count = 0;
81978197

tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -553,16 +553,15 @@ def test_export_ttl(cluster):
553553
time.sleep(expiration_time)
554554

555555
# assert that the export succeeded, check the commit file
556-
assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2020_*', format=LineAsString)") == '1\n', "Export did not succeed"
556+
# there should be two commit files now, one for the first export and one for the second export
557+
assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2020_*', format=LineAsString)") == '2\n', "Export did not succeed"
557558

558559

559-
# export an individual part with alter table export part
560-
# and then try to export the partition. It should not fail because export partition is idempotent.
561-
def test_export_part_and_partition(cluster):
560+
def test_export_partition_file_already_exists_policy(cluster):
562561
node = cluster.instances["replica1"]
563562

564-
mt_table = "export_part_and_partition_mt_table"
565-
s3_table = "export_part_and_partition_s3_table"
563+
mt_table = "export_partition_file_already_exists_policy_mt_table"
564+
s3_table = "export_partition_file_already_exists_policy_s3_table"
566565

567566
create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
568567

0 commit comments

Comments
 (0)