Skip to content

Commit a480434

Browse files
authored
Merge pull request ClickHouse#78703 from ClickHouse/parallel_uniq_exact_merging_on_initiator
Parallel `uniqExact` merging on initiator
2 parents 865cf34 + 1f5235d commit a480434

File tree

14 files changed

+191
-169
lines changed

14 files changed

+191
-169
lines changed

src/AggregateFunctions/Combinators/AggregateFunctionIf.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,11 @@ class AggregateFunctionIf final : public IAggregateFunctionHelper<AggregateFunct
183183
AggregateDataPtr * places,
184184
size_t place_offset,
185185
const AggregateDataPtr * rhs,
186+
ThreadPool & thread_pool,
187+
std::atomic<bool> & is_cancelled,
186188
Arena * arena) const override
187189
{
188-
nested_func->mergeBatch(row_begin, row_end, places, place_offset, rhs, arena);
190+
nested_func->mergeBatch(row_begin, row_end, places, place_offset, rhs, thread_pool, is_cancelled, arena);
189191
}
190192

191193
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override

src/AggregateFunctions/Combinators/AggregateFunctionOrFill.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,11 @@ class AggregateFunctionOrFill final : public IAggregateFunctionHelper<AggregateF
223223
AggregateDataPtr * places,
224224
size_t place_offset,
225225
const AggregateDataPtr * rhs,
226+
ThreadPool & thread_pool,
227+
std::atomic<bool> & is_cancelled,
226228
Arena * arena) const override
227229
{
228-
nested_function->mergeBatch(row_begin, row_end, places, place_offset, rhs, arena);
230+
nested_function->mergeBatch(row_begin, row_end, places, place_offset, rhs, thread_pool, is_cancelled, arena);
229231
for (size_t i = row_begin; i < row_end; ++i)
230232
(places[i] + place_offset)[size_of_data] |= rhs[i][size_of_data];
231233
}

src/AggregateFunctions/IAggregateFunction.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunctio
245245
AggregateDataPtr * places,
246246
size_t place_offset,
247247
const AggregateDataPtr * rhs,
248+
ThreadPool & thread_pool,
249+
std::atomic<bool> & is_cancelled,
248250
Arena * arena) const = 0;
249251

250252
/** The same for single place.
@@ -509,11 +511,20 @@ class IAggregateFunctionHelper : public IAggregateFunction
509511
AggregateDataPtr * places,
510512
size_t place_offset,
511513
const AggregateDataPtr * rhs,
514+
ThreadPool & thread_pool,
515+
std::atomic<bool> & is_cancelled,
512516
Arena * arena) const override
513517
{
514518
for (size_t i = row_begin; i < row_end; ++i)
519+
{
515520
if (places[i])
516-
static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], arena);
521+
{
522+
if constexpr (Derived::parallelizeMergeWithKey())
523+
static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], thread_pool, is_cancelled, arena);
524+
else
525+
static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], arena);
526+
}
527+
}
517528
}
518529

519530
void mergeAndDestroyBatch(AggregateDataPtr * dst_places, AggregateDataPtr * rhs_places, size_t size, size_t offset, ThreadPool & thread_pool, std::atomic<bool> & is_cancelled, Arena * arena) const override

src/AggregateFunctions/UniqExactSet.h

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
#pragma once
22

3-
#include <exception>
43
#include <Common/CurrentThread.h>
54
#include <Common/HashTable/HashSet.h>
65
#include <Common/ThreadPool.h>
76
#include <Common/scope_guard_safe.h>
87
#include <Common/setThreadName.h>
9-
8+
#include <Common/threadPoolCallbackRunner.h>
109

1110
namespace DB
1211
{
@@ -94,13 +93,6 @@ class UniqExactSet
9493

9594
auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr, std::atomic<bool> * is_cancelled = nullptr)
9695
{
97-
/// If the size is large, we may convert the singleLevelHash to twoLevelHash and merge in parallel.
98-
if (other.size() > 40000)
99-
{
100-
if (isSingleLevel())
101-
convertToTwoLevel();
102-
}
103-
10496
if (isSingleLevel() && other.isTwoLevel())
10597
convertToTwoLevel();
10698

@@ -122,17 +114,16 @@ class UniqExactSet
122114
}
123115
else
124116
{
117+
ThreadPoolCallbackRunnerLocal<void> runner(*thread_pool, "UniqExactMerger");
125118
try
126119
{
127120
auto next_bucket_to_merge = std::make_shared<std::atomic_uint32_t>(0);
128121

129122
auto thread_func = [&lhs, &rhs, next_bucket_to_merge, is_cancelled, thread_group = CurrentThread::getGroup()]()
130123
{
131-
ThreadGroupSwitcher switcher(thread_group, "UniqExactMerger");
132-
133124
while (true)
134125
{
135-
if (is_cancelled->load(std::memory_order_seq_cst))
126+
if (is_cancelled->load())
136127
return;
137128

138129
const auto bucket = next_bucket_to_merge->fetch_add(1);
@@ -143,13 +134,13 @@ class UniqExactSet
143134
};
144135

145136
for (size_t i = 0; i < std::min<size_t>(thread_pool->getMaxThreads(), rhs.NUM_BUCKETS); ++i)
146-
thread_pool->scheduleOrThrowOnError(thread_func);
147-
thread_pool->wait();
137+
runner(thread_func, Priority{});
138+
runner.waitForAllToFinishAndRethrowFirstError();
148139
}
149140
catch (...)
150141
{
151-
thread_pool->wait();
152-
throw;
142+
is_cancelled->store(true);
143+
runner.waitForAllToFinishAndRethrowFirstError();
153144
}
154145
}
155146
}

0 commit comments

Comments
 (0)