Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit e1d6f60

Browse files
committed
Support partitioned aggregation.
Signed-off-by: ienkovich <[email protected]>
1 parent aab3f75 commit e1d6f60

37 files changed

+1479
-142
lines changed

omniscidb/ConfigBuilder/ConfigBuilder.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,39 @@ bool ConfigBuilder::parseCommandLineArgs(int argc,
183183
po::value<size_t>(&config_->exec.group_by.large_ndv_multiplier)
184184
->default_value(config_->exec.group_by.large_ndv_multiplier),
185185
"A multiplier applied to NDV estimator buffer size for large ranges.");
186+
opt_desc.add_options()(
187+
"enable-cpu-partitioned-groupby",
188+
po::value<bool>(&config_->exec.group_by.enable_cpu_partitioned_groupby)
189+
->default_value(config_->exec.group_by.enable_cpu_partitioned_groupby)
190+
->implicit_value(true),
191+
"Enable partitioned aggregation on CPU.");
192+
opt_desc.add_options()(
193+
"groupby-partitioning-buffer-size-threshold",
194+
po::value<size_t>(&config_->exec.group_by.partitioning_buffer_size_threshold)
195+
->default_value(config_->exec.group_by.partitioning_buffer_size_threshold),
196+
"Minimal estimated output buffer size to enable partitioned aggregation.");
197+
opt_desc.add_options()(
198+
"groupby-partitioning-group-size-threshold",
199+
po::value<double>(&config_->exec.group_by.partitioning_group_size_threshold)
200+
->default_value(config_->exec.group_by.partitioning_group_size_threshold),
201+
"Maximum average estimated number of rows per output group to enable partitioned "
202+
"aggregation.");
203+
opt_desc.add_options()("groupby-min-partitions",
204+
po::value<size_t>(&config_->exec.group_by.min_partitions)
205+
->default_value(config_->exec.group_by.min_partitions),
206+
"A minimal number of partitions to be used for partitioned "
207+
"aggregation. 0 value is used for auto-detection.");
208+
opt_desc.add_options()("groupby-max-partitions",
209+
po::value<size_t>(&config_->exec.group_by.max_partitions)
210+
->default_value(config_->exec.group_by.max_partitions),
211+
"A maximum number of partitions to be used for partitioned "
212+
"aggregation.");
213+
opt_desc.add_options()(
214+
"groupby-partitioning-target-buffer-size",
215+
po::value<size_t>(&config_->exec.group_by.partitioning_buffer_target_size)
216+
->default_value(config_->exec.group_by.partitioning_buffer_target_size),
217+
"A preferred aggregation output buffer size used to compute number of partitions "
218+
"to use.");
186219

187220
// exec.window
188221
opt_desc.add_options()("enable-window-functions",
@@ -349,6 +382,12 @@ bool ConfigBuilder::parseCommandLineArgs(int argc,
349382
->implicit_value(true),
350383
"Enable multi-fragment intermediate results to improve execution parallelism for "
351384
"queries with multiple execution steps");
385+
opt_desc.add_options()(
386+
"enable-multifrag-execution-result",
387+
po::value<bool>(&config_->exec.enable_multifrag_execution_result)
388+
->default_value(config_->exec.enable_multifrag_execution_result)
389+
->implicit_value(true),
390+
"Enable multi-fragment final execution result");
352391
opt_desc.add_options()("gpu-block-size",
353392
po::value<size_t>(&config_->exec.override_gpu_block_size)
354393
->default_value(config_->exec.override_gpu_block_size),

omniscidb/IR/ExprCollector.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,21 @@ class ExprCollector : public ExprVisitor<void> {
2525
return collect(expr.get(), std::forward<Ts>(args)...);
2626
}
2727

28+
template <typename... Ts>
29+
static ResultType collect(const ExprPtrVector& exprs, Ts&&... args) {
30+
CollectorType collector(std::forward<Ts>(args)...);
31+
for (auto& expr : exprs) {
32+
collector.visit(expr.get());
33+
}
34+
return std::move(collector.result_);
35+
}
36+
2837
ResultType& result() { return result_; }
2938
const ResultType& result() const { return result_; }
3039

3140
protected:
41+
using BaseClass = ExprCollector<ResultType, CollectorType>;
42+
3243
ResultType result_;
3344
};
3445

omniscidb/IR/ExprVisitor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ class ExprVisitor {
115115
if (auto agg = expr->as<AggExpr>()) {
116116
return visitAggExpr(agg);
117117
}
118+
CHECK(false) << "Unhandled expr: " << expr->toString();
118119
return defaultResult(expr);
119120
}
120121

omniscidb/IR/Node.cpp

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,90 @@ void LogicalUnion::checkForMatchingMetaInfoTypes() const {
284284
}
285285
}
286286

287+
size_t ShuffleFunction::hash() const {
288+
size_t res = 0;
289+
boost::hash_combine(res, kind);
290+
boost::hash_combine(res, partitions);
291+
return res;
292+
}
293+
294+
std::string ShuffleFunction::toString() const {
295+
std::stringstream ss;
296+
ss << kind << "(" << partitions << ")";
297+
return ss.str();
298+
}
299+
300+
std::ostream& operator<<(std::ostream& os, const ShuffleFunction& fn) {
301+
os << fn.toString();
302+
return os;
303+
}
304+
305+
std::ostream& operator<<(std::ostream& os, ShuffleFunction::Kind kind) {
306+
os << ::toString(kind);
307+
return os;
308+
}
309+
310+
Shuffle::Shuffle(ExprPtrVector keys,
311+
ExprPtr expr,
312+
std::string field,
313+
ShuffleFunction fn,
314+
NodePtr input)
315+
: keys_(keys), exprs_({std::move(expr)}), fields_({std::move(field)}), fn_(fn) {
316+
inputs_.emplace_back(std::move(input));
317+
}
318+
319+
Shuffle::Shuffle(ExprPtrVector keys,
320+
ExprPtrVector exprs,
321+
std::vector<std::string> fields,
322+
ShuffleFunction fn,
323+
std::vector<NodePtr> inputs)
324+
: keys_(keys), exprs_(std::move(exprs)), fields_(std::move(fields)), fn_(fn) {
325+
inputs_ = std::move(inputs);
326+
}
327+
328+
std::string Shuffle::toString() const {
329+
return cat(::typeName(this),
330+
getIdString(),
331+
"(keys=",
332+
::toString(keys_),
333+
", exprs=",
334+
::toString(exprs_),
335+
", fields",
336+
::toString(fields_),
337+
", fn=",
338+
::toString(fn_),
339+
")");
340+
}
341+
342+
size_t Shuffle::toHash() const {
343+
if (!hash_) {
344+
hash_ = typeid(Shuffle).hash_code();
345+
for (auto& expr : keys_) {
346+
boost::hash_combine(*hash_, expr->hash());
347+
}
348+
for (auto& expr : exprs_) {
349+
boost::hash_combine(*hash_, expr->hash());
350+
}
351+
for (auto& field : fields_) {
352+
boost::hash_combine(*hash_, field);
353+
}
354+
boost::hash_combine(*hash_, fn_.hash());
355+
for (auto& node : inputs_) {
356+
boost::hash_combine(*hash_, node->toHash());
357+
}
358+
}
359+
return *hash_;
360+
}
361+
362+
void Shuffle::rewriteExprs(hdk::ir::ExprRewriter& rewriter) {
363+
for (size_t i = 0; i < keys_.size(); ++i) {
364+
keys_[i] = rewriter.visit(keys_[i].get());
365+
}
366+
for (size_t i = 0; i < exprs_.size(); ++i) {
367+
exprs_[i] = rewriter.visit(exprs_[i].get());
368+
}
369+
}
370+
287371
namespace {
288372

289373
void collectNodes(NodePtr node, std::vector<NodePtr> nodes) {
@@ -318,7 +402,7 @@ void QueryDag::resetQueryExecutionState() {
318402
// TODO: always simply use node->size()
319403
size_t getNodeColumnCount(const Node* node) {
320404
// Nodes that don't depend on input.
321-
if (is_one_of<Scan, Project, Aggregate, LogicalUnion, LogicalValues>(node)) {
405+
if (is_one_of<Scan, Project, Aggregate, LogicalUnion, LogicalValues, Shuffle>(node)) {
322406
return node->size();
323407
}
324408

@@ -356,7 +440,8 @@ ExprPtrVector getNodeColumnRefs(const Node* node) {
356440
LogicalValues,
357441
Filter,
358442
Sort,
359-
Join>(node)) {
443+
Join,
444+
Shuffle>(node)) {
360445
return genColumnRefs(node, getNodeColumnCount(node));
361446
}
362447

@@ -374,7 +459,8 @@ ExprPtr getNodeColumnRef(const Node* node, unsigned index) {
374459
LogicalValues,
375460
Filter,
376461
Sort,
377-
Join>(node)) {
462+
Join,
463+
Shuffle>(node)) {
378464
return makeExpr<ColumnRef>(getColumnType(node, index), node, index);
379465
}
380466

@@ -458,3 +544,12 @@ const Type* getColumnType(const Node* node, size_t col_idx) {
458544
}
459545

460546
} // namespace hdk::ir
547+
548+
std::string toString(hdk::ir::ShuffleFunction::Kind kind) {
549+
switch (kind) {
550+
case hdk::ir::ShuffleFunction::kHash:
551+
return "Hash";
552+
}
553+
LOG(FATAL) << "Invalid shuffle kind.";
554+
return "";
555+
}

omniscidb/IR/Node.h

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,9 @@ class Aggregate : public Node {
377377
NodePtr input)
378378
: groupby_count_(groupby_count)
379379
, aggs_(std::move(aggs))
380-
, fields_(std::move(fields)) {
380+
, fields_(std::move(fields))
381+
, partitioned_(false)
382+
, buffer_entry_count_hint_(0) {
381383
inputs_.emplace_back(std::move(input));
382384
}
383385

@@ -406,6 +408,12 @@ class Aggregate : public Node {
406408

407409
void setAggExprs(ExprPtrVector new_aggs) { aggs_ = std::move(new_aggs); }
408410

411+
bool isPartitioned() const { return partitioned_; }
412+
void setPartitioned(bool val) { partitioned_ = val; }
413+
414+
size_t bufferEntryCountHint() const { return buffer_entry_count_hint_; }
415+
void setBufferEntryCountHint(size_t val) { buffer_entry_count_hint_ = val; }
416+
409417
void rewriteExprs(hdk::ir::ExprRewriter& rewriter) override;
410418

411419
std::string toString() const override {
@@ -417,6 +425,7 @@ class Aggregate : public Node {
417425
::toString(aggs_),
418426
", fields=",
419427
::toString(fields_),
428+
(partitioned_ ? ", partitioned" : ""),
420429
", inputs=",
421430
inputsToString(inputs_),
422431
")");
@@ -445,6 +454,8 @@ class Aggregate : public Node {
445454
const size_t groupby_count_;
446455
ExprPtrVector aggs_;
447456
std::vector<std::string> fields_;
457+
bool partitioned_;
458+
size_t buffer_entry_count_hint_;
448459
};
449460

450461
class Join : public Node {
@@ -593,7 +604,9 @@ class TranslatedJoin : public Node {
593604
CHECK(false);
594605
return nullptr;
595606
}
596-
const std::string& getFieldName(size_t i) const override { CHECK(false); }
607+
const std::string& getFieldName(size_t i) const override {
608+
throw std::runtime_error("Unexpected call to TranslatedJoin::getFieldName.");
609+
}
597610
std::vector<const ColumnVar*> getJoinCols(bool lhs) const {
598611
if (lhs) {
599612
return lhs_join_cols_;
@@ -853,6 +866,69 @@ class LogicalUnion : public Node {
853866
bool const is_all_;
854867
};
855868
869+
struct ShuffleFunction {
870+
enum Kind {
871+
kHash,
872+
};
873+
874+
Kind kind;
875+
size_t partitions;
876+
877+
size_t hash() const;
878+
std::string toString() const;
879+
};
880+
881+
std::ostream& operator<<(std::ostream& os, const ShuffleFunction& fn);
882+
std::ostream& operator<<(std::ostream& os, ShuffleFunction::Kind kind);
883+
884+
class Shuffle : public Node {
885+
public:
886+
Shuffle(ExprPtrVector keys,
887+
ExprPtr expr,
888+
std::string field,
889+
ShuffleFunction fn,
890+
NodePtr input);
891+
Shuffle(ExprPtrVector keys,
892+
ExprPtrVector exprs,
893+
std::vector<std::string> fields,
894+
ShuffleFunction fn,
895+
std::vector<NodePtr> input);
896+
Shuffle(const Shuffle& other) = default;
897+
898+
const ExprPtrVector& keys() const { return keys_; }
899+
const ExprPtrVector& exprs() const { return exprs_; }
900+
const std::vector<std::string>& fields() const { return fields_; }
901+
ShuffleFunction fn() const { return fn_; }
902+
903+
size_t size() const override { return exprs_.size(); }
904+
905+
// Shuffle node can be used for computing partition sizes and perform
906+
// actual partitioning. The first version uses COUNT aggregte as its
907+
// only target expression.
908+
bool isCount() const {
909+
return exprs_.size() == (size_t)1 && exprs_.front()->is<AggExpr>();
910+
}
911+
912+
std::string toString() const override;
913+
size_t toHash() const override;
914+
void rewriteExprs(hdk::ir::ExprRewriter& rewriter) override;
915+
916+
std::shared_ptr<Node> deepCopy() const override {
917+
return std::make_shared<Shuffle>(*this);
918+
}
919+
920+
const std::string& getFieldName(size_t i) const override {
921+
CHECK_LT(i, fields_.size());
922+
return fields_[i];
923+
}
924+
925+
private:
926+
ExprPtrVector keys_;
927+
ExprPtrVector exprs_;
928+
std::vector<std::string> fields_;
929+
ShuffleFunction fn_;
930+
};
931+
856932
class QueryNotSupported : public std::runtime_error {
857933
public:
858934
QueryNotSupported(const std::string& reason) : std::runtime_error(reason) {}
@@ -921,3 +997,5 @@ size_t getNodeColumnCount(const Node* node);
921997
ExprPtr getJoinInputColumnRef(const ColumnRef* col_ref);
922998
923999
} // namespace hdk::ir
1000+
1001+
std::string toString(hdk::ir::ShuffleFunction::Kind kind);

omniscidb/QueryEngine/CardinalityEstimator.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ RelAlgExecutionUnit create_count_all_execution_unit(
114114
ra_exe_unit.hash_table_build_plan_dag,
115115
ra_exe_unit.table_id_to_node_map,
116116
ra_exe_unit.union_all,
117+
ra_exe_unit.shuffle_fn,
118+
ra_exe_unit.partition_offsets_col,
119+
ra_exe_unit.partitioned_aggregation,
117120
ra_exe_unit.cost_model,
118121
{}}; // TODO(bagrorg): should we use costmodel here?
119122
}

omniscidb/QueryEngine/CardinalityEstimator.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,22 @@ class CardinalityEstimationRequired : public std::runtime_error {
4343
const int64_t range_;
4444
};
4545

46+
class RequestPartitionedAggregation : public std::runtime_error {
47+
public:
48+
RequestPartitionedAggregation(size_t entry_size, size_t estimated_buffer_entries)
49+
: std::runtime_error("RequestPartitionedAggregation")
50+
, entry_size_(entry_size)
51+
, estimated_buffer_entries_(estimated_buffer_entries) {}
52+
53+
size_t entrySize() const { return entry_size_; }
54+
size_t estimatedBufferEntries() const { return estimated_buffer_entries_; }
55+
size_t estimatedBufferSize() const { return entry_size_ * estimated_buffer_entries_; }
56+
57+
private:
58+
size_t entry_size_;
59+
size_t estimated_buffer_entries_;
60+
};
61+
4662
RelAlgExecutionUnit create_ndv_execution_unit(const RelAlgExecutionUnit& ra_exe_unit,
4763
SchemaProvider* schema_provider,
4864
const Config& config,

0 commit comments

Comments
 (0)