Skip to content

Commit 0d32ced

Browse files
antaljanosbenjaminarthurpassos
authored andcommitted
Check cancellation in progress callback
1 parent c081730 commit 0d32ced

File tree

3 files changed

+45
-18
lines changed

3 files changed

+45
-18
lines changed

src/Storages/MergeTree/MergeProgress.h

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
#pragma once
22

3-
#include <base/types.h>
4-
#include <Common/ProfileEvents.h>
3+
#include <functional>
54
#include <IO/Progress.h>
65
#include <Storages/MergeTree/MergeList.h>
6+
#include <base/types.h>
7+
#include <Common/ProfileEvents.h>
78

89

910
namespace ProfileEvents
@@ -47,27 +48,23 @@ struct MergeStageProgress
4748
class MergeProgressCallback
4849
{
4950
public:
51+
// It should throw an exception in case the operation should be cancelled
52+
using CancellationChecker = std::function<void()>;
53+
5054
MergeProgressCallback(
51-
MergeListElement * merge_list_element_ptr_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_)
55+
MergeListElement * merge_list_element_ptr_,
56+
UInt64 & watch_prev_elapsed_,
57+
MergeStageProgress & stage_,
58+
CancellationChecker && cancellation_checker_)
5259
: merge_list_element_ptr(merge_list_element_ptr_)
5360
, watch_prev_elapsed(watch_prev_elapsed_)
5461
, stage(stage_)
62+
, cancellation_checker(std::move(cancellation_checker_))
5563
{
5664
updateWatch();
5765
}
5866

59-
MergeListElement * merge_list_element_ptr;
60-
UInt64 & watch_prev_elapsed;
61-
MergeStageProgress & stage;
62-
63-
void updateWatch()
64-
{
65-
UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed();
66-
ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
67-
watch_prev_elapsed = watch_curr_elapsed;
68-
}
69-
70-
void operator() (const Progress & value)
67+
void operator()(const Progress & value)
7168
{
7269
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
7370
if (stage.is_first)
@@ -77,6 +74,8 @@ class MergeProgressCallback
7774
}
7875
updateWatch();
7976

77+
cancellation_checker();
78+
8079
merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes;
8180
if (stage.is_first)
8281
merge_list_element_ptr->rows_read += value.read_rows;
@@ -90,6 +89,25 @@ class MergeProgressCallback
9089
std::memory_order_relaxed);
9190
}
9291
}
92+
93+
private:
94+
MergeListElement * merge_list_element_ptr;
95+
UInt64 & watch_prev_elapsed;
96+
MergeStageProgress & stage;
97+
CancellationChecker cancellation_checker;
98+
99+
void updateWatch()
100+
{
101+
UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed();
102+
watch_prev_elapsed = watch_curr_elapsed;
103+
}
104+
105+
void updateProfileEvents(const Progress & value, ProfileEvents::Event rows_event, ProfileEvents::Event bytes_event) const
106+
{
107+
ProfileEvents::increment(bytes_event, value.read_bytes);
108+
if (stage.is_first)
109+
ProfileEvents::increment(rows_event, value.read_rows);
110+
}
93111
};
94112

95113
}

src/Storages/MergeTree/MergeTask.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,8 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
607607
ctx->column_parts_pipeline.setProgressCallback(MergeProgressCallback(
608608
global_ctx->merge_list_element_ptr,
609609
global_ctx->watch_prev_elapsed,
610-
*global_ctx->column_progress));
610+
*global_ctx->column_progress,
611+
[&my_ctx = *global_ctx]() { my_ctx.checkOperationIsNotCanceled(); }));
611612

612613
/// Is calculated inside MergeProgressCallback.
613614
ctx->column_parts_pipeline.disableProfileEventUpdate();
@@ -1120,7 +1121,11 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
11201121

11211122
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
11221123
/// Dereference unique_ptr and pass horizontal_stage_progress by reference
1123-
global_ctx->merged_pipeline.setProgressCallback(MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress));
1124+
global_ctx->merged_pipeline.setProgressCallback(MergeProgressCallback(
1125+
global_ctx->merge_list_element_ptr,
1126+
global_ctx->watch_prev_elapsed,
1127+
*global_ctx->horizontal_stage_progress,
1128+
[&my_ctx = *global_ctx]() { my_ctx.checkOperationIsNotCanceled(); }));
11241129
/// Is calculated inside MergeProgressCallback.
11251130
global_ctx->merged_pipeline.disableProfileEventUpdate();
11261131

src/Storages/MergeTree/MutateTask.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2190,7 +2190,11 @@ bool MutateTask::prepare()
21902190
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
21912191
ctx->mutating_pipeline_builder = ctx->interpreter->execute();
21922192
ctx->updated_header = ctx->interpreter->getUpdatedHeader();
2193-
ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress);
2193+
ctx->progress_callback = MergeProgressCallback(
2194+
(*ctx->mutate_entry)->ptr(),
2195+
ctx->watch_prev_elapsed,
2196+
*ctx->stage_progress,
2197+
[&my_ctx = *ctx]() { my_ctx.checkOperationIsNotCanceled(); });
21942198
}
21952199

21962200
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0);

0 commit comments

Comments
 (0)