@@ -2797,66 +2797,109 @@ std::vector<std::unique_ptr<ExecutionKernel>> Executor::createHeterogeneousKernels
 
   CHECK(!ra_exe_unit.input_descs.empty());
 
+  const bool use_multifrag_kernel = eo.allow_multifrag && is_agg;
+
   fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
                                              shared_context.getFragOffsets(),
                                              policy,
                                              available_cpus + available_gpus.size(),
-                                             false, /* multifrag policy unsupported yet */
+                                             use_multifrag_kernel,
                                              this,
                                              co.codegen_traits_desc);
 
-  if (allow_single_frag_table_opt && query_mem_descs.count(ExecutorDeviceType::GPU) &&
-      (query_mem_descs.at(ExecutorDeviceType::GPU)->getQueryDescriptionType() ==
-       QueryDescriptionType::Projection) &&
-      table_infos.size() == 1) {
-    const auto max_frag_size = table_infos.front().info.getFragmentNumTuplesUpperBound();
-    if (max_frag_size < query_mem_descs.at(ExecutorDeviceType::GPU)->getEntryCount()) {
-      LOG(INFO) << "Lowering scan limit from "
-                << query_mem_descs.at(ExecutorDeviceType::GPU)->getEntryCount()
-                << " to match max fragment size " << max_frag_size
-                << " for kernel per fragment execution path.";
-      throw CompilationRetryNewScanLimit(max_frag_size);
+  if (use_multifrag_kernel) {
+    LOG(INFO) << "use_multifrag_kernel=" << use_multifrag_kernel;
+    size_t frag_list_idx{0};
+    auto multifrag_heterogeneous_kernel_dispatch =
+        [&ra_exe_unit,
+         &execution_kernels,
+         &column_fetcher,
+         &co,
+         &eo,
+         &frag_list_idx,
+         &query_comp_descs,
+         &query_mem_descs](const int device_id,
+                           const FragmentsList& frag_list,
+                           const int64_t rowid_lookup_key,
+                           const ExecutorDeviceType device_type) {
+          if (!frag_list.size()) {
+            return;
+          }
+          CHECK_GE(device_id, 0);
+
+          execution_kernels.emplace_back(std::make_unique<ExecutionKernel>(
+              ra_exe_unit,
+              device_type,
+              device_id,
+              co,
+              eo,
+              column_fetcher,
+              *query_comp_descs.at(device_type).get(),
+              *query_mem_descs.at(device_type).get(),
+              frag_list,
+              device_type == ExecutorDeviceType::CPU
+                  ? ExecutorDispatchMode::KernelPerFragment
+                  : ExecutorDispatchMode::MultifragmentKernel,
+              rowid_lookup_key));
+
+          ++frag_list_idx;
+        };
+    fragment_descriptor.assignFragsToMultiHeterogeneousDispatch(
+        multifrag_heterogeneous_kernel_dispatch, ra_exe_unit);
+  } else {
+    if (allow_single_frag_table_opt && query_mem_descs.count(ExecutorDeviceType::GPU) &&
+        (query_mem_descs.at(ExecutorDeviceType::GPU)->getQueryDescriptionType() ==
+         QueryDescriptionType::Projection) &&
+        table_infos.size() == 1) {
+      const auto max_frag_size =
+          table_infos.front().info.getFragmentNumTuplesUpperBound();
+      if (max_frag_size < query_mem_descs.at(ExecutorDeviceType::GPU)->getEntryCount()) {
+        LOG(INFO) << "Lowering scan limit from "
+                  << query_mem_descs.at(ExecutorDeviceType::GPU)->getEntryCount()
+                  << " to match max fragment size " << max_frag_size
+                  << " for kernel per fragment execution path.";
+        throw CompilationRetryNewScanLimit(max_frag_size);
+      }
     }
-  }
 
-  size_t frag_list_idx{0};
-  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
-                                       &execution_kernels,
-                                       &column_fetcher,
-                                       &co,
-                                       &eo,
-                                       &frag_list_idx,
-                                       &query_comp_descs,
-                                       &query_mem_descs](
-                                          const int device_id,
-                                          const FragmentsList& frag_list,
-                                          const int64_t rowid_lookup_key,
-                                          const ExecutorDeviceType device_type) {
-    if (!frag_list.size()) {
-      return;
-    }
-    CHECK_GE(device_id, 0);
-    CHECK(query_comp_descs.count(device_type));
-    CHECK(query_mem_descs.count(device_type));
-
-    execution_kernels.emplace_back(
-        std::make_unique<ExecutionKernel>(ra_exe_unit,
-                                          device_type,
-                                          device_id,
-                                          co,
-                                          eo,
-                                          column_fetcher,
-                                          *query_comp_descs.at(device_type).get(),
-                                          *query_mem_descs.at(device_type).get(),
-                                          frag_list,
-                                          ExecutorDispatchMode::KernelPerFragment,
-                                          rowid_lookup_key));
-    ++frag_list_idx;
-  };
+    size_t frag_list_idx{0};
+    auto fragment_per_kernel_dispatch = [&ra_exe_unit,
+                                         &execution_kernels,
+                                         &column_fetcher,
+                                         &co,
+                                         &eo,
+                                         &frag_list_idx,
+                                         &query_comp_descs,
+                                         &query_mem_descs](
+                                            const int device_id,
+                                            const FragmentsList& frag_list,
+                                            const int64_t rowid_lookup_key,
+                                            const ExecutorDeviceType device_type) {
+      if (!frag_list.size()) {
+        return;
+      }
+      CHECK_GE(device_id, 0);
+      CHECK(query_comp_descs.count(device_type));
+      CHECK(query_mem_descs.count(device_type));
 
-  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
-                                                  ra_exe_unit);
+      execution_kernels.emplace_back(
+          std::make_unique<ExecutionKernel>(ra_exe_unit,
+                                            device_type,
+                                            device_id,
+                                            co,
+                                            eo,
+                                            column_fetcher,
+                                            *query_comp_descs.at(device_type).get(),
+                                            *query_mem_descs.at(device_type).get(),
+                                            frag_list,
+                                            ExecutorDispatchMode::KernelPerFragment,
+                                            rowid_lookup_key));
+      ++frag_list_idx;
+    };
 
+    fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
+                                                    ra_exe_unit);
+  }
   return execution_kernels;
 }
 
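Taken together, the hunk stops hard-coding false for the multifrag policy: use_multifrag_kernel is derived from eo.allow_multifrag && is_agg, and when it is set the new multifrag_heterogeneous_kernel_dispatch lambda builds one ExecutionKernel per assigned fragment list, choosing KernelPerFragment for CPU devices and MultifragmentKernel for GPU devices; otherwise the previous per-fragment path runs unchanged inside the new else branch. The listing below is a minimal, self-contained sketch of that mode-selection pattern, assuming nothing beyond what the diff shows; the DeviceType and DispatchMode enums, the Kernel struct, and the pick_mode helper are illustrative stand-ins, not HDK types.

// Minimal sketch (not HDK code) of the dispatch pattern the hunk introduces:
// with multifragment kernels enabled, GPU work gets a single kernel over its
// whole fragment list, while CPU work is still split one kernel per fragment.
#include <cstdio>
#include <vector>

enum class DeviceType { CPU, GPU };
enum class DispatchMode { KernelPerFragment, MultifragmentKernel };

struct Kernel {
  DeviceType device;
  DispatchMode mode;
  std::vector<int> fragment_ids;  // fragments this kernel will scan
};

// Chooses the dispatch mode the same way the patched ternary does:
// CPU -> per-fragment, GPU -> multifragment (when multifrag is allowed).
DispatchMode pick_mode(DeviceType device, bool use_multifrag_kernel) {
  if (!use_multifrag_kernel || device == DeviceType::CPU) {
    return DispatchMode::KernelPerFragment;
  }
  return DispatchMode::MultifragmentKernel;
}

int main() {
  const bool use_multifrag_kernel = true;  // stands in for eo.allow_multifrag && is_agg
  std::vector<Kernel> kernels;

  // GPU side: one kernel covering all of its assigned fragments.
  kernels.push_back({DeviceType::GPU,
                     pick_mode(DeviceType::GPU, use_multifrag_kernel),
                     {0, 1, 2, 3}});

  // CPU side: one kernel per fragment, as before.
  for (int frag : {4, 5}) {
    kernels.push_back(
        {DeviceType::CPU, pick_mode(DeviceType::CPU, use_multifrag_kernel), {frag}});
  }

  for (const auto& k : kernels) {
    std::printf("%s kernel, %s, %zu fragment(s)\n",
                k.device == DeviceType::GPU ? "GPU" : "CPU",
                k.mode == DispatchMode::MultifragmentKernel ? "multifrag" : "per-fragment",
                k.fragment_ids.size());
  }
  return 0;
}

The split mirrors the ternary in the diff: GPU fragment lists are batched into one multifragment kernel per device, while CPU scheduling keeps its existing one-kernel-per-fragment granularity.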