Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit ff6b55e

Browse files
akroviakovkurapov-peter
authored andcommitted
unambiguous executor
1 parent b4d67d9 commit ff6b55e

14 files changed

+304
-241
lines changed

omniscidb/QueryEngine/CostModel/CostModel.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ class CostModel {
4949

5050
virtual void calibrate(const CaibrationConfig& conf);
5151
virtual std::unique_ptr<policy::ExecutionPolicy> predict(
52-
QueryInfo query_info) const = 0;
52+
QueryInfo query_info,
53+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
54+
const = 0;
5355

5456
protected:
5557
struct DeviceExtrapolations {

omniscidb/QueryEngine/CostModel/Dispatchers/DefaultExecutionPolicy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ SchedulingAssignment FragmentIDAssignmentExecutionPolicy::scheduleSingleFragment
2424
int device_id = fragment.deviceIds[static_cast<int>(memory_level)];
2525
return {dt_, device_id};
2626
}
27-
std::vector<ExecutorDeviceType> FragmentIDAssignmentExecutionPolicy::devices() const {
27+
std::set<ExecutorDeviceType> FragmentIDAssignmentExecutionPolicy::devices() const {
2828
return {dt_};
2929
}
3030
} // namespace policy

omniscidb/QueryEngine/CostModel/Dispatchers/DefaultExecutionPolicy.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
namespace policy {
1919
class FragmentIDAssignmentExecutionPolicy : public ExecutionPolicy {
2020
public:
21-
FragmentIDAssignmentExecutionPolicy(ExecutorDeviceType dt) : dt_(dt){};
21+
FragmentIDAssignmentExecutionPolicy(
22+
ExecutorDeviceType dt,
23+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
24+
: ExecutionPolicy(devices_dispatch_modes), dt_(dt){};
2225
SchedulingAssignment scheduleSingleFragment(const FragmentInfo&,
2326
size_t frag_id,
2427
size_t frag_num) const override;
25-
std::vector<ExecutorDeviceType> devices() const override;
28+
std::set<ExecutorDeviceType> devices() const override;
2629
std::string name() const override { return "ExecutionPolicy::FragmentIDAssignment"; };
2730

2831
private:

omniscidb/QueryEngine/CostModel/Dispatchers/ExecutionPolicy.h

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,45 @@ struct SchedulingAssignment {
2727
};
2828

2929
class ExecutionPolicy {
30+
std::map<ExecutorDeviceType, ExecutorDispatchMode> devices_dispatch_modes_;
31+
3032
public:
33+
ExecutionPolicy(
34+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
35+
: devices_dispatch_modes_(devices_dispatch_modes){};
3136
virtual SchedulingAssignment scheduleSingleFragment(const FragmentInfo&,
3237
size_t frag_id,
3338
size_t frag_num) const = 0;
34-
virtual std::vector<ExecutorDeviceType> devices() const {
35-
return {ExecutorDeviceType::CPU, ExecutorDeviceType::GPU};
39+
40+
virtual std::set<ExecutorDeviceType> devices() const {
41+
std::set<ExecutorDeviceType> res;
42+
for (const auto& dt_mode : devices_dispatch_modes_) {
43+
res.insert(dt_mode.first);
44+
}
45+
return res;
46+
}
47+
48+
virtual bool hasDevice(const ExecutorDeviceType dt) const {
49+
return (devices_dispatch_modes_.count(dt) != 0);
50+
}
51+
52+
virtual ExecutorDispatchMode getExecutionMode(const ExecutorDeviceType dt) const {
53+
CHECK(hasDevice(dt));
54+
return devices_dispatch_modes_.at(dt);
55+
}
56+
57+
virtual std::map<ExecutorDeviceType, ExecutorDispatchMode> getExecutionModes() const {
58+
return devices_dispatch_modes_;
3659
}
3760
virtual std::string name() const = 0;
3861

3962
virtual ~ExecutionPolicy() = default;
40-
41-
// Probe/modify modes during kernel building (do not iterate). These are the default
42-
// modes.
43-
std::unordered_map<ExecutorDeviceType, ExecutorDispatchMode> devices_dispatch_modes{
44-
{ExecutorDeviceType::CPU, ExecutorDispatchMode::KernelPerFragment},
45-
{ExecutorDeviceType::GPU, ExecutorDispatchMode::KernelPerFragment}};
4663
};
4764

4865
inline std::ostream& operator<<(std::ostream& os, const ExecutionPolicy& policy) {
4966
os << policy.name() << "\n";
5067
os << "Dispatching modes: \n";
51-
for (const auto& device_disp_mode : policy.devices_dispatch_modes) {
68+
for (const auto& device_disp_mode : policy.getExecutionModes()) {
5269
os << device_disp_mode.first << " - " << device_disp_mode.second << "\n";
5370
}
5471
return os;

omniscidb/QueryEngine/CostModel/Dispatchers/ProportionBasedExecutionPolicy.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
namespace policy {
2020

2121
ProportionBasedExecutionPolicy::ProportionBasedExecutionPolicy(
22-
std::map<ExecutorDeviceType, unsigned>&& propotion) {
23-
CHECK_GT(propotion.size(), 0u);
24-
proportion_.merge(propotion);
22+
std::map<ExecutorDeviceType, unsigned>&& proportion,
23+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
24+
: ExecutionPolicy(devices_dispatch_modes) {
25+
CHECK_GT(proportion.size(), 0u);
26+
proportion_.merge(proportion);
2527
total_parts_ = std::accumulate(
2628
proportion_.begin(), proportion_.end(), 0u, [](unsigned acc, auto& cur) {
2729
return acc + cur.second;

omniscidb/QueryEngine/CostModel/Dispatchers/ProportionBasedExecutionPolicy.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ namespace policy {
2424
*/
2525
class ProportionBasedExecutionPolicy : public ExecutionPolicy {
2626
public:
27-
ProportionBasedExecutionPolicy(std::map<ExecutorDeviceType, unsigned>&& proportion);
27+
ProportionBasedExecutionPolicy(
28+
std::map<ExecutorDeviceType, unsigned>&& proportion,
29+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes);
2830
SchedulingAssignment scheduleSingleFragment(const FragmentInfo&,
2931
size_t frag_id,
3032
size_t frag_num) const override;

omniscidb/QueryEngine/CostModel/Dispatchers/RRExecutionPolicy.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
namespace policy {
1919
class RoundRobinExecutionPolicy : public ExecutionPolicy {
2020
public:
21+
RoundRobinExecutionPolicy(
22+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
23+
: ExecutionPolicy(devices_dispatch_modes){};
24+
2125
SchedulingAssignment scheduleSingleFragment(const FragmentInfo&,
2226
size_t frag_id,
2327
size_t frag_num) const override;

omniscidb/QueryEngine/CostModel/IterativeCostModel.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ IterativeCostModel::IterativeCostModel()
3131
#endif
3232

3333
std::unique_ptr<policy::ExecutionPolicy> IterativeCostModel::predict(
34-
QueryInfo query_info) const {
34+
QueryInfo query_info,
35+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
36+
const {
3537
std::shared_lock<std::shared_mutex> l(latch_);
3638

3739
unsigned cpu_prop = 1, gpu_prop = 0;
@@ -74,6 +76,7 @@ std::unique_ptr<policy::ExecutionPolicy> IterativeCostModel::predict(
7476
proportion[ExecutorDeviceType::GPU] = gpu_prop;
7577
proportion[ExecutorDeviceType::CPU] = cpu_prop;
7678

77-
return std::make_unique<policy::ProportionBasedExecutionPolicy>(std::move(proportion));
79+
return std::make_unique<policy::ProportionBasedExecutionPolicy>(std::move(proportion),
80+
devices_dispatch_modes);
7881
}
7982
} // namespace costmodel

omniscidb/QueryEngine/CostModel/IterativeCostModel.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ class IterativeCostModel : public CostModel {
2424
IterativeCostModel();
2525
IterativeCostModel(CostModelConfig config) : CostModel(std::move(config)) {}
2626

27-
virtual std::unique_ptr<policy::ExecutionPolicy> predict(QueryInfo query_info) const;
27+
virtual std::unique_ptr<policy::ExecutionPolicy> predict(
28+
QueryInfo query_info,
29+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
30+
const;
2831

2932
private:
3033
static constexpr size_t optimization_iterations_ = 1024;

omniscidb/QueryEngine/Descriptors/QueryFragmentDescriptor.cpp

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
9999
Executor* executor,
100100
compiler::CodegenTraitsDescriptor cgen_traits_desc) {
101101
const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();
102-
LOG(INFO) << *policy;
102+
LOG(INFO) << "Building Kernel Fragment table with policy: " << *policy;
103103
for (size_t frag_id = 0; frag_id < fragments->size(); frag_id++) {
104104
if (!allowed_outer_fragment_indices_.empty()) {
105105
if (std::find(allowed_outer_fragment_indices_.begin(),
@@ -162,7 +162,7 @@ void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
162162
const auto table_id = ra_exe_unit.input_descs[table_desc_idx].getTableId();
163163
auto table_frags_it = selected_tables_fragments_.find({db_id, table_id});
164164
CHECK(table_frags_it != selected_tables_fragments_.end());
165-
if (policy->devices_dispatch_modes.at(device_type) ==
165+
if (policy->getExecutionMode(device_type) ==
166166
ExecutorDispatchMode::KernelPerFragment) {
167167
execution_kernel_desc.fragments.emplace_back(
168168
FragmentsPerTable{db_id, table_id, frag_ids});
@@ -198,7 +198,7 @@ void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
198198
}
199199
LOG(DEBUG1) << "Assigning frag_id=" << frag_id << "/" << fragments->size() - 1
200200
<< " to " << device_type << ", device_id=" << device_id;
201-
if (policy->devices_dispatch_modes.at(device_type) ==
201+
if (policy->getExecutionMode(device_type) ==
202202
ExecutorDispatchMode::KernelPerFragment) {
203203
auto itr = execution_kernels_per_device_[device_type].find(device_id);
204204
if (itr == execution_kernels_per_device_[device_type].end()) {
@@ -237,31 +237,36 @@ void QueryFragmentDescriptor::buildFragmentPerKernelMapForUnion(
237237
j,
238238
executor,
239239
cgen_traits_desc);
240-
241-
std::vector<int> table_cpu_ids =
242-
std::accumulate(execution_kernels_per_device_[ExecutorDeviceType::CPU][0].begin(),
243-
execution_kernels_per_device_[ExecutorDeviceType::CPU][0].end(),
244-
std::vector<int>(),
245-
[](auto&& vec, auto& exe_kern) {
246-
vec.push_back(exe_kern.fragments[0].table_id);
247-
return vec;
248-
});
249-
std::vector<int> table_gpu_ids =
250-
std::accumulate(execution_kernels_per_device_[ExecutorDeviceType::GPU][0].begin(),
251-
execution_kernels_per_device_[ExecutorDeviceType::GPU][0].end(),
252-
std::vector<int>(),
253-
[](auto&& vec, auto& exe_kern) {
254-
vec.push_back(exe_kern.fragments[0].table_id);
255-
return vec;
256-
});
257-
VLOG(1) << "execution_kernels_per_device_[CPU].size()="
258-
<< execution_kernels_per_device_[ExecutorDeviceType::CPU].size()
259-
<< " execution_kernels_per_device_[CPU][0][*].fragments[0].table_id="
260-
<< shared::printContainer(table_cpu_ids);
261-
VLOG(1) << "execution_kernels_per_device_[GPU].size()="
262-
<< execution_kernels_per_device_[ExecutorDeviceType::GPU].size()
263-
<< " execution_kernels_per_device_[GPU][0][*].fragments[0].table_id="
264-
<< shared::printContainer(table_gpu_ids);
240+
if (policy->hasDevice(ExecutorDeviceType::CPU)) {
241+
CHECK(execution_kernels_per_device_.count(ExecutorDeviceType::CPU));
242+
std::vector<int> table_cpu_ids = std::accumulate(
243+
execution_kernels_per_device_.at(ExecutorDeviceType::CPU)[0].begin(),
244+
execution_kernels_per_device_.at(ExecutorDeviceType::CPU)[0].end(),
245+
std::vector<int>(),
246+
[](auto&& vec, auto& exe_kern) {
247+
vec.push_back(exe_kern.fragments[0].table_id);
248+
return vec;
249+
});
250+
VLOG(1) << "execution_kernels_per_device_[CPU].size()="
251+
<< execution_kernels_per_device_.at(ExecutorDeviceType::CPU).size()
252+
<< " execution_kernels_per_device_[CPU][0][*].fragments[0].table_id="
253+
<< shared::printContainer(table_cpu_ids);
254+
}
255+
if (policy->hasDevice(ExecutorDeviceType::GPU)) {
256+
CHECK(execution_kernels_per_device_.count(ExecutorDeviceType::GPU));
257+
std::vector<int> table_gpu_ids = std::accumulate(
258+
execution_kernels_per_device_.at(ExecutorDeviceType::GPU)[0].begin(),
259+
execution_kernels_per_device_.at(ExecutorDeviceType::GPU)[0].end(),
260+
std::vector<int>(),
261+
[](auto&& vec, auto& exe_kern) {
262+
vec.push_back(exe_kern.fragments[0].table_id);
263+
return vec;
264+
});
265+
VLOG(1) << "execution_kernels_per_device_[GPU].size()="
266+
<< execution_kernels_per_device_.at(ExecutorDeviceType::GPU).size()
267+
<< " execution_kernels_per_device_[GPU][0][*].fragments[0].table_id="
268+
<< shared::printContainer(table_gpu_ids);
269+
}
265270
}
266271
}
267272

0 commit comments

Comments
 (0)