Skip to content

Commit f23ed2d

Browse files
committed
preserve order of tasks..
1 parent 4487431 commit f23ed2d

File tree

6 files changed

+156
-35
lines changed

6 files changed

+156
-35
lines changed

src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
#include <Storages/ExportReplicatedMergeTreePartitionManifest.h>
44
#include <Storages/MergeTree/IMergeTreeDataPart.h>
5+
#include "Core/QualifiedTableName.h"
6+
#include <boost/multi_index_container.hpp>
7+
#include <boost/multi_index/hashed_index.hpp>
8+
#include <boost/multi_index/ordered_index.hpp>
9+
#include <boost/multi_index/mem_fun.hpp>
510

611
namespace DB
712
{
@@ -15,6 +20,38 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
1520
/// It does not mean this replica will export all the parts
1621
/// There is also a chance this replica does not contain a given part and it is totally ok.
1722
std::vector<DataPartPtr> part_references;
23+
24+
std::string getCompositeKey() const
25+
{
26+
const auto qualified_table_name = QualifiedTableName {manifest.destination_database, manifest.destination_table};
27+
return manifest.partition_id + "_" + qualified_table_name.getFullName();
28+
}
29+
30+
/// Get create_time for sorted iteration
31+
time_t getCreateTime() const
32+
{
33+
return manifest.create_time;
34+
}
1835
};
1936

37+
/// Empty tag type used to select the composite-key index of a multi-index container.
struct ExportPartitionTaskEntryTagByCompositeKey
{
};
38+
/// Empty tag type used to select the create-time index of a multi-index container.
struct ExportPartitionTaskEntryTagByCreateTime
{
};
39+
40+
// Multi-index container for export partition task entries
41+
// - Index 0 (TagByCompositeKey): hashed_unique on composite key for O(1) lookup
42+
// - Index 1 (TagByCreateTime): ordered_non_unique on create_time for sorted iteration
43+
using ExportPartitionTaskEntriesContainer = boost::multi_index_container<
44+
ExportReplicatedMergeTreePartitionTaskEntry,
45+
boost::multi_index::indexed_by<
46+
boost::multi_index::hashed_unique<
47+
boost::multi_index::tag<ExportPartitionTaskEntryTagByCompositeKey>,
48+
boost::multi_index::const_mem_fun<ExportReplicatedMergeTreePartitionTaskEntry, std::string, &ExportReplicatedMergeTreePartitionTaskEntry::getCompositeKey>
49+
>,
50+
boost::multi_index::ordered_non_unique<
51+
boost::multi_index::tag<ExportPartitionTaskEntryTagByCreateTime>,
52+
boost::multi_index::const_mem_fun<ExportReplicatedMergeTreePartitionTaskEntry, time_t, &ExportReplicatedMergeTreePartitionTaskEntry::getCreateTime>
53+
>
54+
>
55+
>;
56+
2057
}

src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,13 @@ void ExportPartitionManifestUpdatingTask::run()
9898

9999
const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json);
100100

101-
const auto local_entry = storage.export_merge_tree_partition_task_entries.find(key);
101+
auto & entries_by_key = storage.export_merge_tree_partition_task_entries_by_key;
102+
const auto local_entry = entries_by_key.find(key);
102103

103104
/// If the zk entry has been replaced with export_merge_tree_partition_force_export, checking only for the export key is not enough
104105
/// we need to make sure it is the same transaction id. If it is not, it needs to be replaced.
105-
bool has_local_entry_and_is_up_to_date = local_entry != storage.export_merge_tree_partition_task_entries.end()
106-
&& local_entry->second.manifest.transaction_id == metadata.transaction_id;
106+
bool has_local_entry_and_is_up_to_date = local_entry != entries_by_key.end()
107+
&& local_entry->manifest.transaction_id == metadata.transaction_id;
107108

108109
/// If the entry is up to date and we don't have the cleanup lock, early exit, nothing to be done.
109110
if (!cleanup_lock_acquired && has_local_entry_and_is_up_to_date)
@@ -125,7 +126,9 @@ void ExportPartitionManifestUpdatingTask::run()
125126
if (has_expired && is_not_pending)
126127
{
127128
zk->tryRemoveRecursive(fs::path(entry_path));
128-
storage.export_merge_tree_partition_task_entries.erase(key);
129+
auto it = entries_by_key.find(key);
130+
if (it != entries_by_key.end())
131+
entries_by_key.erase(it);
129132
LOG_INFO(storage.log, "ExportPartition: Removed {}: expired", key);
130133
continue;
131134
}
@@ -180,46 +183,46 @@ void ExportPartitionManifestUpdatingTask::run()
180183
}
181184
}
182185

183-
/// It is important to use the operator[] because it updates the existing entry if it already exists.
184-
storage.export_merge_tree_partition_task_entries[key] = ExportReplicatedMergeTreePartitionTaskEntry {metadata, std::move(part_references)};
186+
/// Insert or update entry. The multi_index container automatically maintains both indexes.
187+
auto entry = ExportReplicatedMergeTreePartitionTaskEntry {metadata, std::move(part_references)};
188+
auto it = entries_by_key.find(key);
189+
if (it != entries_by_key.end())
190+
entries_by_key.replace(it, entry);
191+
else
192+
entries_by_key.insert(entry);
185193
}
186194

187195
/// Remove entries that were deleted by someone else
188-
std::erase_if(storage.export_merge_tree_partition_task_entries,
189-
[&](auto const & kv)
196+
auto & entries_by_key = storage.export_merge_tree_partition_task_entries_by_key;
197+
for (auto it = entries_by_key.begin(); it != entries_by_key.end();)
198+
{
199+
const auto & key = it->getCompositeKey();
200+
if (zk_children.contains(key))
190201
{
191-
if (zk_children.contains(kv.first))
192-
{
193-
return false;
194-
}
202+
++it;
203+
continue;
204+
}
195205

196-
const auto & transaction_id = kv.second.manifest.transaction_id;
197-
LOG_INFO(storage.log, "ExportPartition: Export task {} was deleted, calling killExportPartition for transaction {}", kv.first, transaction_id);
198-
199-
try
200-
{
201-
storage.killExportPart(transaction_id);
202-
}
203-
catch (...)
204-
{
205-
tryLogCurrentException(storage.log, __PRETTY_FUNCTION__);
206-
}
206+
const auto & transaction_id = it->manifest.transaction_id;
207+
LOG_INFO(storage.log, "ExportPartition: Export task {} was deleted, calling killExportPartition for transaction {}", key, transaction_id);
208+
209+
try
210+
{
211+
storage.killExportPart(transaction_id);
212+
}
213+
catch (...)
214+
{
215+
tryLogCurrentException(storage.log, __PRETTY_FUNCTION__);
216+
}
207217

208-
return true;
209-
});
218+
it = entries_by_key.erase(it);
219+
}
210220

211221
if (cleanup_lock_acquired)
212222
{
213223
zk->tryRemove(cleanup_lock_path);
214224
}
215225

216-
/// todo arthur remember to sort the entries by create_time
217-
// std::sort(storage.export_merge_tree_partition_task_entries.begin(), storage.export_merge_tree_partition_task_entries.end(),
218-
// [](const auto & a, const auto & b)
219-
// {
220-
// return a.second.manifest.create_time < b.second.manifest.create_time;
221-
// });
222-
223226
storage.export_merge_tree_partition_select_task->schedule();
224227
}
225228

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ void ExportPartitionTaskScheduler::run()
2020

2121
auto zk = storage.getZooKeeper();
2222

23-
for (const auto & [key, entry] : storage.export_merge_tree_partition_task_entries)
23+
// Iterate sorted by create_time
24+
for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time)
2425
{
2526
const auto & manifest = entry.manifest;
27+
const auto key = entry.getCompositeKey();
2628
const auto & database = storage.getContext()->resolveDatabase(manifest.destination_database);
2729
const auto & table = manifest.destination_table;
2830

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#include <gtest/gtest.h>
2+
#include <Storages/ExportReplicatedMergeTreePartitionTaskEntry.h>
3+
4+
namespace DB
5+
{
6+
7+
// Test fixture that uses the shared container definition
8+
class ExportPartitionOrderingTest : public ::testing::Test
9+
{
10+
protected:
11+
ExportPartitionTaskEntriesContainer container;
12+
ExportPartitionTaskEntriesContainer::index<ExportPartitionTaskEntryTagByCompositeKey>::type & by_key;
13+
ExportPartitionTaskEntriesContainer::index<ExportPartitionTaskEntryTagByCreateTime>::type & by_create_time;
14+
15+
ExportPartitionOrderingTest()
16+
: by_key(container.get<ExportPartitionTaskEntryTagByCompositeKey>())
17+
, by_create_time(container.get<ExportPartitionTaskEntryTagByCreateTime>())
18+
{
19+
}
20+
};
21+
22+
/// Entries inserted newest-first through the composite-key index must come
/// back oldest-first when iterating the create-time index.
TEST_F(ExportPartitionOrderingTest, IterationOrderMatchesCreateTime)
{
    time_t base_time = 1000;

    /// All entries target db1.table1; only the partition id and creation time differ.
    const auto make_entry = [](const char * partition_id, time_t create_time)
    {
        ExportReplicatedMergeTreePartitionManifest manifest;
        manifest.partition_id = partition_id;
        manifest.destination_database = "db1";
        manifest.destination_table = "table1";
        manifest.create_time = create_time;
        return ExportReplicatedMergeTreePartitionTaskEntry{manifest, {}};
    };

    const auto latest = make_entry("2020", base_time + 300);
    const auto middle = make_entry("2021", base_time + 100);
    const auto oldest = make_entry("2022", base_time);

    /// Insert in reverse chronological order.
    by_key.insert(latest);
    by_key.insert(middle);
    by_key.insert(oldest);

    struct ExpectedEntry
    {
        const char * partition_id;
        time_t create_time;
    };

    /// Ascending create_time: oldest entry first.
    const ExpectedEntry expected_order[] = {
        {"2022", base_time},
        {"2021", base_time + 100},
        {"2020", base_time + 300},
    };

    auto it = by_create_time.begin();
    for (const auto & expected : expected_order)
    {
        ASSERT_NE(it, by_create_time.end());
        EXPECT_EQ(it->manifest.partition_id, expected.partition_id);
        EXPECT_EQ(it->manifest.create_time, expected.create_time);
        ++it;
    }

    /// Nothing may remain after the three expected entries.
    EXPECT_EQ(it, by_create_time.end());
}
73+
74+
}

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
425425
, merge_strategy_picker(*this)
426426
, queue(*this, merge_strategy_picker)
427427
, fetcher(*this)
428+
, export_merge_tree_partition_task_entries_by_key(export_merge_tree_partition_task_entries.get<ExportPartitionTaskEntryTagByCompositeKey>())
429+
, export_merge_tree_partition_task_entries_by_create_time(export_merge_tree_partition_task_entries.get<ExportPartitionTaskEntryTagByCreateTime>())
428430
, cleanup_thread(*this)
429431
, async_block_ids_cache(*this)
430432
, part_check_thread(*this)
@@ -8080,7 +8082,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
80808082
const auto exports_path = fs::path(zookeeper_path) / "exports";
80818083
Coordination::Requests ops;
80828084

8083-
const auto export_key = partition_id + "_" + dest_storage_id.getNameForLogs();
8085+
const auto export_key = partition_id + "_" + dest_storage_id.getQualifiedName().getFullName();
80848086

80858087
const auto partition_exports_path = fs::path(exports_path) / export_key;
80868088

src/Storages/StorageReplicatedMergeTree.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,8 +531,11 @@ class StorageReplicatedMergeTree final : public MergeTreeData
531531

532532
BackgroundSchedulePoolTaskHolder export_merge_tree_partition_select_task;
533533

534+
ExportPartitionTaskEntriesContainer export_merge_tree_partition_task_entries;
534535

535-
std::unordered_map<std::string, ExportReplicatedMergeTreePartitionTaskEntry> export_merge_tree_partition_task_entries;
536+
// Convenience references to indexes
537+
ExportPartitionTaskEntriesContainer::index<ExportPartitionTaskEntryTagByCompositeKey>::type & export_merge_tree_partition_task_entries_by_key;
538+
ExportPartitionTaskEntriesContainer::index<ExportPartitionTaskEntryTagByCreateTime>::type & export_merge_tree_partition_task_entries_by_create_time;
536539
/// A thread that removes old parts, log entries, and blocks.
537540
ReplicatedMergeTreeCleanupThread cleanup_thread;
538541

0 commit comments

Comments
 (0)