Skip to content

Commit df05b9d

Browse files
committed
add floyd-rivest and parallelization to gmedian
1 parent b0c4ac3 commit df05b9d

File tree

3 files changed

+121
-22
lines changed

3 files changed

+121
-22
lines changed

src/data.table.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,9 @@ SEXP bmerge(SEXP idt, SEXP xdt, SEXP icolsArg, SEXP xcolsArg,
229229
double dquickselect(double *x, int n);
230230
double iquickselect(int *x, int n);
231231
double i64quickselect(int64_t *x, int n);
232+
double dfloyd_rivest(double *x, int n);
233+
double ifloyd_rivest(int *x, int n);
234+
double i64floyd_rivest(int64_t *x, int n);
232235

233236
// fread.c
234237
double wallclock(void);

src/gsumm.c

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -880,43 +880,68 @@ SEXP gmedian(SEXP x, SEXP narmArg) {
880880
const bool nosubset = irowslen==-1;
881881
switch(TYPEOF(x)) {
882882
case REALSXP: {
883-
double *subd = REAL(PROTECT(allocVector(REALSXP, maxgrpn))); // allocate once upfront and reuse
884883
int64_t *xi64 = (int64_t *)REAL(x);
885884
double *xd = REAL(x);
886-
for (int i=0; i<ngrp; ++i) {
887-
int thisgrpsize = grpsize[i], nacount=0;
888-
for (int j=0; j<thisgrpsize; ++j) {
889-
int k = ff[i]+j-1;
890-
if (isunsorted) k = oo[k]-1;
891-
k = nosubset ? k : (irows[k]==NA_INTEGER ? NA_INTEGER : irows[k]-1);
892-
if (k==NA_INTEGER || (isInt64 ? xi64[k]==NA_INTEGER64 : ISNAN(xd[k]))) nacount++;
893-
else subd[j-nacount] = xd[k];
885+
#pragma omp parallel num_threads(getDTthreads(ngrp, true))
886+
{
887+
double *thread_subd = malloc(maxgrpn * sizeof(double));
888+
if (!thread_subd) error(_("Unable to allocate thread-local buffer in gmedian")); // # nocov
889+
#pragma omp for
890+
for (int i=0; i<ngrp; ++i) {
891+
int thisgrpsize = grpsize[i], nacount=0;
892+
for (int j=0; j<thisgrpsize; ++j) {
893+
int k = ff[i]+j-1;
894+
if (isunsorted) k = oo[k]-1;
895+
k = nosubset ? k : (irows[k]==NA_INTEGER ? NA_INTEGER : irows[k]-1);
896+
if (k==NA_INTEGER || (isInt64 ? xi64[k]==NA_INTEGER64 : ISNAN(xd[k]))) nacount++;
897+
else thread_subd[j-nacount] = xd[k];
898+
}
899+
thisgrpsize -= nacount; // all-NA is returned as NA_REAL via n==0 case inside *quickselect
900+
if (nacount && !narm) {
901+
ansd[i] = NA_REAL;
902+
} else {
903+
// Use Floyd-Rivest for groups larger than 100, quickselect for smaller
904+
ansd[i] = isInt64 ?
905+
(thisgrpsize > 100 ? i64floyd_rivest((int64_t *)thread_subd, thisgrpsize) : i64quickselect((int64_t *)thread_subd, thisgrpsize)) :
906+
(thisgrpsize > 100 ? dfloyd_rivest(thread_subd, thisgrpsize) : dquickselect(thread_subd, thisgrpsize));
907+
}
894908
}
895-
thisgrpsize -= nacount; // all-NA is returned as NA_REAL via n==0 case inside *quickselect
896-
ansd[i] = (nacount && !narm) ? NA_REAL : (isInt64 ? i64quickselect((void *)subd, thisgrpsize) : dquickselect(subd, thisgrpsize));
909+
free(thread_subd);
897910
}}
898911
break;
899912
case LGLSXP: case INTSXP: {
900-
int *subi = INTEGER(PROTECT(allocVector(INTSXP, maxgrpn)));
901913
int *xi = INTEGER(x);
902-
for (int i=0; i<ngrp; i++) {
903-
const int thisgrpsize = grpsize[i];
904-
int nacount=0;
905-
for (int j=0; j<thisgrpsize; ++j) {
906-
int k = ff[i]+j-1;
907-
if (isunsorted) k = oo[k]-1;
908-
if (nosubset ? xi[k]==NA_INTEGER : (irows[k]==NA_INTEGER || (k=irows[k]-1,xi[k]==NA_INTEGER))) nacount++;
909-
else subi[j-nacount] = xi[k];
914+
#pragma omp parallel num_threads(getDTthreads(ngrp, true))
915+
{
916+
int *thread_subi = malloc(maxgrpn * sizeof(int));
917+
if (!thread_subi) error(_("Unable to allocate thread-local buffer in gmedian")); // # nocov
918+
#pragma omp for
919+
for (int i=0; i<ngrp; i++) {
920+
const int thisgrpsize = grpsize[i];
921+
int nacount=0;
922+
for (int j=0; j<thisgrpsize; ++j) {
923+
int k = ff[i]+j-1;
924+
if (isunsorted) k = oo[k]-1;
925+
if (nosubset ? xi[k]==NA_INTEGER : (irows[k]==NA_INTEGER || (k=irows[k]-1,xi[k]==NA_INTEGER))) nacount++;
926+
else thread_subi[j-nacount] = xi[k];
927+
}
928+
if (nacount && !narm) {
929+
ansd[i] = NA_REAL;
930+
} else {
931+
const int validsize = thisgrpsize-nacount;
932+
// Use Floyd-Rivest for groups larger than 100, quickselect for smaller
933+
ansd[i] = validsize > 100 ? ifloyd_rivest(thread_subi, validsize) : iquickselect(thread_subi, validsize);
934+
}
910935
}
911-
ansd[i] = (nacount && !narm) ? NA_REAL : iquickselect(subi, thisgrpsize-nacount);
936+
free(thread_subi);
912937
}}
913938
break;
914939
default:
915940
error(_("Type '%s' is not supported by GForce %s. Either add the prefix %s or turn off GForce optimization using options(datatable.optimize=1)"), type2char(TYPEOF(x)), "median (gmedian)", "stats::median(.)");
916941
}
917942
if (!isInt64) copyMostAttrib(x, ans);
918943
// else the integer64 class needs to be dropped since double is always returned by gmedian
919-
UNPROTECT(2); // ans, subd|subi
944+
UNPROTECT(1); // ans
920945
return ans;
921946
}
922947

src/quickselect.c

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,74 @@ double i64quickselect(int64_t *x, int n)
7171
int64_t a, b;
7272
BODY(i64swap);
7373
}
74+
75+
// Floyd-Rivest selection algorithm
76+
77+
#undef FLOYD_RIVEST_BODY
78+
#define FLOYD_RIVEST_BODY(SWAP, TYPE) \
79+
if (n == 0) return NA_REAL; \
80+
unsigned long med = n / 2 - (n % 2 == 0); \
81+
unsigned long l = 0, ir = n - 1; \
82+
while (ir > l) { \
83+
if (ir - l > 600) { \
84+
unsigned long nn = ir - l + 1; \
85+
unsigned long i = med - l + 1; \
86+
double z = log((double)nn); \
87+
double s = 0.5 * exp(2.0*z/3.0); \
88+
double sd = 0.5 * sqrt(z*s*(nn-s)/nn); \
89+
if (i < nn/2) sd = -sd; \
90+
unsigned long newL = (unsigned long)(MAX((long)l, (long)(med - i*s/nn + sd))); \
91+
unsigned long newR = (unsigned long)(MIN((long)ir, (long)(med + (nn-i)*s/nn + sd))); \
92+
l = newL; \
93+
ir = newR; \
94+
} \
95+
TYPE pivot = x[med]; \
96+
unsigned long i = l, j = ir; \
97+
SWAP(x + l, x + med); \
98+
if (x[ir] > pivot) { \
99+
SWAP(x + ir, x + l); \
100+
pivot = x[l]; \
101+
} \
102+
while (i < j) { \
103+
SWAP(x + i, x + j); \
104+
i++; j--; \
105+
while (x[i] < pivot) i++; \
106+
while (x[j] > pivot) j--; \
107+
} \
108+
if (x[l] == pivot) { \
109+
SWAP(x + l, x + j); \
110+
} else { \
111+
j++; \
112+
SWAP(x + j, x + ir); \
113+
} \
114+
if (j <= med) l = j + 1; \
115+
if (med <= j) ir = j - 1; \
116+
} \
117+
a = x[med]; \
118+
if (n % 2 == 1) { \
119+
return (double)a; \
120+
} else { \
121+
b = x[med + 1]; \
122+
for (unsigned long i = med + 2; i < n; i++) { \
123+
if (x[i] < b) b = x[i]; \
124+
} \
125+
return ((double)a + (double)b) / 2.0; \
126+
}
127+
128+
double dfloyd_rivest(double *x, int n)
129+
{
130+
double a, b;
131+
FLOYD_RIVEST_BODY(dswap, double);
132+
}
133+
134+
double ifloyd_rivest(int *x, int n)
135+
{
136+
int a, b;
137+
FLOYD_RIVEST_BODY(iswap, int);
138+
}
139+
140+
double i64floyd_rivest(int64_t *x, int n)
141+
{
142+
int64_t a, b;
143+
FLOYD_RIVEST_BODY(i64swap, int64_t);
144+
}

0 commit comments

Comments
 (0)