Skip to content

Commit 4ce73dc

Browse files
author
Raghuveer Devulapalli
committed
Add avx512_partition_unrolled for key-value sort
1 parent eafc4b8 commit 4ce73dc

File tree

1 file changed

+141
-1
lines changed

1 file changed

+141
-1
lines changed

src/avx512-64bit-keyvaluesort.hpp

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,146 @@ X86_SIMD_SORT_INLINE arrsize_t partition_avx512(type_t1 *keys,
182182
return l_store;
183183
}
184184

185+
template <typename vtype1,
186+
typename vtype2,
187+
int num_unroll,
188+
typename type_t1 = typename vtype1::type_t,
189+
typename type_t2 = typename vtype2::type_t,
190+
typename reg_t1 = typename vtype1::reg_t,
191+
typename reg_t2 = typename vtype2::reg_t>
192+
X86_SIMD_SORT_INLINE arrsize_t partition_avx512_unrolled(type_t1 *keys,
193+
type_t2 *indexes,
194+
arrsize_t left,
195+
arrsize_t right,
196+
type_t1 pivot,
197+
type_t1 *smallest,
198+
type_t1 *biggest)
199+
{
200+
if (right - left <= 8 * num_unroll * vtype1::numlanes) {
201+
return partition_avx512<vtype1, vtype2>(
202+
keys, indexes, left, right, pivot, smallest, biggest);
203+
}
204+
/* make array length divisible by vtype1::numlanes , shortening the array */
205+
for (int32_t i = ((right - left) % (num_unroll * vtype1::numlanes)); i > 0;
206+
--i) {
207+
*smallest = std::min(*smallest, keys[left]);
208+
*biggest = std::max(*biggest, keys[left]);
209+
if (keys[left] > pivot) {
210+
right--;
211+
std::swap(keys[left], keys[right]);
212+
std::swap(indexes[left], indexes[right]);
213+
}
214+
else {
215+
++left;
216+
}
217+
}
218+
219+
if (left == right) return left;
220+
221+
reg_t1 pivot_vec = vtype1::set1(pivot);
222+
reg_t1 min_vec = vtype1::set1(*smallest);
223+
reg_t1 max_vec = vtype1::set1(*biggest);
224+
225+
// first and last vtype1::numlanes values are partitioned at the end
226+
reg_t1 key_left[num_unroll], key_right[num_unroll];
227+
reg_t2 indx_left[num_unroll], indx_right[num_unroll];
228+
X86_SIMD_SORT_UNROLL_LOOP(8)
229+
for (int ii = 0; ii < num_unroll; ++ii) {
230+
indx_left[ii] = vtype2::loadu(indexes + left + vtype2::numlanes * ii);
231+
key_left[ii] = vtype1::loadu(keys + left + vtype1::numlanes * ii);
232+
indx_right[ii] = vtype2::loadu(
233+
indexes + (right - vtype2::numlanes * (num_unroll - ii)));
234+
key_right[ii] = vtype1::loadu(
235+
keys + (right - vtype1::numlanes * (num_unroll - ii)));
236+
}
237+
// store points of the vectors
238+
arrsize_t r_store = right - vtype1::numlanes;
239+
arrsize_t l_store = left;
240+
// indices for loading the elements
241+
left += num_unroll * vtype1::numlanes;
242+
right -= num_unroll * vtype1::numlanes;
243+
while (right - left != 0) {
244+
reg_t2 indx_vec[num_unroll];
245+
reg_t1 curr_vec[num_unroll];
246+
/*
247+
* if fewer elements are stored on the right side of the array,
248+
* then next elements are loaded from the right side,
249+
* otherwise from the left side
250+
*/
251+
if ((r_store + vtype1::numlanes) - right < left - l_store) {
252+
right -= num_unroll * vtype1::numlanes;
253+
X86_SIMD_SORT_UNROLL_LOOP(8)
254+
for (int ii = 0; ii < num_unroll; ++ii) {
255+
indx_vec[ii] = vtype2::loadu(indexes + right
256+
+ ii * vtype2::numlanes);
257+
curr_vec[ii]
258+
= vtype1::loadu(keys + right + ii * vtype1::numlanes);
259+
}
260+
}
261+
else {
262+
X86_SIMD_SORT_UNROLL_LOOP(8)
263+
for (int ii = 0; ii < num_unroll; ++ii) {
264+
indx_vec[ii]
265+
= vtype2::loadu(indexes + left + ii * vtype2::numlanes);
266+
curr_vec[ii]
267+
= vtype1::loadu(keys + left + ii * vtype1::numlanes);
268+
}
269+
left += num_unroll * vtype1::numlanes;
270+
}
271+
// partition the current vector and save it on both sides of the array
272+
X86_SIMD_SORT_UNROLL_LOOP(8)
273+
for (int ii = 0; ii < num_unroll; ++ii) {
274+
int32_t amount_gt_pivot
275+
= partition_vec<vtype1, vtype2>(keys,
276+
indexes,
277+
l_store,
278+
r_store + vtype1::numlanes,
279+
curr_vec[ii],
280+
indx_vec[ii],
281+
pivot_vec,
282+
&min_vec,
283+
&max_vec);
284+
l_store += (vtype1::numlanes - amount_gt_pivot);
285+
r_store -= amount_gt_pivot;
286+
}
287+
}
288+
289+
/* partition and save key_left and key_right */
290+
X86_SIMD_SORT_UNROLL_LOOP(8)
291+
for (int ii = 0; ii < num_unroll; ++ii) {
292+
int32_t amount_gt_pivot
293+
= partition_vec<vtype1, vtype2>(keys,
294+
indexes,
295+
l_store,
296+
r_store + vtype1::numlanes,
297+
key_left[ii],
298+
indx_left[ii],
299+
pivot_vec,
300+
&min_vec,
301+
&max_vec);
302+
l_store += (vtype1::numlanes - amount_gt_pivot);
303+
r_store -= amount_gt_pivot;
304+
}
305+
X86_SIMD_SORT_UNROLL_LOOP(8)
306+
for (int ii = 0; ii < num_unroll; ++ii) {
307+
int32_t amount_gt_pivot
308+
= partition_vec<vtype1, vtype2>(keys,
309+
indexes,
310+
l_store,
311+
r_store + vtype1::numlanes,
312+
key_right[ii],
313+
indx_right[ii],
314+
pivot_vec,
315+
&min_vec,
316+
&max_vec);
317+
l_store += (vtype1::numlanes - amount_gt_pivot);
318+
r_store -= amount_gt_pivot;
319+
}
320+
*smallest = vtype1::reducemin(min_vec);
321+
*biggest = vtype1::reducemax(max_vec);
322+
return l_store;
323+
}
324+
185325
template <typename vtype1,
186326
typename vtype2,
187327
typename type1_t = typename vtype1::type_t,
@@ -251,7 +391,7 @@ X86_SIMD_SORT_INLINE void qsort_64bit_(type1_t *keys,
251391
type1_t pivot = get_pivot_blocks<vtype1>(keys, left, right);
252392
type1_t smallest = vtype1::type_max();
253393
type1_t biggest = vtype1::type_min();
254-
arrsize_t pivot_index = partition_avx512<vtype1, vtype2>(
394+
arrsize_t pivot_index = partition_avx512_unrolled<vtype1, vtype2, 4>(
255395
keys, indexes, left, right + 1, pivot, &smallest, &biggest);
256396
if (pivot != smallest) {
257397
qsort_64bit_<vtype1, vtype2>(

0 commit comments

Comments
 (0)