Skip to content

Commit 1f5235d

Browse files
committed
fix
1 parent ef878a2 commit 1f5235d

File tree

2 files changed

+7
-2
lines changed

2 files changed

+7
-2
lines changed

src/AggregateFunctions/UniqExactSet.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ class UniqExactSet
123123
{
124124
while (true)
125125
{
126-
if (is_cancelled->load(std::memory_order_seq_cst))
126+
if (is_cancelled->load())
127127
return;
128128

129129
const auto bucket = next_bucket_to_merge->fetch_add(1);
@@ -139,6 +139,7 @@ class UniqExactSet
139139
}
140140
catch (...)
141141
{
142+
is_cancelled->store(true);
142143
runner.waitForAllToFinishAndRethrowFirstError();
143144
}
144145
}

src/Interpreters/Aggregator.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2353,7 +2353,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(AggregatedDataVariants &
23532353

23542354
/// Select Arena to avoid race conditions
23552355
Arena * arena = data_variants.aggregates_pools.at(thread_id).get();
2356-
res.at(thread_id).emplace_back(convertOneBucketToBlock(data_variants, method, arena, final, bucket));
2356+
res[thread_id].emplace_back(convertOneBucketToBlock(data_variants, method, arena, final, bucket));
23572357
}
23582358
};
23592359

@@ -3098,6 +3098,9 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
30983098
if (bucket > max_bucket)
30993099
break;
31003100

3101+
if (is_cancelled.load())
3102+
return;
3103+
31013104
for (Block & block : bucket_to_blocks[bucket])
31023105
{
31033106
/// Copy to avoid race.
@@ -3134,6 +3137,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
31343137
}
31353138
catch (...)
31363139
{
3140+
is_cancelled.store(true);
31373141
runner.waitForAllToFinishAndRethrowFirstError();
31383142
}
31393143
}

0 commit comments

Comments
 (0)