@@ -128,6 +128,19 @@ static void addMissedColumnsToSerializationInfos(
128128 }
129129}
130130
131+ bool MergeTask::GlobalRuntimeContext::isCancelled () const
132+ {
133+ return (future_part ? merges_blocker->isCancelledForPartition (future_part->part_info .partition_id ) : merges_blocker->isCancelled ())
134+ || merge_list_element_ptr->is_cancelled .load (std::memory_order_relaxed);
135+ }
136+
137+ void MergeTask::GlobalRuntimeContext::checkOperationIsNotCanceled () const
138+ {
139+ if (isCancelled ())
140+ {
141+ throw Exception (ErrorCodes::ABORTED, " Cancelled merging parts" );
142+ }
143+ }
131144
132145bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare ()
133146{
@@ -140,8 +153,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
140153 }
141154 const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : " " ;
142155
143- if (global_ctx->merges_blocker ->isCancelled () || global_ctx->merge_list_element_ptr ->is_cancelled .load (std::memory_order_relaxed))
144- throw Exception (ErrorCodes::ABORTED, " Cancelled merging parts" );
156+ global_ctx->checkOperationIsNotCanceled ();
145157
146158 // / We don't want to perform merge assigned with TTL as normal merge, so
147159 // / throw exception
@@ -391,9 +403,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
391403 ctx->is_cancelled = [merges_blocker = global_ctx->merges_blocker ,
392404 ttl_merges_blocker = global_ctx->ttl_merges_blocker ,
393405 need_remove = ctx->need_remove_expired_values ,
394- merge_list_element = global_ctx->merge_list_element_ptr ]() -> bool
406+ merge_list_element = global_ctx->merge_list_element_ptr ,
407+ partition_id = global_ctx->future_part ->part_info .partition_id ]() -> bool
395408 {
396- return merges_blocker->isCancelled ( )
409+ return merges_blocker->isCancelledForPartition (partition_id )
397410 || (need_remove && ttl_merges_blocker->isCancelled ())
398411 || merge_list_element->is_cancelled .load (std::memory_order_relaxed);
399412 };
@@ -478,8 +491,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
478491 global_ctx->merging_executor .reset ();
479492 global_ctx->merged_pipeline .reset ();
480493
481- if (global_ctx->merges_blocker ->isCancelled () || global_ctx->merge_list_element_ptr ->is_cancelled .load (std::memory_order_relaxed))
482- throw Exception (ErrorCodes::ABORTED, " Cancelled merging parts" );
494+ global_ctx->checkOperationIsNotCanceled ();
483495
484496 if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker ->isCancelled ())
485497 throw Exception (ErrorCodes::ABORTED, " Cancelled merging parts with expired TTL" );
@@ -607,7 +619,8 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
607619 ctx->column_parts_pipeline .setProgressCallback (MergeProgressCallback (
608620 global_ctx->merge_list_element_ptr ,
609621 global_ctx->watch_prev_elapsed ,
610- *global_ctx->column_progress ));
622+ *global_ctx->column_progress ,
623+ [&my_ctx = *global_ctx]() { my_ctx.checkOperationIsNotCanceled (); }));
611624
612625 // / Is calculated inside MergeProgressCallback.
613626 ctx->column_parts_pipeline .disableProfileEventUpdate ();
@@ -634,8 +647,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
634647bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn () const
635648{
636649 Block block;
637- if (!global_ctx->merges_blocker ->isCancelled () && !global_ctx->merge_list_element_ptr ->is_cancelled .load (std::memory_order_relaxed)
638- && ctx->executor ->pull (block))
650+ if (!global_ctx->isCancelled () && ctx->executor ->pull (block))
639651 {
640652 ctx->column_elems_written += block.rows ();
641653 ctx->column_to ->write (block);
@@ -650,8 +662,7 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const
650662void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn () const
651663{
652664 const String & column_name = ctx->it_name_and_type ->name ;
653- if (global_ctx->merges_blocker ->isCancelled () || global_ctx->merge_list_element_ptr ->is_cancelled .load (std::memory_order_relaxed))
654- throw Exception (ErrorCodes::ABORTED, " Cancelled merging parts" );
665+ global_ctx->checkOperationIsNotCanceled ();
655666
656667 ctx->executor .reset ();
657668 auto changed_checksums = ctx->column_to ->fillChecksums (global_ctx->new_data_part , global_ctx->checksums_gathered_columns );
@@ -1120,7 +1131,11 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
11201131
11211132 global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline (std::move (*builder));
11221133 // / Dereference unique_ptr and pass horizontal_stage_progress by reference
1123- global_ctx->merged_pipeline .setProgressCallback (MergeProgressCallback (global_ctx->merge_list_element_ptr , global_ctx->watch_prev_elapsed , *global_ctx->horizontal_stage_progress ));
1134+ global_ctx->merged_pipeline .setProgressCallback (MergeProgressCallback (
1135+ global_ctx->merge_list_element_ptr ,
1136+ global_ctx->watch_prev_elapsed ,
1137+ *global_ctx->horizontal_stage_progress ,
1138+ [&my_ctx = *global_ctx]() { my_ctx.checkOperationIsNotCanceled (); }));
11241139 // / Is calculated inside MergeProgressCallback.
11251140 global_ctx->merged_pipeline .disableProfileEventUpdate ();
11261141
0 commit comments