Commit a93baca

Merge pull request ClickHouse#76185 from ClickHouse/parallel_hash_join_threshold
Parallel hash join threshold
2 parents ae43913 + c194ed0

27 files changed (+327, -99 lines)

src/Core/Settings.cpp

Lines changed: 4 additions & 0 deletions
```diff
@@ -6485,6 +6485,10 @@ As each series represents a node in Keeper, it is recommended to have no more th
 )", 0) \
     DECLARE(Bool, use_hive_partitioning, true, R"(
 When enabled, ClickHouse will detect Hive-style partitioning in path (`/name=value/`) in file-like table engines [File](/sql-reference/table-functions/file#hive-style-partitioning)/[S3](/sql-reference/table-functions/s3#hive-style-partitioning)/[URL](/sql-reference/table-functions/url#hive-style-partitioning)/[HDFS](/sql-reference/table-functions/hdfs#hive-style-partitioning)/[AzureBlobStorage](/sql-reference/table-functions/azureBlobStorage#hive-style-partitioning) and will allow to use partition columns as virtual columns in the query. These virtual columns will have the same names as in the partitioned path, but starting with `_`.
+)", 0) \
+    DECLARE(UInt64, parallel_hash_join_threshold, 100'000, R"(
+When hash-based join algorithm is applied, this threshold helps to decide between using `hash` and `parallel_hash` (only if estimation of the right table size is available).
+The former is used when we know that the right table size is below the threshold.
 )", 0) \
     DECLARE(Bool, apply_settings_from_server, true, R"(
 Whether the client should accept settings from server.
```
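
For orientation, here is a minimal sketch of the decision rule this setting describes; `JoinVariant` and `pickHashJoinVariant` are hypothetical names, not ClickHouse code. The real choice is made inside `chooseJoinAlgorithm`, which now receives an `rhs_size_estimation` argument (see the PlannerJoinTree.cpp change below).

```cpp
#include <cstdint>
#include <optional>

enum class JoinVariant { Hash, ParallelHash };

// Hypothetical helper: `hash` is picked only when the right-hand table is
// known to be smaller than the threshold; without an estimation the
// threshold has no effect and the usual choice (modelled here as
// `parallel_hash`) applies.
JoinVariant pickHashJoinVariant(
    std::optional<uint64_t> rhs_size_estimation,
    uint64_t parallel_hash_join_threshold /* default: 100'000 */)
{
    if (rhs_size_estimation && *rhs_size_estimation < parallel_hash_join_threshold)
        return JoinVariant::Hash;
    return JoinVariant::ParallelHash;
}
```
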

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
```diff
@@ -76,6 +76,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
             {"page_cache_block_size", 1048576, 1048576, "Made this setting adjustable on a per-query level."},
             {"page_cache_lookahead_blocks", 16, 16, "Made this setting adjustable on a per-query level."},
             {"output_format_pretty_glue_chunks", "0", "auto", "A new setting to make Pretty formats prettier."},
+            {"parallel_hash_join_threshold", 0, 100'000, "New setting"},
             {"make_distributed_plan", 0, 0, "New experimental setting."},
             {"execute_distributed_plan_locally", 0, 0, "New experimental setting."},
             {"default_shuffle_join_bucket_count", 8, 8, "New experimental setting."},
```

src/Interpreters/Aggregator.cpp

Lines changed: 3 additions & 5 deletions
```diff
@@ -1,3 +1,5 @@
+#include <algorithm>
+#include <optional>
 #include <Core/Settings.h>
 #include <Poco/Util/Application.h>
 
@@ -39,10 +41,6 @@
 #include <Common/threadPoolCallbackRunner.h>
 #include <Common/typeid_cast.h>
 
-#include <algorithm>
-#include <numeric>
-#include <optional>
-
 namespace ProfileEvents
 {
     extern const Event ExternalAggregationWritePart;
@@ -140,7 +138,7 @@ void updateStatistics(const DB::ManyAggregatedDataVariants & data_variants, cons
     const auto median_size = sizes.begin() + sizes.size() / 2; // not precisely though...
     std::nth_element(sizes.begin(), median_size, sizes.end());
     const auto sum_of_sizes = std::accumulate(sizes.begin(), sizes.end(), 0ull);
-    DB::getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
+    DB::getHashTablesStatistics<DB::AggregationEntry>().update({.sum_of_sizes = sum_of_sizes, .median_size = *median_size}, params);
 }
 
 DB::ColumnNumbers calculateKeysPositions(const DB::Block & header, const DB::Aggregator::Params & params)
```

src/Interpreters/ConcurrentHashJoin.cpp

Lines changed: 17 additions & 10 deletions
```diff
@@ -78,13 +78,17 @@ void updateStatistics(const auto & hash_joins, const DB::StatsCollectingParams &
     if (!params.isCollectionAndUseEnabled())
        return;
 
-    std::vector<size_t> sizes(hash_joins.size());
-    for (size_t i = 0; i < hash_joins.size(); ++i)
-        sizes[i] = hash_joins[i]->data->getTotalRowCount();
-    const auto median_size = sizes.begin() + sizes.size() / 2; // not precisely though...
-    std::nth_element(sizes.begin(), median_size, sizes.end());
-    if (auto sum_of_sizes = std::accumulate(sizes.begin(), sizes.end(), 0ull))
-        DB::getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
+    const auto ht_size = hash_joins.at(0)->data->getTotalRowCount();
+    if (!std::ranges::all_of(hash_joins, [&](const auto & hash_join) { return hash_join->data->getTotalRowCount() == ht_size; }))
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "HashJoin instances have different sizes");
+
+    const auto source_rows = std::accumulate(
+        hash_joins.begin(),
+        hash_joins.end(),
+        0ull,
+        [](auto acc, const auto & hash_join) { return acc + hash_join->data->getJoinedData()->rows_to_join; });
+    if (ht_size)
+        DB::getHashTablesStatistics<DB::HashJoinEntry>().update({.ht_size = ht_size, .source_rows = source_rows}, params);
 }
 
 UInt32 toPowerOfTwo(UInt32 x)
@@ -101,11 +105,11 @@ HashJoin::RightTableDataPtr getData(const std::shared_ptr<ConcurrentHashJoin::In
 
 void reserveSpaceInHashMaps(HashJoin & hash_join, size_t ind, const StatsCollectingParams & stats_collecting_params, size_t slots)
 {
-    if (auto hint = getSizeHint(stats_collecting_params, slots))
+    if (auto hint = getSizeHint(stats_collecting_params))
     {
         /// Hash map is shared between all `HashJoin` instances, so the `median_size` is actually the total size
         /// we need to preallocate in all buckets of all hash maps.
-        const size_t reserve_size = hint->median_size;
+        const size_t reserve_size = hint->ht_size;
 
         /// Each `HashJoin` instance will "own" a subset of buckets during the build phase. Because of that
         /// we preallocate space only in the specific buckets of each `HashJoin` instance.
@@ -205,7 +209,8 @@ ConcurrentHashJoin::~ConcurrentHashJoin()
     if (!hash_joins[0]->data->twoLevelMapIsUsed())
         return;
 
-    updateStatistics(hash_joins, stats_collecting_params);
+    if (build_phase_finished)
+        updateStatistics(hash_joins, stats_collecting_params);
 
     for (size_t i = 0; i < slots; ++i)
    {
@@ -643,6 +648,8 @@ void ConcurrentHashJoin::onBuildPhaseFinish()
             hash_joins[i]->data->getUsedFlags() = hash_joins[0]->data->getUsedFlags();
         }
     }
+
+    build_phase_finished = true;
 }
 }
 
```
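
The destructor change records statistics only for completed builds, and `updateStatistics` now stores the size of the shared hash table plus the total input rows. A hedged sketch of the feedback loop this enables; `HashJoinStats` and `estimateRhsSize` are illustrative stand-ins, and which recorded field feeds the planner's estimation is not visible in this diff (`source_rows` is assumed here).

```cpp
#include <cstdint>
#include <optional>

// Stand-in for the recorded HashJoinEntry fields; not the actual
// ClickHouse interfaces.
struct HashJoinStats
{
    uint64_t ht_size;     // total size of the shared hash table
    uint64_t source_rows; // rows fed into the build side across all shards
};

// Sketch of the feedback loop: statistics saved by the destructor above are
// looked up on the next run of the same query and can serve as the
// right-table size estimation compared against parallel_hash_join_threshold.
std::optional<uint64_t> estimateRhsSize(const std::optional<HashJoinStats> & cached)
{
    if (!cached)
        return std::nullopt; // first run: no estimation, the threshold is not applied
    return cached->source_rows; // assumption: rows observed on the build side last time
}
```
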

src/Interpreters/ConcurrentHashJoin.h

Lines changed: 1 addition & 0 deletions
```diff
@@ -98,6 +98,7 @@ class ConcurrentHashJoin : public IJoin
     bool any_take_last_row;
     std::unique_ptr<ThreadPool> pool;
     std::vector<std::shared_ptr<InternalHashJoin>> hash_joins;
+    bool build_phase_finished = false;
 
     StatsCollectingParams stats_collecting_params;
 
```

src/Interpreters/HashTablesStatistics.cpp

Lines changed: 54 additions & 29 deletions
```diff
@@ -11,7 +11,8 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }
 
-std::optional<HashTablesStatistics::Entry> HashTablesStatistics::getSizeHint(const Params & params)
+template <typename Entry>
+std::optional<Entry> HashTablesStatistics<Entry>::getSizeHint(const Params & params)
 {
     if (!params.isCollectionAndUseEnabled())
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");
@@ -20,19 +21,15 @@ std::optional<HashTablesStatistics::Entry> HashTablesStatistics::getSizeHint(con
     const auto cache = getHashTableStatsCache(params, lock);
     if (const auto hint = cache->get(params.key))
     {
-        LOG_TRACE(
-            getLogger("HashTablesStatistics"),
-            "An entry for key={} found in cache: sum_of_sizes={}, median_size={}",
-            params.key,
-            hint->sum_of_sizes,
-            hint->median_size);
+        LOG_TRACE(getLogger("HashTablesStatistics"), "An entry for key={} found in cache: {}", params.key, hint->dump());
         return *hint;
     }
     return std::nullopt;
 }
 
 /// Collection and use of the statistics should be enabled.
-void HashTablesStatistics::update(size_t sum_of_sizes, size_t median_size, const Params & params)
+template <typename Entry>
+void HashTablesStatistics<Entry>::update(const Entry & new_entry, const Params & params)
 {
     if (!params.isCollectionAndUseEnabled())
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");
@@ -41,20 +38,15 @@ void HashTablesStatistics::update(size_t sum_of_sizes, size_t median_size, const
     const auto cache = getHashTableStatsCache(params, lock);
     const auto hint = cache->get(params.key);
     // We'll maintain the maximum among all the observed values until another prediction is much lower (that should indicate some change)
-    if (!hint || sum_of_sizes < hint->sum_of_sizes / 2 || hint->sum_of_sizes < sum_of_sizes || median_size < hint->median_size / 2
-        || hint->median_size < median_size)
+    if (!hint || hint->shouldBeUpdated(new_entry))
     {
-        LOG_TRACE(
-            getLogger("HashTablesStatistics"),
-            "Statistics updated for key={}: new sum_of_sizes={}, median_size={}",
-            params.key,
-            sum_of_sizes,
-            median_size);
-        cache->set(params.key, std::make_shared<Entry>(Entry{.sum_of_sizes = sum_of_sizes, .median_size = median_size}));
+        LOG_TRACE(getLogger("HashTablesStatistics"), "Statistics updated for key={}: {}", params.key, new_entry.dump());
+        cache->set(params.key, std::make_shared<Entry>(new_entry));
     }
 }
 
-std::optional<HashTablesCacheStatistics> HashTablesStatistics::getCacheStats() const
+template <typename Entry>
+std::optional<HashTablesCacheStatistics> HashTablesStatistics<Entry>::getCacheStats() const
 {
     std::lock_guard lock(mutex);
     if (hash_table_stats)
@@ -67,29 +59,38 @@ std::optional<HashTablesCacheStatistics> HashTablesStatistics::getCacheStats() c
     return std::nullopt;
 }
 
-HashTablesStatistics::CachePtr HashTablesStatistics::getHashTableStatsCache(const Params & params, const std::lock_guard<std::mutex> &)
+template <typename Entry>
+HashTablesStatistics<Entry>::CachePtr
+HashTablesStatistics<Entry>::getHashTableStatsCache(const Params & params, const std::lock_guard<std::mutex> &)
 {
     if (!hash_table_stats)
         hash_table_stats = std::make_shared<Cache>(params.max_entries_for_hash_table_stats * sizeof(Entry));
     return hash_table_stats;
 }
 
-HashTablesStatistics & getHashTablesStatistics()
-{
-    static HashTablesStatistics hash_tables_stats;
-    return hash_tables_stats;
-}
-
 std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics()
 {
-    return getHashTablesStatistics().getCacheStats();
+    HashTablesCacheStatistics res{};
+    if (auto aggr_stats = getHashTablesStatistics<AggregationEntry>().getCacheStats())
+    {
+        res.entries += aggr_stats->entries;
+        res.hits += aggr_stats->hits;
+        res.misses += aggr_stats->misses;
+    }
+    if (auto hash_join_stats = getHashTablesStatistics<HashJoinEntry>().getCacheStats())
+    {
+        res.entries += hash_join_stats->entries;
+        res.hits += hash_join_stats->hits;
+        res.misses += hash_join_stats->misses;
+    }
+    return res;
 }
 
-std::optional<HashTablesStatistics::Entry> getSizeHint(const DB::StatsCollectingParams & stats_collecting_params, size_t tables_cnt)
+std::optional<AggregationEntry> getSizeHint(const DB::StatsCollectingParams & stats_collecting_params, size_t tables_cnt)
 {
     if (stats_collecting_params.isCollectionAndUseEnabled())
     {
-        if (auto hint = DB::getHashTablesStatistics().getSizeHint(stats_collecting_params))
+        if (auto hint = DB::getHashTablesStatistics<AggregationEntry>().getSizeHint(stats_collecting_params))
         {
             const auto lower_limit = hint->sum_of_sizes / tables_cnt;
             const auto upper_limit = stats_collecting_params.max_size_to_preallocate / tables_cnt;
@@ -109,10 +110,34 @@ std::optional<HashTablesStatistics::Entry> getSizeHint(const DB::StatsCollecting
             /// https://github.com/ClickHouse/ClickHouse/issues/44402#issuecomment-1359920703
             else if ((tables_cnt > 1 && hint->sum_of_sizes > 100'000) || hint->sum_of_sizes > 500'000)
             {
-                return HashTablesStatistics::Entry{hint->sum_of_sizes, std::max(lower_limit, hint->median_size)};
+                return AggregationEntry{hint->sum_of_sizes, std::max(lower_limit, hint->median_size)};
+            }
+        }
+    }
+    return std::nullopt;
+}
+
+std::optional<HashJoinEntry> getSizeHint(const DB::StatsCollectingParams & stats_collecting_params)
+{
+    if (stats_collecting_params.isCollectionAndUseEnabled())
+    {
+        if (auto hint = DB::getHashTablesStatistics<HashJoinEntry>().getSizeHint(stats_collecting_params))
+        {
+            if (hint->ht_size > stats_collecting_params.max_size_to_preallocate)
+            {
+                LOG_TRACE(
+                    getLogger("HashTablesStatistics"),
+                    "No space were preallocated in hash tables because 'max_size_to_preallocate' has too small value: {}, should be at "
+                    "least {}",
+                    stats_collecting_params.max_size_to_preallocate,
+                    hint->ht_size);
             }
+            return hint;
         }
     }
     return std::nullopt;
 }
+
+template class HashTablesStatistics<AggregationEntry>;
+template class HashTablesStatistics<HashJoinEntry>;
 }
```
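
Only fragments of the aggregation-path `getSizeHint` are visible in the hunk above. As a reading aid, here is a simplified, self-contained restatement of its clamping logic; the real function has additional handling not visible in this diff, and the numbers in the comment are illustrative.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <optional>

struct AggregationHint { uint64_t sum_of_sizes; uint64_t median_size; };

// Simplified restatement: the per-thread preallocation is bounded below by an
// even split of the total observed size and above by max_size_to_preallocate,
// and only applies once the table is large enough to be worth it.
std::optional<AggregationHint> clampAggregationHint(
    const AggregationHint & hint, size_t tables_cnt, uint64_t max_size_to_preallocate)
{
    const auto lower_limit = hint.sum_of_sizes / tables_cnt;
    const auto upper_limit = max_size_to_preallocate / tables_cnt;
    if (lower_limit > upper_limit)
        return std::nullopt; // too large to preallocate (the real code does more here)
    // E.g. 8 threads, sum_of_sizes = 800'000, median_size = 90'000:
    // lower_limit = 100'000, so each thread reserves max(100'000, 90'000).
    if ((tables_cnt > 1 && hint.sum_of_sizes > 100'000) || hint.sum_of_sizes > 500'000)
        return AggregationHint{hint.sum_of_sizes, std::max(lower_limit, hint.median_size)};
    return std::nullopt;
}
```
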

src/Interpreters/HashTablesStatistics.h

Lines changed: 40 additions & 9 deletions
```diff
@@ -26,22 +26,47 @@ struct StatsCollectingParams
     bool isCollectionAndUseEnabled() const { return key != 0; }
     void disable() { key = 0; }
 
+    StatsCollectingParams & setKey(UInt64 key_)
+    {
+        key = key_;
+        return *this;
+    }
+
     UInt64 key = 0;
     const size_t max_entries_for_hash_table_stats = 0;
     const size_t max_size_to_preallocate = 0;
 };
 
+struct AggregationEntry
+{
+    bool shouldBeUpdated(const AggregationEntry & new_entry) const
+    {
+        return new_entry.sum_of_sizes < sum_of_sizes / 2 || sum_of_sizes < new_entry.sum_of_sizes || new_entry.median_size < median_size / 2
+            || median_size < new_entry.median_size;
+    }
+
+    std::string dump() const { return fmt::format("sum_of_sizes={}, median_size={}", sum_of_sizes, median_size); }
+
+    size_t sum_of_sizes; // used to determine if it's better to convert aggregation to two-level from the beginning
+    size_t median_size; // roughly the size we're going to preallocate on each thread
+};
+
+struct HashJoinEntry
+{
+    bool shouldBeUpdated(const HashJoinEntry & new_entry) const { return new_entry.ht_size < ht_size / 2 || ht_size < new_entry.ht_size; }
+
+    std::string dump() const { return fmt::format("ht_size={}", ht_size); }
+
+    size_t ht_size; // the size of the shared hash table
+    size_t source_rows; // the number of rows in the source table
+};
+
 /** Collects observed HashTable-s sizes to avoid redundant intermediate resizes.
   */
+template <typename Entry>
 class HashTablesStatistics
 {
 public:
-    struct Entry
-    {
-        size_t sum_of_sizes; // used to determine if it's better to convert aggregation to two-level from the beginning
-        size_t median_size; // roughly the size we're going to preallocate on each thread
-    };
-
     using Cache = DB::CacheBase<UInt64, Entry>;
     using CachePtr = std::shared_ptr<Cache>;
     using Params = StatsCollectingParams;
@@ -50,7 +75,7 @@ class HashTablesStatistics
     std::optional<Entry> getSizeHint(const Params & params);
 
     /// Collection and use of the statistics should be enabled.
-    void update(size_t sum_of_sizes, size_t median_size, const Params & params);
+    void update(const Entry & new_entry, const Params & params);
 
     std::optional<DB::HashTablesCacheStatistics> getCacheStats() const;
 
@@ -61,9 +86,15 @@ class HashTablesStatistics
     CachePtr hash_table_stats;
 };
 
-HashTablesStatistics & getHashTablesStatistics();
+template <typename Entry>
+HashTablesStatistics<Entry> & getHashTablesStatistics()
+{
+    static HashTablesStatistics<Entry> hash_tables_stats;
+    return hash_tables_stats;
+}
 
 std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics();
 
-std::optional<HashTablesStatistics::Entry> getSizeHint(const DB::StatsCollectingParams & stats_collecting_params, size_t tables_cnt);
+std::optional<AggregationEntry> getSizeHint(const DB::StatsCollectingParams & stats_collecting_params, size_t tables_cnt);
+std::optional<HashJoinEntry> getSizeHint(const DB::StatsCollectingParams & stats_collecting_params);
 }
```

src/Interpreters/JoinInfo.cpp

Lines changed: 5 additions & 0 deletions
```diff
@@ -39,6 +39,7 @@ namespace Setting
 
     extern const SettingsBool collect_hash_table_stats_during_joins;
     extern const SettingsUInt64 max_size_to_preallocate_for_joins;
+    extern const SettingsUInt64 parallel_hash_join_threshold;
 
     extern const SettingsUInt64 max_joined_block_size_rows;
     extern const SettingsString temporary_files_codec;
@@ -73,6 +74,7 @@ namespace QueryPlanSerializationSetting
 
     extern const QueryPlanSerializationSettingsBool collect_hash_table_stats_during_joins;
    extern const QueryPlanSerializationSettingsUInt64 max_size_to_preallocate_for_joins;
+    extern const QueryPlanSerializationSettingsUInt64 parallel_hash_join_threshold;
 
     extern const QueryPlanSerializationSettingsUInt64 max_joined_block_size_rows;
     extern const QueryPlanSerializationSettingsString temporary_files_codec;
@@ -115,6 +117,7 @@ JoinSettings::JoinSettings(const Settings & query_settings)
 
     collect_hash_table_stats_during_joins = query_settings[Setting::collect_hash_table_stats_during_joins];
     max_size_to_preallocate_for_joins = query_settings[Setting::max_size_to_preallocate_for_joins];
+    parallel_hash_join_threshold = query_settings[Setting::parallel_hash_join_threshold];
 
     temporary_files_codec = query_settings[Setting::temporary_files_codec];
     join_output_by_rowlist_perkey_rows_threshold = query_settings[Setting::join_output_by_rowlist_perkey_rows_threshold];
@@ -148,6 +151,7 @@ JoinSettings::JoinSettings(const QueryPlanSerializationSettings & settings)
 
     collect_hash_table_stats_during_joins = settings[QueryPlanSerializationSetting::collect_hash_table_stats_during_joins];
     max_size_to_preallocate_for_joins = settings[QueryPlanSerializationSetting::max_size_to_preallocate_for_joins];
+    parallel_hash_join_threshold = settings[QueryPlanSerializationSetting::parallel_hash_join_threshold];
 
     max_joined_block_size_rows = settings[QueryPlanSerializationSetting::max_joined_block_size_rows];
     temporary_files_codec = settings[QueryPlanSerializationSetting::temporary_files_codec];
@@ -185,6 +189,7 @@ void JoinSettings::updatePlanSettings(QueryPlanSerializationSettings & settings)
 
     settings[QueryPlanSerializationSetting::collect_hash_table_stats_during_joins] = collect_hash_table_stats_during_joins;
     settings[QueryPlanSerializationSetting::max_size_to_preallocate_for_joins] = max_size_to_preallocate_for_joins;
+    settings[QueryPlanSerializationSetting::parallel_hash_join_threshold] = parallel_hash_join_threshold;
 
     settings[QueryPlanSerializationSetting::max_joined_block_size_rows] = max_joined_block_size_rows;
     settings[QueryPlanSerializationSetting::temporary_files_codec] = temporary_files_codec;
```

src/Interpreters/JoinInfo.h

Lines changed: 1 addition & 0 deletions
```diff
@@ -219,6 +219,7 @@ struct JoinSettings
     /* Hash/Parallel hash join settings */
     bool collect_hash_table_stats_during_joins;
     UInt64 max_size_to_preallocate_for_joins;
+    UInt64 parallel_hash_join_threshold;
     UInt64 join_output_by_rowlist_perkey_rows_threshold;
     bool allow_experimental_join_right_table_sorting;
     UInt64 join_to_sort_minimum_perkey_rows;
```

src/Planner/PlannerJoinTree.cpp

Lines changed: 1 addition & 1 deletion
```diff
@@ -1515,7 +1515,7 @@ std::tuple<QueryPlan, JoinPtr> buildJoinQueryPlan(
     auto hash_table_stat_cache_key = preCalculateCacheKey(right_table_expression, select_query_info);
     const auto cache_key_for_parallel_hash = calculateCacheKey(table_join, hash_table_stat_cache_key);
     auto join_algorithm = chooseJoinAlgorithm(
-        table_join, prepared_join_storage, left_header, right_header, JoinAlgorithmSettings(*planner_context->getQueryContext()), cache_key_for_parallel_hash);
+        table_join, prepared_join_storage, left_header, right_header, JoinAlgorithmSettings(*planner_context->getQueryContext()), cache_key_for_parallel_hash, /*rhs_size_estimation=*/{});
     auto result_plan = QueryPlan();
 
     bool is_filled_join = join_algorithm->isFilled();
```
