@@ -78,51 +78,35 @@ class QueryFragmentDescriptor {
   void buildFragmentKernelMap(const RelAlgExecutionUnit& ra_exe_unit,
                               const std::vector<uint64_t>& frag_offsets,
                               const policy::ExecutionPolicy* policy,
-                              const int device_count,
-                              const bool enable_multifrag_kernels,
                               Executor* executor,
                               compiler::CodegenTraitsDescriptor cgen_traits_desc);
 
   /**
-   * Dispatch multi-fragment kernels. Currently GPU only. Each GPU should have only one
-   * kernel, with multiple fragments in its fragments list.
+   * Dispatch kernels to devices according to the execution policy.
    */
   template <typename DISPATCH_FCN>
-  void assignFragsToMultiDispatch(DISPATCH_FCN f) const {
-    for (const auto& device_type_itr : execution_kernels_per_device_) {
-      for (const auto& device_itr : device_type_itr.second) {
-        const auto& execution_kernels = device_itr.second;
-        CHECK_EQ(execution_kernels.size(), size_t(1));
-
-        const auto& fragments_list = execution_kernels.front().fragments;
-        f(device_itr.first, fragments_list, rowid_lookup_key_);
-      }
-    }
-  }
-
-  template <typename DISPATCH_FCN>
-  void assignFragsToMultiHeterogeneousDispatch(
-      DISPATCH_FCN dispatcher_f,
-      const RelAlgExecutionUnit& ra_exe_unit) const {
-    std::unordered_map<int, size_t> cpu_execution_kernel_index;
+  void dispatchKernelsToDevices(DISPATCH_FCN dispatcher_f,
+                                const RelAlgExecutionUnit& ra_exe_unit,
+                                policy::ExecutionPolicy* policy) const {
+    std::unordered_map<ExecutorDeviceType, std::unordered_map<int, size_t>>
+        execution_kernel_index;
     size_t tuple_count = 0;
-
-    if (execution_kernels_per_device_.count(ExecutorDeviceType::CPU)) {
-      cpu_execution_kernel_index.reserve(
-          execution_kernels_per_device_.at(ExecutorDeviceType::CPU).size());
-      for (const auto& device_itr :
-           execution_kernels_per_device_.at(ExecutorDeviceType::CPU)) {
-        CHECK(
-            cpu_execution_kernel_index.insert(std::make_pair(device_itr.first, size_t(0)))
-                .second);
+    for (const auto& device_type_itr : execution_kernels_per_device_) {
+      if (policy->devices_dispatch_modes.at(device_type_itr.first) ==
+          ExecutorDispatchMode::KernelPerFragment) {
+        for (const auto& device_itr : device_type_itr.second) {
+          CHECK(execution_kernel_index[device_type_itr.first]
+                    .insert(std::make_pair(device_itr.first, size_t(0)))
+                    .second);
+        }
       }
     }
 
     for (const auto& device_type_itr : execution_kernels_per_device_) {
-      if (device_type_itr.first == ExecutorDeviceType::GPU) {
+      if (policy->devices_dispatch_modes.at(device_type_itr.first) ==
+          ExecutorDispatchMode::MultifragmentKernel) {
        for (const auto& device_itr : device_type_itr.second) {
          const auto& execution_kernels = device_itr.second;
-          CHECK_EQ(execution_kernels.size(), size_t(1));
          const auto& fragments_list = execution_kernels.front().fragments;
          dispatcher_f(
              device_itr.first, fragments_list, rowid_lookup_key_, device_type_itr.first);
@@ -131,71 +115,27 @@ class QueryFragmentDescriptor {
         bool dispatch_finished = false;
         while (!dispatch_finished) {
           dispatch_finished = true;
-          for (const auto& device_itr : device_type_itr.second) {
-            auto& kernel_idx = cpu_execution_kernel_index[device_itr.first];
-            if (kernel_idx < device_itr.second.size()) {
-              dispatch_finished = false;
-              const auto& execution_kernel = device_itr.second[kernel_idx++];
-              dispatcher_f(device_itr.first,
-                           execution_kernel.fragments,
-                           rowid_lookup_key_,
-                           device_type_itr.first);
-              if (terminateDispatchMaybe(tuple_count, ra_exe_unit, execution_kernel)) {
-                return;
+          for (const auto& device_type_itr : execution_kernels_per_device_)
+            for (const auto& device_itr : device_type_itr.second) {
+              auto& kernel_idx =
+                  execution_kernel_index[device_type_itr.first][device_itr.first];
+              if (kernel_idx < device_itr.second.size()) {
+                dispatch_finished = false;
+                const auto& execution_kernel = device_itr.second[kernel_idx++];
+                dispatcher_f(device_itr.first,
+                             execution_kernel.fragments,
+                             rowid_lookup_key_,
+                             device_type_itr.first);
+                if (terminateDispatchMaybe(tuple_count, ra_exe_unit, execution_kernel)) {
+                  return;
+                }
              }
            }
-          }
        }
      }
    }
  }
 
-  /**
-   * Dispatch one fragment for each device. Iterate the device map and dispatch one kernel
-   * for each device per iteration. This allows balanced dispatch as well as early
-   * termination if the number of rows passing the kernel can be computed at dispatch time
-   * and the scan limit is reached.
-   */
-  template <typename DISPATCH_FCN>
-  void assignFragsToKernelDispatch(DISPATCH_FCN f,
-                                   const RelAlgExecutionUnit& ra_exe_unit) const {
-    if (execution_kernels_per_device_.empty()) {
-      return;
-    }
-
-    size_t tuple_count = 0;
-
-    std::map<ExecutorDeviceType, std::unordered_map<int, size_t>> execution_kernel_index;
-    for (const auto& device_type_itr : execution_kernels_per_device_) {
-      for (const auto& device_itr : device_type_itr.second) {
-        CHECK(execution_kernel_index[device_type_itr.first]
-                  .insert(std::make_pair(device_itr.first, size_t(0)))
-                  .second);
-      }
-    }
-
-    bool dispatch_finished = false;
-    while (!dispatch_finished) {
-      dispatch_finished = true;
-      for (const auto& device_type_itr : execution_kernels_per_device_)
-        for (const auto& device_itr : device_type_itr.second) {
-          auto& kernel_idx =
-              execution_kernel_index[device_type_itr.first][device_itr.first];
-          if (kernel_idx < device_itr.second.size()) {
-            dispatch_finished = false;
-            const auto& execution_kernel = device_itr.second[kernel_idx++];
-            f(device_itr.first,
-              execution_kernel.fragments,
-              rowid_lookup_key_,
-              device_type_itr.first);
-            if (terminateDispatchMaybe(tuple_count, ra_exe_unit, execution_kernel)) {
-              return;
-            }
-          }
-        }
-    }
-  }
-
   bool shouldCheckWorkUnitWatchdog() const {
     return rowid_lookup_key_ < 0 && !execution_kernels_per_device_.empty();
   }
@@ -218,23 +158,20 @@ class QueryFragmentDescriptor {
                                         const RelAlgExecutionUnit& ra_exe_unit,
                                         const std::vector<uint64_t>& frag_offsets,
                                         const policy::ExecutionPolicy* policy,
-                                        const int device_count,
                                         const size_t num_bytes_for_row,
                                         Executor* executor,
                                         compiler::CodegenTraitsDescriptor cgen_traits_desc);
 
   void buildFragmentPerKernelMap(const RelAlgExecutionUnit& ra_exe_unit,
                                  const std::vector<uint64_t>& frag_offsets,
                                  const policy::ExecutionPolicy* policy,
-                                 const int device_count,
                                  const size_t num_bytes_for_row,
                                  Executor* executor,
                                  compiler::CodegenTraitsDescriptor cgen_traits_desc);
 
   void buildMultifragKernelMap(const RelAlgExecutionUnit& ra_exe_unit,
                                const std::vector<uint64_t>& frag_offsets,
                                const policy::ExecutionPolicy* policy,
-                               const int device_count,
                                const size_t num_bytes_for_row,
                                Executor* executor,
                                compiler::CodegenTraitsDescriptor cgen_traits_desc);
@@ -244,7 +181,6 @@ class QueryFragmentDescriptor {
                                       const InputDescriptor& table_desc,
                                       const std::vector<uint64_t>& frag_offsets,
                                       const policy::ExecutionPolicy* policy,
-                                      const int device_count,
                                       const size_t num_bytes_for_row,
                                       const std::optional<size_t> table_desc_offset,
                                       Executor* executor,
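
For review purposes, here is a minimal, self-contained sketch of the dispatch pattern this diff introduces. Everything in it is a stand-in written for illustration: `ToyPolicy`, `FragmentsList`, `ExecutionKernel`, `KernelsPerDevice`, and `dispatch_sketch` are not the project's real types, the real dispatcher functor also receives `rowid_lookup_key_` (omitted here), and the real implementation additionally tracks tuple counts so `terminateDispatchMaybe` can cut dispatch short.

```cpp
#include <cstddef>
#include <iostream>
#include <map>
#include <unordered_map>
#include <vector>

enum class ExecutorDeviceType { CPU, GPU };
enum class ExecutorDispatchMode { KernelPerFragment, MultifragmentKernel };

using FragmentsList = std::vector<int>;  // stand-in: just fragment ids
struct ExecutionKernel {
  FragmentsList fragments;
};

// device type -> (device id -> kernels assigned to that device)
using KernelsPerDevice =
    std::map<ExecutorDeviceType, std::map<int, std::vector<ExecutionKernel>>>;

// Stand-in for policy::ExecutionPolicy: only the devices_dispatch_modes map is modeled.
struct ToyPolicy {
  std::unordered_map<ExecutorDeviceType, ExecutorDispatchMode> devices_dispatch_modes;
};

// Mirrors the shape of dispatchKernelsToDevices above: devices whose mode is
// MultifragmentKernel get one call carrying the whole fragment list of their single
// kernel; KernelPerFragment devices are serviced round-robin, one kernel per device
// per pass, which keeps dispatch balanced and allows stopping early.
template <typename DISPATCH_FCN>
void dispatch_sketch(const KernelsPerDevice& kernels,
                     const ToyPolicy& policy,
                     DISPATCH_FCN f) {
  for (const auto& [device_type, per_device] : kernels) {
    if (policy.devices_dispatch_modes.at(device_type) ==
        ExecutorDispatchMode::MultifragmentKernel) {
      for (const auto& [device_id, device_kernels] : per_device) {
        f(device_id, device_kernels.front().fragments, device_type);
      }
    }
  }
  std::map<ExecutorDeviceType, std::map<int, size_t>> next_kernel;
  bool dispatch_finished = false;
  while (!dispatch_finished) {
    dispatch_finished = true;
    for (const auto& [device_type, per_device] : kernels) {
      if (policy.devices_dispatch_modes.at(device_type) !=
          ExecutorDispatchMode::KernelPerFragment) {
        continue;
      }
      for (const auto& [device_id, device_kernels] : per_device) {
        auto& idx = next_kernel[device_type][device_id];
        if (idx < device_kernels.size()) {
          dispatch_finished = false;
          f(device_id, device_kernels[idx++].fragments, device_type);
        }
      }
    }
  }
}

int main() {
  KernelsPerDevice kernels;
  // CPU device 0: three per-fragment kernels; GPU device 0: one multi-fragment kernel.
  kernels[ExecutorDeviceType::CPU][0] = {{{0}}, {{1}}, {{2}}};
  kernels[ExecutorDeviceType::GPU][0] = {{{0, 1, 2}}};

  ToyPolicy policy;
  policy.devices_dispatch_modes[ExecutorDeviceType::CPU] =
      ExecutorDispatchMode::KernelPerFragment;
  policy.devices_dispatch_modes[ExecutorDeviceType::GPU] =
      ExecutorDispatchMode::MultifragmentKernel;

  dispatch_sketch(kernels, policy,
                  [](int device_id, const FragmentsList& frags, ExecutorDeviceType dt) {
                    std::cout << (dt == ExecutorDeviceType::GPU ? "GPU " : "CPU ")
                              << device_id << " <- " << frags.size() << " fragment(s)\n";
                  });
  return 0;
}
```

The point of the policy-driven split is that one descriptor can now mix modes per device type (e.g. per-fragment kernels on CPU alongside a single multi-fragment kernel per GPU) instead of hard-coding GPU as the multi-fragment path, as the removed `assignFragsToMultiDispatch` and `assignFragsToMultiHeterogeneousDispatch` did.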