@@ -374,10 +374,12 @@ xfarray_load_next(
374
374
# define xfarray_sort_bump_loads (si ) do { (si)->loads++; } while (0)
375
375
# define xfarray_sort_bump_stores (si ) do { (si)->stores++; } while (0)
376
376
# define xfarray_sort_bump_compares (si ) do { (si)->compares++; } while (0)
377
+ # define xfarray_sort_bump_heapsorts (si ) do { (si)->heapsorts++; } while (0)
377
378
#else
378
379
# define xfarray_sort_bump_loads (si )
379
380
# define xfarray_sort_bump_stores (si )
380
381
# define xfarray_sort_bump_compares (si )
382
+ # define xfarray_sort_bump_heapsorts (si )
381
383
#endif /* DEBUG */
382
384
383
385
/* Load an array element for sorting. */
@@ -440,15 +442,19 @@ xfarray_sortinfo_alloc(
440
442
/*
441
443
* Tail-call recursion during the partitioning phase means that
442
444
* quicksort will never recurse more than log2(nr) times. We need one
443
- * extra level of stack to hold the initial parameters.
445
+ * extra level of stack to hold the initial parameters. In-memory
446
+ * sort will always take care of the last few levels of recursion for
447
+ * us, so we can reduce the stack depth by that much.
444
448
*/
445
- max_stack_depth = ilog2 (array -> nr ) + 1 ;
449
+ max_stack_depth = ilog2 (array -> nr ) + 1 - (XFARRAY_ISORT_SHIFT - 1 );
450
+ if (max_stack_depth < 1 )
451
+ max_stack_depth = 1 ;
446
452
447
453
/* Each level of quicksort uses a lo and a hi index */
448
454
nr_bytes += max_stack_depth * sizeof (xfarray_idx_t ) * 2 ;
449
455
450
- /* One record for the pivot */
451
- nr_bytes += array -> obj_size ;
456
+ /* Scratchpad for in-memory sort, or one record for the pivot */
457
+ nr_bytes += ( XFARRAY_ISORT_NR * array -> obj_size ) ;
452
458
453
459
si = kvzalloc (nr_bytes , XCHK_GFP_FLAGS );
454
460
if (!si )
@@ -490,18 +496,18 @@ xfarray_sort_terminated(
490
496
return false;
491
497
}
492
498
493
- /* Do we want an insertion sort? */
499
+ /* Do we want an in-memory sort? */
494
500
static inline bool
495
501
xfarray_want_isort (
496
502
struct xfarray_sortinfo * si ,
497
503
xfarray_idx_t start ,
498
504
xfarray_idx_t end )
499
505
{
500
506
/*
501
- * For array subsets smaller than 8 elements , it's slightly faster to
502
- * use insertion sort than quicksort's stack machine.
507
+ * For array subsets that fit in the scratchpad , it's much faster to
508
+ * use the kernel's heapsort than quicksort's stack machine.
503
509
*/
504
- return (end - start ) < 8 ;
510
+ return (end - start ) < XFARRAY_ISORT_NR ;
505
511
}
506
512
507
513
/* Return the scratch space within the sortinfo structure. */
@@ -511,125 +517,32 @@ static inline void *xfarray_sortinfo_isort_scratch(struct xfarray_sortinfo *si)
511
517
}
512
518
513
519
/*
514
- * Perform an insertion sort on a subset of the array.
515
- * Though insertion sort is an O(n^2) algorithm, for small set sizes it's
516
- * faster than quicksort's stack machine, so we let it take over for that.
517
- * This ought to be replaced with something more efficient.
520
+ * Sort a small number of array records using scratchpad memory. The records
521
+ * need not be contiguous in the xfile's memory pages.
518
522
*/
519
523
STATIC int
520
524
xfarray_isort (
521
525
struct xfarray_sortinfo * si ,
522
526
xfarray_idx_t lo ,
523
527
xfarray_idx_t hi )
524
528
{
525
- void * a = xfarray_sortinfo_isort_scratch (si );
526
- void * b = xfarray_scratch (si -> array );
527
- xfarray_idx_t tmp ;
528
- xfarray_idx_t i ;
529
- xfarray_idx_t run ;
529
+ void * scratch = xfarray_sortinfo_isort_scratch (si );
530
+ loff_t lo_pos = xfarray_pos (si -> array , lo );
531
+ loff_t len = xfarray_pos (si -> array , hi - lo + 1 );
530
532
int error ;
531
533
532
534
trace_xfarray_isort (si , lo , hi );
533
535
534
- /*
535
- * Move the smallest element in a[lo..hi] to a[lo]. This
536
- * simplifies the loop control logic below.
537
- */
538
- tmp = lo ;
539
- error = xfarray_sort_load (si , tmp , b );
536
+ xfarray_sort_bump_loads (si );
537
+ error = xfile_obj_load (si -> array -> xfile , scratch , len , lo_pos );
540
538
if (error )
541
539
return error ;
542
- for (run = lo + 1 ; run <= hi ; run ++ ) {
543
- /* if a[run] < a[tmp], tmp = run */
544
- error = xfarray_sort_load (si , run , a );
545
- if (error )
546
- return error ;
547
- if (xfarray_sort_cmp (si , a , b ) < 0 ) {
548
- tmp = run ;
549
- memcpy (b , a , si -> array -> obj_size );
550
- }
551
540
552
- if (xfarray_sort_terminated (si , & error ))
553
- return error ;
554
- }
541
+ xfarray_sort_bump_heapsorts (si );
542
+ sort (scratch , hi - lo + 1 , si -> array -> obj_size , si -> cmp_fn , NULL );
555
543
556
- /*
557
- * The smallest element is a[tmp]; swap with a[lo] if tmp != lo.
558
- * Recall that a[tmp] is already in *b.
559
- */
560
- if (tmp != lo ) {
561
- error = xfarray_sort_load (si , lo , a );
562
- if (error )
563
- return error ;
564
- error = xfarray_sort_store (si , tmp , a );
565
- if (error )
566
- return error ;
567
- error = xfarray_sort_store (si , lo , b );
568
- if (error )
569
- return error ;
570
- }
571
-
572
- /*
573
- * Perform an insertion sort on a[lo+1..hi]. We already made sure
574
- * that the smallest value in the original range is now in a[lo],
575
- * so the inner loop should never underflow.
576
- *
577
- * For each a[lo+2..hi], make sure it's in the correct position
578
- * with respect to the elements that came before it.
579
- */
580
- for (run = lo + 2 ; run <= hi ; run ++ ) {
581
- error = xfarray_sort_load (si , run , a );
582
- if (error )
583
- return error ;
584
-
585
- /*
586
- * Find the correct place for a[run] by walking leftwards
587
- * towards the start of the range until a[tmp] is no longer
588
- * greater than a[run].
589
- */
590
- tmp = run - 1 ;
591
- error = xfarray_sort_load (si , tmp , b );
592
- if (error )
593
- return error ;
594
- while (xfarray_sort_cmp (si , a , b ) < 0 ) {
595
- tmp -- ;
596
- error = xfarray_sort_load (si , tmp , b );
597
- if (error )
598
- return error ;
599
-
600
- if (xfarray_sort_terminated (si , & error ))
601
- return error ;
602
- }
603
- tmp ++ ;
604
-
605
- /*
606
- * If tmp != run, then a[tmp..run-1] are all less than a[run],
607
- * so right barrel roll a[tmp..run] to get this range in
608
- * sorted order.
609
- */
610
- if (tmp == run )
611
- continue ;
612
-
613
- for (i = run ; i >= tmp ; i -- ) {
614
- error = xfarray_sort_load (si , i - 1 , b );
615
- if (error )
616
- return error ;
617
- error = xfarray_sort_store (si , i , b );
618
- if (error )
619
- return error ;
620
-
621
- if (xfarray_sort_terminated (si , & error ))
622
- return error ;
623
- }
624
- error = xfarray_sort_store (si , tmp , a );
625
- if (error )
626
- return error ;
627
-
628
- if (xfarray_sort_terminated (si , & error ))
629
- return error ;
630
- }
631
-
632
- return 0 ;
544
+ xfarray_sort_bump_stores (si );
545
+ return xfile_obj_store (si -> array -> xfile , scratch , len , lo_pos );
633
546
}
634
547
635
548
/* Return a pointer to the xfarray pivot record within the sortinfo struct. */
@@ -783,9 +696,8 @@ xfarray_qsort_push(
783
696
* current stack frame. This guarantees that we won't need more than
784
697
* log2(nr) stack space.
785
698
*
786
- * 4. Use insertion sort for small sets since since insertion sort is faster
787
- * for small, mostly sorted array segments. In the author's experience,
788
- * substituting insertion sort for arrays smaller than 8 elements yields
699
+ * 4. For small sets, load the records into the scratchpad and run heapsort on
700
+ * them because that is very fast. In the author's experience, this yields
789
701
* a ~10% reduction in runtime.
790
702
*/
791
703
0 commit comments