Skip to content

Commit 0c0b85f

Browse files
committed
Refactor kill so that it sets the entry's status instead of removing the entry; add some logs
1 parent: cf13ec2 · commit: 0c0b85f

File tree

9 files changed

+211
-102
lines changed

9 files changed

+211
-102
lines changed

src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,8 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
1919
{
2020
PENDING,
2121
COMPLETED,
22-
FAILED
22+
FAILED,
23+
KILLED
2324
};
2425

2526
/// Allows us to skip completed / failed entries during scheduling
@@ -37,6 +38,11 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
3738
return manifest.partition_id + "_" + qualified_table_name.getFullName();
3839
}
3940

41+
std::string getTransactionId() const
42+
{
43+
return manifest.transaction_id;
44+
}
45+
4046
/// Get create_time for sorted iteration
4147
time_t getCreateTime() const
4248
{
@@ -46,6 +52,7 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
4652

4753
struct ExportPartitionTaskEntryTagByCompositeKey {};
4854
struct ExportPartitionTaskEntryTagByCreateTime {};
55+
struct ExportPartitionTaskEntryTagByTransactionId {};
4956

5057
// Multi-index container for export partition task entries
5158
// - Index 0 (TagByCompositeKey): hashed_unique on composite key for O(1) lookup
@@ -60,6 +67,10 @@ using ExportPartitionTaskEntriesContainer = boost::multi_index_container<
6067
boost::multi_index::ordered_non_unique<
6168
boost::multi_index::tag<ExportPartitionTaskEntryTagByCreateTime>,
6269
boost::multi_index::const_mem_fun<ExportReplicatedMergeTreePartitionTaskEntry, time_t, &ExportReplicatedMergeTreePartitionTaskEntry::getCreateTime>
70+
>,
71+
boost::multi_index::hashed_unique<
72+
boost::multi_index::tag<ExportPartitionTaskEntryTagByTransactionId>,
73+
boost::multi_index::const_mem_fun<ExportReplicatedMergeTreePartitionTaskEntry, std::string, &ExportReplicatedMergeTreePartitionTaskEntry::getTransactionId>
6374
>
6475
>
6576
>;

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ namespace ErrorCodes
3030
extern const int UNKNOWN_TABLE;
3131
extern const int FILE_ALREADY_EXISTS;
3232
extern const int LOGICAL_ERROR;
33+
extern const int QUERY_WAS_CANCELLED;
3334
}
3435

3536
namespace Setting
@@ -201,12 +202,18 @@ bool ExportPartTask::executeStep()
201202

202203
auto is_cancelled_callback = [this]()
203204
{
204-
return storage.parts_mover.moves_blocker.isCancelled();
205+
return isCancelled();
205206
};
206207

207208
exec.setCancelCallback(is_cancelled_callback, 100);
208209

209210
exec.execute();
211+
212+
if (isCancelled())
213+
{
214+
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Export part was cancelled");
215+
}
216+
210217
std::lock_guard inner_lock(storage.export_manifests_mutex);
211218
storage.writePartLog(
212219
PartLogElement::Type::EXPORT_PART,
@@ -257,9 +264,15 @@ bool ExportPartTask::executeStep()
257264

258265
void ExportPartTask::cancel() noexcept
259266
{
267+
cancel_requested.store(true);
260268
pipeline.cancel();
261269
}
262270

271+
bool ExportPartTask::isCancelled() const
272+
{
273+
return cancel_requested.load() || storage.parts_mover.moves_blocker.isCancelled();
274+
}
275+
263276
void ExportPartTask::onCompleted()
264277
{
265278
}

src/Storages/MergeTree/ExportPartTask.h

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,9 @@ class ExportPartTask : public IExecutableTask
2727
MergeTreePartExportManifest manifest;
2828
ContextPtr local_context;
2929
QueryPipeline pipeline;
30+
std::atomic<bool> cancel_requested = false;
31+
32+
bool isCancelled() const;
3033
};
3134

3235
}

src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp

Lines changed: 70 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
#include <Storages/StorageReplicatedMergeTree.h>
33
#include <Storages/ExportReplicatedMergeTreePartitionTaskEntry.h>
44
#include "Storages/MergeTree/ExportPartitionUtils.h"
5+
#include "Common/logger_useful.h"
56
#include <Common/ZooKeeper/Types.h>
67
#include <Common/ZooKeeper/ZooKeeper.h>
78
#include <Interpreters/DatabaseCatalog.h>
@@ -154,10 +155,12 @@ ExportPartitionManifestUpdatingTask::ExportPartitionManifestUpdatingTask(Storage
154155
{
155156
}
156157

157-
void ExportPartitionManifestUpdatingTask::run()
158+
void ExportPartitionManifestUpdatingTask::poll()
158159
{
159160
std::lock_guard lock(storage.export_merge_tree_partition_mutex);
160161

162+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Polling for new entries for table {}. Current number of entries: {}", storage.getStorageID().getNameForLogs(), storage.export_merge_tree_partition_task_entries_by_key.size());
163+
161164
auto zk = storage.getZooKeeper();
162165

163166
const std::string exports_path = fs::path(storage.zookeeper_path) / "exports";
@@ -200,8 +203,13 @@ void ExportPartitionManifestUpdatingTask::run()
200203
if (!cleanup_lock.is_locked && has_local_entry_and_is_up_to_date)
201204
continue;
202205

206+
auto status_watch_callback = std::make_shared<Coordination::WatchCallback>([this, key](const Coordination::WatchResponse &) {
207+
storage.export_merge_tree_partition_manifest_updater->addStatusChange(key);
208+
storage.export_merge_tree_partition_status_handling_task->schedule();
209+
});
210+
203211
std::string status;
204-
if (!zk->tryGet(fs::path(entry_path) / "status", status))
212+
if (!zk->tryGetWatch(fs::path(entry_path) / "status", status, nullptr, status_watch_callback))
205213
{
206214
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing status", key);
207215
continue;
@@ -245,6 +253,8 @@ void ExportPartitionManifestUpdatingTask::run()
245253
/// Remove entries that were deleted by someone else
246254
removeStaleEntries(zk_children, entries_by_key);
247255

256+
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: finished polling for new entries. Number of entries: {}", entries_by_key.size());
257+
248258
storage.export_merge_tree_partition_select_task->schedule();
249259
}
250260

@@ -303,4 +313,62 @@ void ExportPartitionManifestUpdatingTask::removeStaleEntries(
303313
}
304314
}
305315

316+
void ExportPartitionManifestUpdatingTask::addStatusChange(const std::string & key)
317+
{
318+
std::lock_guard lock(status_changes_mutex);
319+
status_changes.emplace(key);
320+
}
321+
322+
void ExportPartitionManifestUpdatingTask::handleStatusChanges()
323+
{
324+
std::lock_guard lock(status_changes_mutex);
325+
std::lock_guard task_entries_lock(storage.export_merge_tree_partition_mutex);
326+
auto zk = storage.getZooKeeper();
327+
328+
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status changes. Number of status changes: {}", status_changes.size());
329+
330+
while (!status_changes.empty())
331+
{
332+
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status change for task {}", status_changes.front());
333+
const auto key = status_changes.front();
334+
status_changes.pop();
335+
336+
auto it = storage.export_merge_tree_partition_task_entries_by_key.find(key);
337+
if (it == storage.export_merge_tree_partition_task_entries_by_key.end())
338+
continue;
339+
340+
/// get new status from zk
341+
std::string new_status_string;
342+
if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", new_status_string))
343+
{
344+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Failed to get new status for task {}, skipping", key);
345+
continue;
346+
}
347+
348+
const auto new_status = magic_enum::enum_cast<ExportReplicatedMergeTreePartitionTaskEntry::Status>(new_status_string);
349+
if (!new_status)
350+
{
351+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Invalid status {} for task {}, skipping", new_status_string, key);
352+
continue;
353+
}
354+
355+
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: status changed for task {}. New status: {}", key, magic_enum::enum_name(*new_status).data());
356+
357+
/// If status changed to KILLED, cancel local export operations
358+
if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED)
359+
{
360+
try
361+
{
362+
storage.killExportPart(it->manifest.transaction_id);
363+
}
364+
catch (...)
365+
{
366+
tryLogCurrentException(storage.log, __PRETTY_FUNCTION__);
367+
}
368+
}
369+
370+
it->status = *new_status;
371+
}
372+
}
373+
306374
}

src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include <mutex>
4+
#include <queue>
35
#include <string>
46
#include <unordered_set>
57
namespace DB
@@ -13,7 +15,11 @@ class ExportPartitionManifestUpdatingTask
1315
public:
1416
ExportPartitionManifestUpdatingTask(StorageReplicatedMergeTree & storage);
1517

16-
void run();
18+
void poll();
19+
20+
void handleStatusChanges();
21+
22+
void addStatusChange(const std::string & key);
1723

1824
private:
1925
StorageReplicatedMergeTree & storage;
@@ -28,6 +34,9 @@ class ExportPartitionManifestUpdatingTask
2834
const std::unordered_set<std::string> & zk_children,
2935
auto & entries_by_key
3036
);
37+
38+
std::mutex status_changes_mutex;
39+
std::queue<std::string> status_changes;
3140
};
3241

3342
}

src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -176,6 +176,7 @@ bool ReplicatedMergeTreeRestartingThread::runImpl()
176176
storage.part_check_thread.start();
177177
storage.export_merge_tree_partition_updating_task->activateAndSchedule();
178178
storage.export_merge_tree_partition_select_task->activateAndSchedule();
179+
storage.export_merge_tree_partition_status_handling_task->activateAndSchedule();
179180

180181
LOG_DEBUG(log, "Table started successfully");
181182
return true;

0 commit comments

Comments
 (0)