Skip to content

Commit 4e66e69

Browse files
authored
Merge pull request ClickHouse#78724 from ClickHouse/parallel_uniq_exact_fix
Fix parallel `uniqExact` merging for aggregation with key
2 parents 5ce1364 + 31e6b23 commit 4e66e69

File tree

4 files changed

+100
-17
lines changed

4 files changed

+100
-17
lines changed

src/AggregateFunctions/AggregateFunctionUniq.h

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -276,13 +276,13 @@ struct Adder
276276
{
277277
/// We have to introduce this template parameter (and a bunch of ugly code dealing with it), because we cannot
278278
/// add runtime branches in whatever_hash_set::insert - it will immediately pop up in the perf top.
279-
template <bool use_single_level_hash_table = true>
279+
template <SetLevelHint hint = Data::is_able_to_parallelize_merge ? SetLevelHint::unknown : SetLevelHint::singleLevel>
280280
static void ALWAYS_INLINE add(Data & data, const IColumn ** columns, size_t num_args, size_t row_num)
281281
{
282282
if constexpr (Data::is_variadic)
283283
{
284284
if constexpr (IsUniqExactSet<typename Data::Set>::value)
285-
data.set.template insert<T, use_single_level_hash_table>(
285+
data.set.template insert<T, hint>(
286286
UniqVariadicHash<Data::is_exact, Data::argument_is_tuple>::apply(num_args, columns, row_num));
287287
else
288288
data.set.insert(T{UniqVariadicHash<Data::is_exact, Data::argument_is_tuple>::apply(num_args, columns, row_num)});
@@ -316,12 +316,11 @@ struct Adder
316316
hash.update(value.data, value.size);
317317
const auto key = hash.get128();
318318

319-
data.set.template insert<const UInt128 &, use_single_level_hash_table>(key);
319+
data.set.template insert<const UInt128 &, hint>(key);
320320
}
321321
else
322322
{
323-
data.set.template insert<const T &, use_single_level_hash_table>(
324-
assert_cast<const ColumnVector<T> &>(column).getData()[row_num]);
323+
data.set.template insert<const T &, hint>(assert_cast<const ColumnVector<T> &>(column).getData()[row_num]);
325324
}
326325
}
327326
#if USE_DATASKETCHES
@@ -341,9 +340,9 @@ struct Adder
341340
use_single_level_hash_table = data.set.isSingleLevel();
342341

343342
if (use_single_level_hash_table)
344-
addImpl<true>(data, columns, num_args, row_begin, row_end, flags, null_map);
343+
addImpl<SetLevelHint::singleLevel>(data, columns, num_args, row_begin, row_end, flags, null_map);
345344
else
346-
addImpl<false>(data, columns, num_args, row_begin, row_end, flags, null_map);
345+
addImpl<SetLevelHint::twoLevel>(data, columns, num_args, row_begin, row_end, flags, null_map);
347346

348347
if constexpr (Data::is_able_to_parallelize_merge)
349348
{
@@ -353,22 +352,28 @@ struct Adder
353352
}
354353

355354
private:
356-
template <bool use_single_level_hash_table>
357-
static void ALWAYS_INLINE
358-
addImpl(Data & data, const IColumn ** columns, size_t num_args, size_t row_begin, size_t row_end, const char8_t * flags, const UInt8 * null_map)
355+
template <SetLevelHint hint>
356+
static void ALWAYS_INLINE addImpl(
357+
Data & data,
358+
const IColumn ** columns,
359+
size_t num_args,
360+
size_t row_begin,
361+
size_t row_end,
362+
const char8_t * flags,
363+
const UInt8 * null_map)
359364
{
360365
if (!flags)
361366
{
362367
if (!null_map)
363368
{
364369
for (size_t row = row_begin; row < row_end; ++row)
365-
add<use_single_level_hash_table>(data, columns, num_args, row);
370+
add<hint>(data, columns, num_args, row);
366371
}
367372
else
368373
{
369374
for (size_t row = row_begin; row < row_end; ++row)
370375
if (!null_map[row])
371-
add<use_single_level_hash_table>(data, columns, num_args, row);
376+
add<hint>(data, columns, num_args, row);
372377
}
373378
}
374379
else
@@ -377,13 +382,13 @@ struct Adder
377382
{
378383
for (size_t row = row_begin; row < row_end; ++row)
379384
if (flags[row])
380-
add<use_single_level_hash_table>(data, columns, num_args, row);
385+
add<hint>(data, columns, num_args, row);
381386
}
382387
else
383388
{
384389
for (size_t row = row_begin; row < row_end; ++row)
385390
if (!null_map[row] && flags[row])
386-
add<use_single_level_hash_table>(data, columns, num_args, row);
391+
add<hint>(data, columns, num_args, row);
387392
}
388393
}
389394
}

src/AggregateFunctions/UniqExactSet.h

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ namespace ErrorCodes
1515
extern const int TOO_LARGE_ARRAY_SIZE;
1616
}
1717

18+
enum class SetLevelHint
19+
{
20+
singleLevel,
21+
twoLevel,
22+
unknown,
23+
};
24+
1825
template <typename SingleLevelSet, typename TwoLevelSet>
1926
class UniqExactSet
2027
{
@@ -24,13 +31,30 @@ class UniqExactSet
2431
public:
2532
using value_type = typename SingleLevelSet::value_type;
2633

27-
template <typename Arg, bool use_single_level_hash_table = true>
34+
template <typename Arg, SetLevelHint hint>
2835
auto ALWAYS_INLINE insert(Arg && arg)
2936
{
30-
if constexpr (use_single_level_hash_table)
37+
if constexpr (hint == SetLevelHint::singleLevel)
38+
{
3139
asSingleLevel().insert(std::forward<Arg>(arg));
32-
else
40+
}
41+
else if constexpr (hint == SetLevelHint::twoLevel)
42+
{
3343
asTwoLevel().insert(std::forward<Arg>(arg));
44+
}
45+
else
46+
{
47+
if (isSingleLevel())
48+
{
49+
auto && [_, inserted] = asSingleLevel().insert(std::forward<Arg>(arg));
50+
if (inserted && worthConvertingToTwoLevel(asSingleLevel().size()))
51+
convertToTwoLevel();
52+
}
53+
else
54+
{
55+
asTwoLevel().insert(std::forward<Arg>(arg));
56+
}
57+
}
3458
}
3559

3660
/// In merge, if one of the lhs and rhs is twolevelset and the other is singlelevelset, then the singlelevelset will need to convertToTwoLevel().
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<test>
2+
<create_query>
3+
CREATE TABLE test (a UInt64, b UInt64) ENGINE = MergeTree() ORDER BY () AS SELECT number, intDiv(number, 10) FROM numbers_mt(1e8);
4+
</create_query>
5+
6+
<substitutions>
7+
<substitution>
8+
<name>threads</name>
9+
<values>
10+
<value>1</value>
11+
<value>2</value>
12+
<value>4</value>
13+
<value>8</value>
14+
</values>
15+
</substitution>
16+
</substitutions>
17+
18+
<query>
19+
SELECT
20+
multiIf(a >= 100000 AND a &lt; 10000000, 0, 1) AS index,
21+
COUNT(DISTINCT b) AS total
22+
FROM test
23+
GROUP BY 1
24+
SETTINGS max_threads = {threads};
25+
</query>
26+
27+
<drop_query>
28+
DROP TABLE test;
29+
</drop_query>
30+
</test>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<test>
2+
<substitutions>
3+
<substitution>
4+
<name>uniq_keys</name>
5+
<values>
6+
<value>100</value>
7+
<value>500</value>
8+
<value>1000</value>
9+
<value>5000</value>
10+
<value>10000</value>
11+
<value>50000</value>
12+
<value>100000</value>
13+
</values>
14+
</substitution>
15+
</substitutions>
16+
17+
<create_query>create table t_{uniq_keys}(a UInt64, b UInt64) engine=MergeTree order by tuple()</create_query>
18+
19+
<fill_query>insert into t_{uniq_keys} select number, number % {uniq_keys} from numbers_mt(5e7)</fill_query>
20+
21+
<query>SELECT uniqExact(a) FROM t_{uniq_keys} group by b</query>
22+
23+
<drop_query>drop table t_{uniq_keys}</drop_query>
24+
</test>

0 commit comments

Comments (0)