Skip to content

Commit e7810c9

Browse files
liujiayi771 authored and meta-codesync[bot] committed
perf: Dedup build side rows early for left semi and anti joins (facebookincubator#7066)
Summary: As discussed in facebookincubator#6577, during the hash build process, left semi and anti joins can deduplicate input rows based on the join key. However, Velox's hash build addInput process adds all inputs to the RowContainer, which can result in significant memory wastage in certain scenarios, such as TPCDS Q14 and Q95. To address this, we can construct the hash table directly during data input and utilize the existing allowDuplicates parameter of the hashTable to remove duplicate data without storing it in the RowContainer. This process is similar to constructing a hash table in the hash aggregation process. Because Velox's hash build may be executed by multiple drivers, in that scenario duplicate data can only be removed within each individual driver's input. However, in single-driver execution mode, it is possible to remove all duplicate data. Fixes facebookincubator#11212 Pull Request resolved: facebookincubator#7066 Reviewed By: kevinwilfong Differential Revision: D81556081 Pulled By: bikramSingh91 fbshipit-source-id: 379814d3b40e4898d7df9224098610908900f34c
1 parent 4791adf commit e7810c9

16 files changed

+742
-27
lines changed

velox/common/memory/tests/SharedArbitratorTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,7 @@ DEBUG_ONLY_TEST_P(
760760
folly::EventCount taskPauseWait;
761761
auto taskPauseWaitKey = taskPauseWait.prepareWait();
762762

763-
const auto fakeAllocationSize = kMemoryCapacity - (32L << 20);
763+
const auto fakeAllocationSize = kMemoryCapacity - (2L << 20);
764764

765765
std::atomic<bool> injectAllocationOnce{true};
766766
fakeOperatorFactory_->setAllocationCallback([&](Operator* op) {

velox/core/PlanNode.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3129,6 +3129,15 @@ class AbstractJoinNode : public PlanNode {
31293129
return isInnerJoin() || isLeftJoin() || isAntiJoin();
31303130
}
31313131

3132+
/// Indicates if this joinNode can drop duplicate rows with same join key.
3133+
/// For left semi and anti join, it is not necessary to store duplicate rows.
3134+
bool canDropDuplicates() const {
3135+
// Left semi and anti join with no extra filter only needs to know whether
3136+
// there is a match. Hence, no need to store entries with duplicate keys.
3137+
return !filter() &&
3138+
(isLeftSemiFilterJoin() || isLeftSemiProjectJoin() || isAntiJoin());
3139+
}
3140+
31323141
const std::vector<FieldAccessTypedExprPtr>& leftKeys() const {
31333142
return leftKeys_;
31343143
}

velox/core/QueryConfig.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,18 @@ class QueryConfig {
211211
static constexpr const char* kAbandonPartialTopNRowNumberMinPct =
212212
"abandon_partial_topn_row_number_min_pct";
213213

214+
/// Number of input rows to receive before starting to check whether to
215+
/// abandon building a HashTable without duplicates in HashBuild for left
216+
/// semi/anti join.
217+
static constexpr const char* kAbandonDedupHashMapMinRows =
218+
"abandon_dedup_hashmap_min_rows";
219+
220+
/// Abandons building a HashTable without duplicates in HashBuild for left
221+
/// semi/anti join if the percentage of distinct keys in the HashTable exceeds
222+
/// this threshold. Zero means 'disable this optimization'.
223+
static constexpr const char* kAbandonDedupHashMapMinPct =
224+
"abandon_dedup_hashmap_min_pct";
225+
214226
static constexpr const char* kMaxElementsSizeInRepeatAndSequence =
215227
"max_elements_size_in_repeat_and_sequence";
216228

@@ -840,6 +852,14 @@ class QueryConfig {
840852
return get<int32_t>(kAbandonPartialTopNRowNumberMinPct, 80);
841853
}
842854

855+
int32_t abandonHashBuildDedupMinRows() const {
856+
return get<int32_t>(kAbandonDedupHashMapMinRows, 100'000);
857+
}
858+
859+
int32_t abandonHashBuildDedupMinPct() const {
860+
return get<int32_t>(kAbandonDedupHashMapMinPct, 0);
861+
}
862+
843863
int32_t maxElementsSizeInRepeatAndSequence() const {
844864
return get<int32_t>(kMaxElementsSizeInRepeatAndSequence, 10'000);
845865
}

velox/docs/configs.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ Generic Configuration
4343
- integer
4444
- 80
4545
- Abandons partial TopNRowNumber if number of output rows equals or exceeds this percentage of the number of input rows.
46+
* - abandon_dedup_hashmap_min_rows
47+
- integer
48+
- 100,000
49+
- Number of input rows to receive before starting to check whether to abandon building a HashTable without
50+
duplicates in HashBuild for left semi/anti join.
51+
* - abandon_dedup_hashmap_min_pct
52+
- integer
53+
- 0
54+
- Abandons building a HashTable without duplicates in HashBuild for left semi/anti join if the percentage of
55+
distinct keys in the HashTable exceeds this threshold. Zero means 'disable this optimization'.
4656
* - session_timezone
4757
- string
4858
-

velox/exec/HashBuild.cpp

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ HashBuild::HashBuild(
6363
joinType_{joinNode_->joinType()},
6464
nullAware_{joinNode_->isNullAware()},
6565
needProbedFlagSpill_{needRightSideJoin(joinType_)},
66+
dropDuplicates_(joinNode_->canDropDuplicates()),
67+
abandonHashBuildDedupMinRows_(
68+
driverCtx->queryConfig().abandonHashBuildDedupMinRows()),
69+
abandonHashBuildDedupMinPct_(
70+
driverCtx->queryConfig().abandonHashBuildDedupMinPct()),
6671
joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
6772
operatorCtx_->driverCtx()->splitGroupId,
6873
planNodeId())),
@@ -86,19 +91,22 @@ HashBuild::HashBuild(
8691

8792
// Identify the non-key build side columns and make a decoder for each.
8893
const int32_t numDependents = inputType->size() - numKeys;
89-
if (numDependents > 0) {
90-
// Number of join keys (numKeys) may be less then number of input columns
91-
// (inputType->size()). In this case numDependents is negative and cannot be
92-
// used to call 'reserve'. This happens when we join different probe side
93-
// keys with the same build side key: SELECT * FROM t LEFT JOIN u ON t.k1 =
94-
// u.k AND t.k2 = u.k.
95-
dependentChannels_.reserve(numDependents);
96-
decoders_.reserve(numDependents);
97-
}
98-
for (auto i = 0; i < inputType->size(); ++i) {
99-
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
100-
dependentChannels_.emplace_back(i);
101-
decoders_.emplace_back(std::make_unique<DecodedVector>());
94+
if (!dropDuplicates_) {
95+
if (numDependents > 0) {
96+
// Number of join keys (numKeys) may be less then number of input columns
97+
// (inputType->size()). In this case numDependents is negative and cannot
98+
// be used to call 'reserve'. This happens when we join different probe
99+
// side keys with the same build side key: SELECT * FROM t LEFT JOIN u ON
100+
// t.k1 = u.k AND t.k2 = u.k.
101+
dependentChannels_.reserve(numDependents);
102+
decoders_.reserve(numDependents);
103+
}
104+
105+
for (auto i = 0; i < inputType->size(); ++i) {
106+
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
107+
dependentChannels_.emplace_back(i);
108+
decoders_.emplace_back(std::make_unique<DecodedVector>());
109+
}
102110
}
103111
}
104112

@@ -146,11 +154,6 @@ void HashBuild::setupTable() {
146154
.minTableRowsForParallelJoinBuild(),
147155
pool());
148156
} else {
149-
// (Left) semi and anti join with no extra filter only needs to know whether
150-
// there is a match. Hence, no need to store entries with duplicate keys.
151-
const bool dropDuplicates = !joinNode_->filter() &&
152-
(joinNode_->isLeftSemiFilterJoin() ||
153-
joinNode_->isLeftSemiProjectJoin() || isAntiJoin(joinType_));
154157
// Right semi join needs to tag build rows that were probed.
155158
const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
156159
if (isLeftNullAwareJoinWithFilter(joinNode_)) {
@@ -159,7 +162,7 @@ void HashBuild::setupTable() {
159162
table_ = HashTable<false>::createForJoin(
160163
std::move(keyHashers),
161164
dependentTypes,
162-
!dropDuplicates, // allowDuplicates
165+
!dropDuplicates_, // allowDuplicates
163166
needProbedFlag, // hasProbedFlag
164167
operatorCtx_->driverCtx()
165168
->queryConfig()
@@ -170,7 +173,7 @@ void HashBuild::setupTable() {
170173
table_ = HashTable<true>::createForJoin(
171174
std::move(keyHashers),
172175
dependentTypes,
173-
!dropDuplicates, // allowDuplicates
176+
!dropDuplicates_, // allowDuplicates
174177
needProbedFlag, // hasProbedFlag
175178
operatorCtx_->driverCtx()
176179
->queryConfig()
@@ -179,6 +182,15 @@ void HashBuild::setupTable() {
179182
}
180183
}
181184
analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
185+
if (abandonHashBuildDedupMinPct_ == 0) {
186+
// Building a HashTable without duplicates is disabled if
187+
// abandonBuildNoDupHashMinPct_ is 0.
188+
abandonHashBuildDedup_ = true;
189+
table_->setAllowDuplicates(true);
190+
return;
191+
}
192+
// Only create HashLookup when dedup is enabled.
193+
lookup_ = std::make_unique<HashLookup>(table_->hashers(), pool());
182194
}
183195

184196
void HashBuild::setupSpiller(SpillPartition* spillPartition) {
@@ -377,6 +389,25 @@ void HashBuild::addInput(RowVectorPtr input) {
377389
return;
378390
}
379391

392+
if (dropDuplicates_ && !abandonHashBuildDedup_) {
393+
const bool abandonEarly = abandonHashBuildDedupEarly(table_->numDistinct());
394+
if (!abandonEarly) {
395+
numHashInputRows_ += activeRows_.countSelected();
396+
table_->prepareForGroupProbe(
397+
*lookup_,
398+
input,
399+
activeRows_,
400+
BaseHashTable::kNoSpillInputStartPartitionBit);
401+
if (lookup_->rows.empty()) {
402+
return;
403+
}
404+
table_->groupProbe(
405+
*lookup_, BaseHashTable::kNoSpillInputStartPartitionBit);
406+
return;
407+
}
408+
abandonHashBuildDedup();
409+
}
410+
380411
if (analyzeKeys_ && hashes_.size() < activeRows_.end()) {
381412
hashes_.resize(activeRows_.end());
382413
}
@@ -755,6 +786,7 @@ bool HashBuild::finishHashBuild() {
755786
std::move(otherTables),
756787
isInputFromSpill() ? spillConfig()->startPartitionBit
757788
: BaseHashTable::kNoSpillInputStartPartitionBit,
789+
dropDuplicates_,
758790
allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
759791
: nullptr);
760792
}
@@ -879,6 +911,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
879911
setupTable();
880912
setupSpiller(spillInput.spillPartition.get());
881913
stateCleared_ = false;
914+
numHashInputRows_ = 0;
882915

883916
// Start to process spill input.
884917
processSpillInput();
@@ -1240,4 +1273,21 @@ void HashBuildSpiller::extractSpill(
12401273
rows.data(), rows.size(), false, false, result->childAt(types.size()));
12411274
}
12421275
}
1276+
1277+
bool HashBuild::abandonHashBuildDedupEarly(int64_t numDistinct) const {
1278+
VELOX_CHECK(dropDuplicates_);
1279+
return numHashInputRows_ > abandonHashBuildDedupMinRows_ &&
1280+
100 * numDistinct / numHashInputRows_ >= abandonHashBuildDedupMinPct_;
1281+
}
1282+
1283+
void HashBuild::abandonHashBuildDedup() {
1284+
// The hash table is no longer directly constructed in addInput. The data
1285+
// that was previously inserted into the hash table is already in the
1286+
// RowContainer.
1287+
addRuntimeStat("abandonBuildNoDupHash", RuntimeCounter(1));
1288+
abandonHashBuildDedup_ = true;
1289+
table_->setAllowDuplicates(true);
1290+
lookup_.reset();
1291+
}
1292+
12431293
} // namespace facebook::velox::exec

velox/exec/HashBuild.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,14 @@ class HashBuild final : public Operator {
204204
// not.
205205
bool nonReclaimableState() const;
206206

207+
// True if we have enough rows and not enough duplicate join keys, i.e. more
208+
// than 'abandonHashBuildDedupMinRows_' rows and more than
209+
// 'abandonHashBuildDedupMinPct_' % of rows are unique.
210+
bool abandonHashBuildDedupEarly(int64_t numDistinct) const;
211+
212+
// Invoked to abandon build deduped hash table.
213+
void abandonHashBuildDedup();
214+
207215
const std::shared_ptr<const core::HashJoinNode> joinNode_;
208216

209217
const core::JoinType joinType_;
@@ -217,6 +225,19 @@ class HashBuild final : public Operator {
217225
// not.
218226
const bool needProbedFlagSpill_;
219227

228+
// Indicates whether drop duplicate rows. Rows containing duplicate keys
229+
// can be removed for left semi and anti join.
230+
const bool dropDuplicates_;
231+
232+
// Minimum number of rows to see before deciding to give up build no
233+
// duplicates hash table.
234+
const int32_t abandonHashBuildDedupMinRows_;
235+
236+
// Min unique rows pct for give up build deduped hash table. If more
237+
// than this many rows are unique, build hash table in addInput phase is not
238+
// worthwhile.
239+
const int32_t abandonHashBuildDedupMinPct_;
240+
220241
std::shared_ptr<HashJoinBridge> joinBridge_;
221242

222243
tsan_atomic<bool> exceededMaxSpillLevelLimit_{false};
@@ -242,6 +263,9 @@ class HashBuild final : public Operator {
242263
// Container for the rows being accumulated.
243264
std::unique_ptr<BaseHashTable> table_;
244265

266+
// Used for building hash table while adding input rows.
267+
std::unique_ptr<HashLookup> lookup_;
268+
245269
// Key channels in 'input_'
246270
std::vector<column_index_t> keyChannels_;
247271

@@ -269,6 +293,10 @@ class HashBuild final : public Operator {
269293
// at least one entry with null join keys.
270294
bool joinHasNullKeys_{false};
271295

296+
// Whether to abandon building a HashTable without duplicates in HashBuild
297+
// addInput phase for left semi/anti join.
298+
bool abandonHashBuildDedup_{false};
299+
272300
// The type used to spill hash table which might attach a boolean column to
273301
// record the probed flag if 'needProbedFlagSpill_' is true.
274302
RowTypePtr spillType_;
@@ -310,6 +338,10 @@ class HashBuild final : public Operator {
310338

311339
// Maps key channel in 'input_' to channel in key.
312340
folly::F14FastMap<column_index_t, column_index_t> keyChannelMap_;
341+
342+
// Count the number of hash table input rows for building deduped
343+
// hash table. It will not be updated after abandonBuildNoDupHash_ is true.
344+
int64_t numHashInputRows_ = 0;
313345
};
314346

315347
inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) {

velox/exec/HashJoinBridge.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,18 @@ RowTypePtr hashJoinTableType(
4242
types.emplace_back(inputType->childAt(channel));
4343
}
4444

45+
if (joinNode->canDropDuplicates()) {
46+
// For left semi and anti join with no extra filter, hash table does not
47+
// store dependent columns.
48+
return ROW(std::move(names), std::move(types));
49+
}
50+
4551
for (auto i = 0; i < inputType->size(); ++i) {
4652
if (keyChannelSet.find(i) == keyChannelSet.end()) {
4753
names.emplace_back(inputType->nameOf(i));
4854
types.emplace_back(inputType->childAt(i));
4955
}
5056
}
51-
5257
return ROW(std::move(names), std::move(types));
5358
}
5459

velox/exec/HashTable.cpp

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ HashTable<ignoreNullKeys>::HashTable(
5757
pool_(pool),
5858
minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
5959
isJoinBuild_(isJoinBuild),
60+
allowDuplicates_(allowDuplicates),
6061
buildPartitionBounds_(raw_vector<PartitionBoundIndexType>(pool)) {
6162
std::vector<TypePtr> keys;
6263
for (auto& hasher : hashers_) {
@@ -1495,7 +1496,9 @@ void HashTable<ignoreNullKeys>::decideHashMode(
14951496
return;
14961497
}
14971498
disableRangeArrayHash_ |= disableRangeArrayHash;
1498-
if (numDistinct_ && !isJoinBuild_) {
1499+
if (numDistinct_ && (!isJoinBuild_ || joinBuildNoDuplicates())) {
1500+
// If the join type is left semi and anti, allowDuplicates_ will be false,
1501+
// and join build is building hash table while adding input rows.
14991502
if (!analyze()) {
15001503
setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
15011504
return;
@@ -1717,8 +1720,20 @@ template <bool ignoreNullKeys>
17171720
void HashTable<ignoreNullKeys>::prepareJoinTable(
17181721
std::vector<std::unique_ptr<BaseHashTable>> tables,
17191722
int8_t spillInputStartPartitionBit,
1723+
bool dropDuplicates,
17201724
folly::Executor* executor) {
17211725
buildExecutor_ = executor;
1726+
if (dropDuplicates) {
1727+
if (table_ != nullptr) {
1728+
// Reset table_ and capacity_ to trigger rehash.
1729+
rows_->pool()->freeContiguous(tableAllocation_);
1730+
table_ = nullptr;
1731+
capacity_ = 0;
1732+
}
1733+
// Call analyze to insert all unique values in row container to the
1734+
// table hashers' uniqueValues_;
1735+
analyze();
1736+
}
17221737
otherTables_.reserve(tables.size());
17231738
for (auto& table : tables) {
17241739
otherTables_.emplace_back(
@@ -1749,6 +1764,11 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
17491764
}
17501765
if (useValueIds) {
17511766
for (auto& other : otherTables_) {
1767+
if (dropDuplicates) {
1768+
// Before merging with the current hashers, all values in the row
1769+
// containers of other table need to be inserted into uniqueValues_.
1770+
other->analyze();
1771+
}
17521772
for (auto i = 0; i < hashers_.size(); ++i) {
17531773
hashers_[i]->merge(*other->hashers_[i]);
17541774
if (!hashers_[i]->mayUseValueIds()) {

0 commit comments

Comments
 (0)