Commit fda5025

duxiao1212 authored and meta-codesync[bot] committed
feat: Add spill for MarkDistinct (#16577)
Summary:
Pull Request resolved: #16577

### Added Spill for MarkDistinct

This diff adds spilling support to MarkDistinct. Highlights of the change:

### Changed files include

* GroupingSet.h
* core/QueryConfig.h
* exec/EnforceDistinct.cpp
* exec/GroupingSet.cpp
* core/PlanNode.h

In addition to the spilling flag configured in QueryConfig.h:

* Added SpillConfig to GroupingSet.h’s constructor to allow spilling in GroupingSet.cpp
* Added a canSpill() function to MarkDistinct’s PlanNode to enforce spilling support

GroupingSet.h’s functions were updated accordingly. Spilling support for the MarkDistinct operator is disabled by default.

Reviewed By: xiaoxmeng, tanjialiang

Differential Revision: D90342763

fbshipit-source-id: 184841f9af7eca47e44a5394a86238c9dfa1c4ed
1 parent 6d481a6 commit fda5025

10 files changed, +904 -26 lines changed

velox/core/PlanNode.h

Lines changed: 4 additions & 0 deletions
@@ -5419,6 +5419,10 @@ class MarkDistinctNode : public PlanNode {
     return "MarkDistinct";
   }

+  bool canSpill(const QueryConfig& queryConfig) const override {
+    return queryConfig.markDistinctSpillEnabled();
+  }
+
   const std::string& markerName() const {
     return markerName_;
   }

velox/core/QueryConfig.h

Lines changed: 8 additions & 0 deletions
@@ -368,6 +368,10 @@ class QueryConfig {
   static constexpr const char* kRowNumberSpillEnabled =
       "row_number_spill_enabled";

+  /// MarkDistinct spilling flag, only applies if "spill_enabled" flag is set.
+  static constexpr const char* kMarkDistinctSpillEnabled =
+      "mark_distinct_spill_enabled";
+
   /// TopNRowNumber spilling flag, only applies if "spill_enabled" flag is set.
   static constexpr const char* kTopNRowNumberSpillEnabled =
       "topn_row_number_spill_enabled";
@@ -1185,6 +1189,10 @@ class QueryConfig {
     return get<bool>(kRowNumberSpillEnabled, true);
   }

+  bool markDistinctSpillEnabled() const {
+    return get<bool>(kMarkDistinctSpillEnabled, true);
+  }
+
   bool topNRowNumberSpillEnabled() const {
     return get<bool>(kTopNRowNumberSpillEnabled, true);
   }
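
The flag comment above says the MarkDistinct flag only applies when "spill_enabled" is set. A minimal sketch of that two-level gate follows, assuming a plain string map in place of QueryConfig. The names `ConfigMap`, `getBool` and `markDistinctMaySpill` are hypothetical and not part of Velox, and treating "spill_enabled" as off when absent is an assumption. The operator-level default is passed in by the caller rather than assumed here.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the query configuration; not the Velox class.
using ConfigMap = std::unordered_map<std::string, std::string>;

// Hypothetical helper: reads a boolean property, falling back to
// 'defaultValue' when the key is absent.
bool getBool(
    const ConfigMap& config,
    const std::string& key,
    bool defaultValue) {
  auto it = config.find(key);
  return it == config.end() ? defaultValue : it->second == "true";
}

// MarkDistinct may spill only if the global flag is on AND the
// operator-level flag is on. Assumption: "spill_enabled" is treated as
// off when it is not set.
bool markDistinctMaySpill(const ConfigMap& config, bool operatorDefault) {
  return getBool(config, "spill_enabled", false) &&
      getBool(config, "mark_distinct_spill_enabled", operatorDefault);
}
```

With this shape, setting only "mark_distinct_spill_enabled" has no effect unless "spill_enabled" is also true.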

velox/docs/configs.rst

Lines changed: 4 additions & 0 deletions
@@ -413,6 +413,10 @@ Spilling
      - boolean
      - true
      - When `spill_enabled` is true, determines whether TopNRowNumber operator can spill to disk under memory pressure.
+   * - mark_distinct_spill_enabled
+     - boolean
+     - false
+     - When `spill_enabled` is true, determines whether MarkDistinct operator can spill to disk under memory pressure.
    * - writer_spill_enabled
      - boolean
      - true

velox/docs/develop/memory.rst

Lines changed: 1 addition & 1 deletion
@@ -667,7 +667,7 @@ Here is the memory reclaim process within a query:
 reclamation through disk spilling and table writer flush. *Operator::reclaim*
 is added to support memory reclamation with the default implementation does
 nothing. Only spillable operators override that method: *OrderBy*, *HashBuild*,
-*HashAggregation*, *RowNumber*, *TopNRowNumber*, *Window* and *TableWriter*.
+*HashAggregation*, *RowNumber*, *TopNRowNumber*, *MarkDistinct*, *Window* and *TableWriter*.
 As for now, we simply spill everything from the spillable operator’s row
 container to free up memory. After we add memory compaction support for row
 containers, we could leverage fine-grained disk spilling features in Velox
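
The pattern described in this passage, a no-op default that only spillable operators override, can be sketched roughly as below. This is a toy model: `ToyOperator`, `ToySpillableOperator` and `reclaimAll` are hypothetical names, and the real `Operator::reclaim` signature is not shown in this diff.

```cpp
#include <cstdint>
#include <vector>

// Toy base class (hypothetical). The default reclaim does nothing, as the
// documentation describes for non-spillable operators.
class ToyOperator {
 public:
  virtual ~ToyOperator() = default;

  // Returns the number of bytes freed.
  virtual uint64_t reclaim(uint64_t /*targetBytes*/) {
    return 0;
  }
};

// Toy spillable operator (hypothetical). It frees everything it holds,
// mirroring "we simply spill everything" in the text above.
class ToySpillableOperator : public ToyOperator {
 public:
  explicit ToySpillableOperator(uint64_t usedBytes) : usedBytes_(usedBytes) {}

  uint64_t reclaim(uint64_t /*targetBytes*/) override {
    const uint64_t freed = usedBytes_;
    usedBytes_ = 0;
    return freed;
  }

 private:
  uint64_t usedBytes_;
};

// Asks every operator to reclaim and sums up the freed bytes.
uint64_t reclaimAll(
    const std::vector<ToyOperator*>& operators,
    uint64_t targetBytes) {
  uint64_t freed = 0;
  for (ToyOperator* op : operators) {
    freed += op->reclaim(targetBytes);
  }
  return freed;
}
```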

velox/docs/develop/operators.rst

Lines changed: 5 additions & 0 deletions
@@ -1018,6 +1018,11 @@ MarkDistinctNode
 The MarkDistinct operator is used to produce aggregate mask columns for aggregations over distinct values, e.g. agg(DISTINCT a).
 Mask is a boolean column set to true for a subset of input rows that collectively represent a set of unique values of 'distinctKeys'.

+This operator supports spilling. The spill mechanism follows the same pattern as RowNumber: when memory pressure
+triggers spilling, the hash table contents and future input are partitioned and written to disk. During restore,
+each partition's hash table is rebuilt from the spilled data, preserving knowledge of which keys were already seen.
+Disabled by default; enable with `mark_distinct_spill_enabled` configuration property.
+
 .. list-table::
    :widths: 10 30
    :align: left
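
The spill and restore flow described in the added paragraph can be illustrated with a toy model. The sketch below is not the Velox implementation: `ToyMarkDistinct` is a hypothetical class, it handles a single int64 key, and it uses in-memory vectors as a stand-in for spill files. It only shows why rebuilding each partition's set of seen keys keeps the markers correct after a spill.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_set>
#include <utility>
#include <vector>

// Toy stand-in for MarkDistinct over a single int64 key (hypothetical).
class ToyMarkDistinct {
 public:
  using Row = std::pair<int64_t, bool>; // (key, marker)

  explicit ToyMarkDistinct(size_t numPartitions)
      : spilledKeys_(numPartitions), spilledInput_(numPartitions) {
    assert(numPartitions > 0);
  }

  // Before a spill, marks each row immediately. After a spill, rows are
  // routed to their partition and nothing is returned until restore().
  std::vector<Row> addInput(const std::vector<int64_t>& keys) {
    std::vector<Row> out;
    for (int64_t key : keys) {
      if (spilled_) {
        spilledInput_[partitionOf(key)].push_back(key);
      } else {
        out.emplace_back(key, seen_.insert(key).second);
      }
    }
    return out;
  }

  // Partitions the keys seen so far and drops the in-memory set.
  void spill() {
    for (int64_t key : seen_) {
      spilledKeys_[partitionOf(key)].push_back(key);
    }
    seen_.clear();
    spilled_ = true;
  }

  // Rebuilds one partition's set of seen keys from the spilled keys, then
  // marks that partition's deferred input against it.
  std::vector<Row> restore(size_t partition) const {
    std::unordered_set<int64_t> seen(
        spilledKeys_[partition].begin(), spilledKeys_[partition].end());
    std::vector<Row> out;
    for (int64_t key : spilledInput_[partition]) {
      out.emplace_back(key, seen.insert(key).second);
    }
    return out;
  }

  size_t numPartitions() const {
    return spilledKeys_.size();
  }

 private:
  size_t partitionOf(int64_t key) const {
    return std::hash<int64_t>{}(key) % spilledKeys_.size();
  }

  bool spilled_{false};
  std::unordered_set<int64_t> seen_;
  std::vector<std::vector<int64_t>> spilledKeys_;
  std::vector<std::vector<int64_t>> spilledInput_;
};
```

Because a key always hashes to the same partition, a key that was marked before the spill is found again when its partition is restored, so it is never marked twice. Output order after a spill follows partition order rather than input order in this toy model.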

velox/exec/GroupingSet.cpp

Lines changed: 1 addition & 1 deletion
@@ -276,7 +276,7 @@ void GroupingSet::addInputForActiveRows(
     const RowVectorPtr& input,
     bool mayPushdown) {
   VELOX_CHECK(!isGlobal_);
-  if (!table_) {
+  if (table_ == nullptr) {
     createHashTable();
   }
   ensureInputFits(input);

velox/exec/GroupingSet.h

Lines changed: 6 additions & 0 deletions
@@ -149,6 +149,12 @@ class GroupingSet {
     return table_ ? table_->rows()->numRows() : 0;
   }

+  /// Returns the underlying hash table, or nullptr if it has not been created
+  /// yet.
+  BaseHashTable* table() const {
+    return table_.get();
+  }
+
   /// Frees hash tables and other state when giving up partial aggregation as
   /// non-productive. Must be called before toIntermediate() is used.
   void abandonPartialAggregation();
