Commit b571f5a

wip - refactor new design
1 parent d609d04 commit b571f5a

18 files changed: +942 / −649 lines

src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h

Lines changed: 0 additions & 1 deletion

@@ -10,7 +10,6 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
     using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
     ExportReplicatedMergeTreePartitionManifest manifest;
 
-    std::size_t parts_to_do;
     /// References to the parts that should be exported
     /// This is used to prevent the parts from being deleted before finishing the export operation
     /// It does not mean this replica will export all the parts

src/Storages/MergeTree/ExportList.cpp

Lines changed: 0 additions & 8 deletions

@@ -63,12 +63,4 @@ UInt64 ExportsListElement::getPeakMemoryUsage() const
     return thread_group->memory_tracker.getPeak();
 }
 
-void ExportsList::remove(const StorageID & source_table_id, const StorageID & destination_table_id, const String & part_name)
-{
-    std::erase_if(entries, [source_table_id, destination_table_id, part_name](const auto & entry)
-    {
-        return entry.source_table_id == source_table_id && entry.destination_table_id == destination_table_id && entry.part_name == part_name;
-    });
-}
-
 }

src/Storages/MergeTree/ExportList.h

Lines changed: 0 additions & 2 deletions

@@ -83,8 +83,6 @@ class ExportsList final : public BackgroundProcessList<ExportsListElement, Expor
     ExportsList()
        : Parent(CurrentMetrics::Export)
    {}
-
-    void remove(const StorageID & source_table_id, const StorageID & destination_table_id, const String & part_name);
 };
 
 using ExportsListEntry = BackgroundProcessListEntry<ExportsListElement, ExportInfo>;

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 2 additions & 1 deletion

@@ -74,6 +74,7 @@ bool ExportPartTask::executeStep()
 {
     auto context_copy = Context::createCopy(local_context);
     context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
+    context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_formatting);
 
     sink = destination_storage->import(
         manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),

@@ -228,7 +229,7 @@ void ExportPartTask::onCompleted()
 
 StorageID ExportPartTask::getStorageID() const
 {
-    return manifest.destination_storage_id;
+    return storage.getStorageID();
 }
 
 Priority ExportPartTask::getPriority() const
src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp

Lines changed: 226 additions & 0 deletions

@@ -0,0 +1,226 @@
+#include <Storages/MergeTree/ExportPartitionManifestUpdatingTask.h>
+#include <Storages/StorageReplicatedMergeTree.h>
+#include <Storages/ExportReplicatedMergeTreePartitionTaskEntry.h>
+#include <Storages/MergeTree/ExportPartitionUtils.h>
+#include <Common/ZooKeeper/Types.h>
+#include <Common/ZooKeeper/ZooKeeper.h>
+#include <Interpreters/DatabaseCatalog.h>
+
+namespace DB
+{
+
+/// v2 of my initial design
+/*
+table_path/
+    exports/
+        <partition_id+destination_storage_id>/
+            metadata.json -> {tid, partition_id, destination_id, create_time, ttl}
+            parts/
+                processing/     <-- not started or in progress
+                    part_1.json -> {retry_count, max_retry_count, path_in_destination}
+                    ...
+                    part_n.json
+                processed/
+                    part_1.json -> {retry_count, max_retry_count, path_in_destination}
+                    ...
+                    part_n.json
+            locks/
+                part_1 -> r1
+                part_n -> rN
+    cleanup_lock    <-- ephemeral
+
+One of the ideas behind this design is to reduce the number of required CAS loops.
+It works as follows:
+
+Upon request, the structure is created in ZooKeeper if it does not already exist.
+
+Once the task is published in ZooKeeper, replicas are notified that there is a new task and fetch it.
+
+Once a replica has loaded the task locally, its scheduler thread will eventually run and try to lock individual parts of that task for export.
+
+The lock process is roughly the following:
+
+Try to create an ephemeral node with the part name under the `locks` path. If that succeeds, the part is locked and the export will be scheduled on this replica.
+
+If it fails, the part is already locked by another replica. Try the next part.
+
+Once a part export completes, the part's structure under `processing` is moved to `processed` with a status of either failed or succeeded. If it failed, the entire task is failed as well.
+
+Also, after moving a finished part to `processed` (a transaction), the replica reads `processing` to check whether it is empty.
+
+If it is empty, all parts have been exported and it is time to commit the export. Note that this check is not transactional with the preceding move to `processed`.
+
+That means the last part might be exported, but the server might die right before checking the `processing` path, so the export would never be committed. The cleanup thread also helps with this case.
+
+This is the overall idea, but please read the code to get a better understanding.
+*/
+
+ExportPartitionManifestUpdatingTask::ExportPartitionManifestUpdatingTask(StorageReplicatedMergeTree & storage_)
+    : storage(storage_)
+{
+}
+
+void ExportPartitionManifestUpdatingTask::run()
+{
+    std::lock_guard lock(storage.export_merge_tree_partition_mutex);
+
+    auto zk = storage.getZooKeeper();
+
+    const std::string exports_path = fs::path(storage.zookeeper_path) / "exports";
+    const std::string cleanup_lock_path = fs::path(storage.zookeeper_path) / "exports_cleanup_lock";
+
+    bool cleanup_lock_acquired = zk->tryCreate(cleanup_lock_path, "", ::zkutil::CreateMode::Ephemeral) == Coordination::Error::ZOK;
+
+    if (cleanup_lock_acquired)
+    {
+        LOG_INFO(storage.log, "ExportPartition: Cleanup lock acquired, will remove stale entries");
+    }
+
+    Coordination::Stat stat;
+    const auto children = zk->getChildrenWatch(exports_path, &stat, storage.export_merge_tree_partition_watch_callback);
+    const std::unordered_set<std::string> zk_children(children.begin(), children.end());
+
+    const auto now = time(nullptr);
+
+    /// Load new entries.
+    /// If we have the cleanup lock, also remove stale entries from ZooKeeper and from the local state.
+    /// Upload dangling commit files, if any.
+    for (const auto & key : zk_children)
+    {
+        const std::string entry_path = fs::path(exports_path) / key;
+
+        std::string metadata_json;
+        if (!zk->tryGet(fs::path(entry_path) / "metadata.json", metadata_json))
+        {
+            LOG_INFO(storage.log, "ExportPartition: Skipping {}: missing metadata.json", key);
+            continue;
+        }
+
+        const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json);
+
+        const auto local_entry = storage.export_merge_tree_partition_task_entries.find(key);
+
+        /// If the zk entry has been replaced via export_merge_tree_partition_force_export, checking only the export key is not enough:
+        /// we also need to make sure it is the same transaction id. If it is not, the entry needs to be replaced.
+        bool has_local_entry_and_is_up_to_date = local_entry != storage.export_merge_tree_partition_task_entries.end()
+            && local_entry->second.manifest.transaction_id == metadata.transaction_id;
+
+        /// If the entry is up to date and we don't have the cleanup lock, there is nothing to be done.
+        if (!cleanup_lock_acquired && has_local_entry_and_is_up_to_date)
+            continue;
+
+        std::string status;
+        if (!zk->tryGet(fs::path(entry_path) / "status", status))
+        {
+            LOG_INFO(storage.log, "ExportPartition: Skipping {}: missing status", key);
+            continue;
+        }
+
+        bool is_not_pending = status != "PENDING";
+
+        if (cleanup_lock_acquired)
+        {
+            bool has_expired = metadata.create_time < now - 180;
+
+            if (has_expired && is_not_pending)
+            {
+                zk->tryRemoveRecursive(fs::path(entry_path));
+                storage.export_merge_tree_partition_task_entries.erase(key);
+                LOG_INFO(storage.log, "ExportPartition: Removed {}: expired", key);
+                continue;
+            }
+        }
+
+        if (is_not_pending)
+        {
+            LOG_INFO(storage.log, "ExportPartition: Skipping {}: status is not PENDING", key);
+            continue;
+        }
+
+        if (cleanup_lock_acquired)
+        {
+            std::vector<std::string> parts_in_processing_or_pending;
+            if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(entry_path) / "processing", parts_in_processing_or_pending))
+            {
+                LOG_INFO(storage.log, "ExportPartition: Failed to get parts in processing or pending, skipping");
+                continue;
+            }
+
+            if (parts_in_processing_or_pending.empty())
+            {
+                LOG_INFO(storage.log, "ExportPartition: Cleanup found PENDING for {} with all parts exported, try to fix it by committing the export", entry_path);
+
+                const auto destination_storage_id = StorageID(QualifiedTableName{metadata.destination_database, metadata.destination_table});
+                const auto destination_storage = DatabaseCatalog::instance().tryGetTable(destination_storage_id, storage.getContext());
+                if (!destination_storage)
+                {
+                    LOG_INFO(storage.log, "ExportPartition: Failed to reconstruct destination storage: {}, skipping", destination_storage_id.getNameForLogs());
+                    continue;
+                }
+
+                /// It looks like a replica exported the last part but was not able to commit the export. Try to fix that here.
+                ExportPartitionUtils::commit(metadata, destination_storage, zk, storage.log.load(), entry_path, storage.getContext());
+            }
+        }
+
+        if (has_local_entry_and_is_up_to_date)
+        {
+            LOG_INFO(storage.log, "ExportPartition: Skipping {}: already exists", key);
+            continue;
+        }
+
+        std::vector<DataPartPtr> part_references;
+
+        for (const auto & part_name : metadata.parts)
+        {
+            if (const auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}))
+            {
+                part_references.push_back(part);
+            }
+        }
+
+        /// It is important to use operator[] here because it updates the entry if it already exists.
+        storage.export_merge_tree_partition_task_entries[key] = ExportReplicatedMergeTreePartitionTaskEntry{metadata, std::move(part_references)};
+    }
+
+    /// Remove entries that were deleted by someone else.
+    std::erase_if(storage.export_merge_tree_partition_task_entries,
+        [&](auto const & kv)
+        {
+            if (zk_children.contains(kv.first))
+            {
+                return false;
+            }
+
+            const auto & transaction_id = kv.second.manifest.transaction_id;
+            LOG_INFO(storage.log, "ExportPartition: Export task {} was deleted, calling killExportPart for transaction {}", kv.first, transaction_id);
+
+            try
+            {
+                storage.killExportPart(transaction_id);
+            }
+            catch (...)
+            {
+                tryLogCurrentException(storage.log, __PRETTY_FUNCTION__);
+            }
+
+            return true;
+        });
+
+    if (cleanup_lock_acquired)
+    {
+        zk->tryRemove(cleanup_lock_path);
+    }
+
+    /// TODO(arthur): remember to sort the entries by create_time
+    // std::sort(storage.export_merge_tree_partition_task_entries.begin(), storage.export_merge_tree_partition_task_entries.end(),
+    //     [](const auto & a, const auto & b)
+    //     {
+    //         return a.second.manifest.create_time < b.second.manifest.create_time;
+    //     });
+
+    storage.export_merge_tree_partition_select_task->schedule();
+}
+
+}
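
As an aside, the two ZooKeeper steps the design comment describes can be sketched in isolation: claiming a part by creating an ephemeral lock node, and moving a finished part from `processing` to `processed` before the emptiness check. This is a minimal sketch under stated assumptions, not the task's actual code: the function names, the `"r1"` replica id, and the `payload` argument are hypothetical, and the real task works on `storage.export_merge_tree_partition_task_entries` with richer manifests.

```cpp
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/Types.h>
#include <filesystem>
#include <optional>
#include <string>
#include <vector>

namespace fs = std::filesystem;

/// Try to claim one part of an export task by creating an ephemeral node under `locks`.
/// Returns the locked part name, or std::nullopt if every part is already taken.
std::optional<std::string> tryLockOnePart(const zkutil::ZooKeeperPtr & zk, const std::string & task_path)
{
    std::vector<std::string> pending;
    if (zk->tryGetChildren(fs::path(task_path) / "processing", pending) != Coordination::Error::ZOK)
        return std::nullopt;

    for (const auto & node : pending)
    {
        const std::string part_name = fs::path(node).stem(); /// part nodes are named <part>.json
        const auto code = zk->tryCreate(fs::path(task_path) / "locks" / part_name,
                                        /* hypothetical replica id */ "r1",
                                        zkutil::CreateMode::Ephemeral);

        if (code == Coordination::Error::ZOK)
            return part_name; /// locked: this replica exports the part
        if (code != Coordination::Error::ZNODEEXISTS)
            return std::nullopt; /// unexpected error; retry on the next scheduler run
        /// ZNODEEXISTS: another replica holds this part, try the next one
    }
    return std::nullopt;
}

/// After exporting a part: move its node from `processing` to `processed` in one
/// multi-op, then check whether `processing` is empty. Returns true when the task
/// looks ready to be committed. As the design comment warns, the emptiness check
/// is not transactional with the move; the cleanup pass covers a crash in between.
bool finishPartAndCheckDone(const zkutil::ZooKeeperPtr & zk, const std::string & task_path,
                            const std::string & part_name, const std::string & payload)
{
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeRemoveRequest(fs::path(task_path) / "processing" / (part_name + ".json"), -1));
    ops.emplace_back(zkutil::makeCreateRequest(fs::path(task_path) / "processed" / (part_name + ".json"), payload, zkutil::CreateMode::Persistent));

    Coordination::Responses responses;
    if (zk->tryMulti(ops, responses) != Coordination::Error::ZOK)
        return false;

    return zk->getChildren(fs::path(task_path) / "processing").empty();
}
```

The ephemeral mode is what makes the lock crash-safe: if the exporting replica dies, its session expires, the lock node vanishes, and another replica can pick the part up on a later scheduler run.
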
src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+#pragma once
+
+namespace DB
+{
+
+class StorageReplicatedMergeTree;
+
+class ExportPartitionManifestUpdatingTask
+{
+public:
+    ExportPartitionManifestUpdatingTask(StorageReplicatedMergeTree & storage);
+
+    void run();
+
+private:
+    StorageReplicatedMergeTree & storage;
+};
+
+}
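
The header exposes a single `run()` entry point; in this codebase such tasks are typically driven by a `BackgroundSchedulePool` holder, re-triggered both periodically and by the ZooKeeper watch (the `getChildrenWatch` call in `run()` installs `export_merge_tree_partition_watch_callback` for exactly that). A minimal sketch of that wiring, with the wrapper class, the 60-second interval, and the pool reference all being assumptions rather than anything this commit adds:

```cpp
#include <Core/BackgroundSchedulePool.h>
#include <Storages/MergeTree/ExportPartitionManifestUpdatingTask.h>

/// Hypothetical wiring: re-run the updating task periodically and on demand.
/// `pool` would come from the server context; the watch callback seen in run()
/// would call holder->schedule() to wake the task up early.
class ExportManifestUpdaterHolder
{
public:
    ExportManifestUpdaterHolder(DB::BackgroundSchedulePool & pool, DB::ExportPartitionManifestUpdatingTask & task_)
        : task(task_)
        , holder(pool.createTask("ExportPartitionManifestUpdatingTask", [this]
          {
              task.run();                        /// sync local entries with ZooKeeper
              holder->scheduleAfter(60 * 1000);  /// and poll again in a minute
          }))
    {
        holder->activateAndSchedule();
    }

private:
    DB::ExportPartitionManifestUpdatingTask & task;
    DB::BackgroundSchedulePool::TaskHolder holder;
};
```
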
