
Commit 8e9a431

move parallel reducer out of header
1 parent aeacb93 commit 8e9a431

File tree

7 files changed: +290 / -171 lines

RcppParallel.Rproj

Lines changed: 5 additions & 1 deletion
@@ -13,7 +13,11 @@ Encoding: UTF-8
 RnwWeave: Sweave
 LaTeX: pdfLaTeX
 
+AutoAppendNewline: Yes
+StripTrailingWhitespace: Yes
+
 BuildType: Package
-PackageInstallArgs: --with-keep.source --clean
+PackageCleanBeforeInstall: No
+PackageInstallArgs: --with-keep.source
 PackageCheckArgs: --as-cran
 PackageRoxygenize: rd,collate,namespace

inst/include/RcppParallel/Common.h

Lines changed: 15 additions & 15 deletions
@@ -15,57 +15,57 @@ inline int resolveValue(const char* envvar,
    // if the requested value is non-zero and not the default, we can use it
    if (requestedValue != defaultValue && requestedValue > 0)
       return requestedValue;
-   
+
    // otherwise, try reading the default from associated envvar
    // if the environment variable is unset, use the default
    const char* var = getenv(envvar);
    if (var == NULL)
       return defaultValue;
-   
+
    // try to convert the string to a number
    // if an error occurs during conversion, just use default
    errno = 0;
    char* end;
    long value = strtol(var, &end, 10);
-   
+
    // check for conversion failure
    if (end == var || *end != '\0' || errno == ERANGE)
      return defaultValue;
-   
-   // okay, return the parsed environment variable value 
+
+   // okay, return the parsed environment variable value
    return value;
 }
 
+// Tag type used for disambiguating splitting constructors
+struct Split {};
+
 // Work executed within a background thread. We implement dynamic
 // dispatch using vtables so we can have a stable type to cast
 // to from the void* passed to the worker thread (required because
 // the tinythreads interface allows to pass only a void* to the
 // thread main rather than a generic type / template)
-struct Worker 
-{ 
+struct Worker
+{
    // construct and destruct (delete virtually)
    Worker() {}
    virtual ~Worker() {}
-   
+
    // dispatch work over a range of values
-   virtual void operator()(std::size_t begin, std::size_t end) = 0;
-   
-   // disable copying and assignment
+   virtual void operator()(std::size_t begin, std::size_t end) = 0;
+
 private:
+   // disable copying and assignment
    Worker(const Worker&);
    void operator=(const Worker&);
 };
 
-// Tag type used for disambiguating splitting constructors
-struct Split {};
-
 // Used for controlling the stack size for threads / tasks within a scope.
 class ThreadStackSizeControl
 {
 public:
    ThreadStackSizeControl();
    ~ThreadStackSizeControl();
-   
+
 private:
    // COPYING: not copyable
    ThreadStackSizeControl(const ThreadStackSizeControl&);
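Note: the resolveValue() helper shown in the context lines above prefers an explicit non-default request, then an environment variable, then a compiled-in default. Below is a minimal standalone sketch of that fallback pattern; the function name, the EXAMPLE_NUM_THREADS variable, and the main() driver are made up for illustration and are not part of RcppParallel.

```cpp
// Standalone sketch of the strtol-based fallback used by resolveValue().
// "EXAMPLE_NUM_THREADS" and resolveValueSketch() are hypothetical names.
#include <cerrno>
#include <cstdio>
#include <cstdlib>

static int resolveValueSketch(const char* envvar, int requestedValue, int defaultValue) {

   // an explicit, non-default, positive request wins outright
   if (requestedValue != defaultValue && requestedValue > 0)
      return requestedValue;

   // otherwise consult the environment variable, if it is set at all
   const char* var = std::getenv(envvar);
   if (var == NULL)
      return defaultValue;

   // parse as a base-10 integer; any conversion failure falls back to the default
   errno = 0;
   char* end;
   long value = std::strtol(var, &end, 10);
   if (end == var || *end != '\0' || errno == ERANGE)
      return defaultValue;

   return value;
}

int main() {
   // EXAMPLE_NUM_THREADS=8   -> prints 8
   // EXAMPLE_NUM_THREADS=abc -> prints 4 (conversion fails, default wins)
   // unset                   -> prints 4
   std::printf("%d\n", resolveValueSketch("EXAMPLE_NUM_THREADS", -1, 4));
   return 0;
}
```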

inst/include/RcppParallel/TBB.h

Lines changed: 83 additions & 101 deletions
@@ -13,110 +13,91 @@
 
 namespace RcppParallel {
 
-struct TBBWorker
-{
-   explicit TBBWorker(Worker& worker) : worker_(worker) {}
-   
-   void operator()(const tbb::blocked_range<size_t>& r) const {
-      worker_(r.begin(), r.end());
-   }
+// This class is primarily used to implement type erasure. The goals here were:
+//
+// 1. Hide the tbb symbols / implementation details from client R packages.
+//    That is, they should get the tools they need only via RcppParallel.
+//
+// 2. Do this in a way that preserves binary compatibility with pre-existing
+//    classes that make use of parallelReduce().
+//
+// 3. Ensure that those packages, when re-compiled without source changes,
+//    can still function as expected.
+//
+// The downside here is that all the indirection through std::function<>
+// and the requirement for RTTI is probably expensive, but I couldn't find
+// a better way forward that could also preserve binary compatibility with
+// existing pre-built pacakges.
+//
+// Hopefully, in a future release, we can do away with this wrapper, once
+// packages have been rebuilt and no longer implicitly depend on TBB internals.
+struct ReducerWrapper {
 
-private:
-   Worker& worker_;
-};
-
-template <typename Reducer>
-struct TBBReducer
-{
-   explicit TBBReducer(Reducer& reducer)
-      : pSplitReducer_(NULL), reducer_(reducer)
-   {
-   }
-   
-   TBBReducer(TBBReducer& tbbReducer, tbb::split)
-      : pSplitReducer_(new Reducer(tbbReducer.reducer_, RcppParallel::Split())),
-        reducer_(*pSplitReducer_)
+   template <typename T>
+   ReducerWrapper(T* reducer)
    {
-   }
-   
-   virtual ~TBBReducer() { delete pSplitReducer_; }
+      self_ = reinterpret_cast<void*>(reducer);
+      owned_ = false;
 
-   void operator()(const tbb::blocked_range<size_t>& r) {
-      reducer_(r.begin(), r.end());
-   }
-   
-   void join(const TBBReducer& tbbReducer) {
-      reducer_.join(tbbReducer.reducer_);
+      work_ = [&](void* self, std::size_t begin, std::size_t end)
+      {
+         (*reinterpret_cast<T*>(self))(begin, end);
+      };
+
+      split_ = [&](void* object, Split split)
+      {
+         return new T(*reinterpret_cast<T*>(object), split);
+      };
+
+      join_ = [&](void* self, void* other)
+      {
+         (*reinterpret_cast<T*>(self)).join(*reinterpret_cast<T*>(other));
+      };
+
+      deleter_ = [&](void* object)
+      {
+         delete (T*) object;
+      };
    }
-   
-private:
-   Reducer* pSplitReducer_;
-   Reducer& reducer_;
-};
 
-template <typename Reducer>
-class TBBParallelReduceExecutor
-{
-public:
-   
-   TBBParallelReduceExecutor(Reducer& reducer,
-                             std::size_t begin,
-                             std::size_t end,
-                             std::size_t grainSize)
-      : reducer_(reducer),
-        begin_(begin),
-        end_(end),
-        grainSize_(grainSize)
+   ~ReducerWrapper()
    {
+      if (owned_)
+      {
+         deleter_(self_);
+         self_ = nullptr;
+      }
   }
-   
-   void operator()() const
+
+   void operator()(std::size_t begin, std::size_t end) const
    {
-      TBBReducer<Reducer> tbbReducer(reducer_);
-      tbb::parallel_reduce(
-         tbb::blocked_range<std::size_t>(begin_, end_, grainSize_),
-         tbbReducer
-      );
+      work_(self_, begin, end);
   }
-   
-private:
-   Reducer& reducer_;
-   std::size_t begin_;
-   std::size_t end_;
-   std::size_t grainSize_;
-};
 
-template <typename Reducer>
-class TBBArenaParallelReduceExecutor
-{
-public:
-   
-   TBBArenaParallelReduceExecutor(tbb::task_group& group,
-                                  Reducer& reducer,
-                                  std::size_t begin,
-                                  std::size_t end,
-                                  std::size_t grainSize)
-      : group_(group),
-        reducer_(reducer),
-        begin_(begin),
-        end_(end),
-        grainSize_(grainSize)
+   ReducerWrapper(const ReducerWrapper& rhs, Split split)
    {
+      self_ = rhs.split_(rhs.self_, split);
+      owned_ = true;
+
+      work_ = rhs.work_;
+      split_ = rhs.split_;
+      join_ = rhs.join_;
+      deleter_ = rhs.deleter_;
   }
-   
-   void operator()() const
+
+   void join(const ReducerWrapper& rhs) const
    {
-      TBBParallelReduceExecutor<Reducer> executor(reducer_, begin_, end_, grainSize_);
-      group_.run_and_wait(executor);
+      join_(self_, rhs.self_);
   }
-   
+
 private:
-   
-   tbb::task_group& group_;
-   Reducer& reducer_;
-   std::size_t begin_;
-   std::size_t end_;
-   std::size_t grainSize_;
+   void* self_ = nullptr;
+   bool owned_ = false;
+
+   std::function<void (void*, std::size_t, std::size_t)> work_;
+   std::function<void*(void*, Split)> split_;
+   std::function<void (void*, void*)> join_;
+   std::function<void(void*)> deleter_;
 };
 
 void tbbParallelFor(std::size_t begin,
@@ -125,20 +106,21 @@ void tbbParallelFor(std::size_t begin,
                     std::size_t grainSize = 1,
                     int numThreads = -1);
 
+void tbbParallelReduceImpl(std::size_t begin,
+                           std::size_t end,
+                           ReducerWrapper& wrapper,
+                           std::size_t grainSize = 1,
+                           int numThreads = -1);
+
 template <typename Reducer>
-inline void tbbParallelReduce(std::size_t begin,
-                              std::size_t end,
-                              Reducer& reducer,
-                              std::size_t grainSize = 1,
-                              int numThreads = -1)
+void tbbParallelReduce(std::size_t begin,
+                       std::size_t end,
+                       Reducer& reducer,
+                       std::size_t grainSize = 1,
+                       int numThreads = -1)
 {
-   ThreadStackSizeControl control;
-   
-   tbb::task_group group;
-   TBBArenaParallelReduceExecutor<Reducer> executor(group, reducer, begin, end, grainSize);
-   
-   tbb::task_arena arena(numThreads);
-   arena.execute(executor);
+   ReducerWrapper wrapper(&reducer);
+   tbbParallelReduceImpl(begin, end, wrapper, grainSize, numThreads);
 }
 
 } // namespace RcppParallel
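The header now only declares tbbParallelReduceImpl(); its body moves into the package's compiled sources, which this excerpt does not show. The sketch below is one plausible shape for that out-of-line definition, assuming it adapts the type-erased ReducerWrapper to the splittable Body concept that tbb::parallel_reduce expects; the WrapperBody helper and the include paths are assumptions, not code from this commit.

```cpp
// Hypothetical out-of-line definition of tbbParallelReduceImpl() -- the real
// source file from this commit is not shown above, so treat this as a sketch.
#include <cstddef>

#include <tbb/blocked_range.h>
#include <tbb/parallel_reduce.h>
#include <tbb/task_arena.h>

#include <RcppParallel/Common.h>   // Worker, Split, ThreadStackSizeControl
#include <RcppParallel/TBB.h>      // ReducerWrapper, tbbParallelReduceImpl decl

namespace RcppParallel {

namespace {

// Adapts the type-erased ReducerWrapper to TBB's splittable Body concept:
// a splitting constructor, operator()(blocked_range), and join().
class WrapperBody
{
public:

   explicit WrapperBody(ReducerWrapper& wrapper)
      : split_(nullptr), wrapper_(wrapper)
   {
   }

   // TBB splitting constructor; forwards to the wrapper's Split constructor,
   // which in turn invokes the erased reducer's own splitting constructor
   WrapperBody(WrapperBody& rhs, tbb::split)
      : split_(new ReducerWrapper(rhs.wrapper_, Split())),
        wrapper_(*split_)
   {
   }

   ~WrapperBody() { delete split_; }

   // process one sub-range through the erased operator()
   void operator()(const tbb::blocked_range<std::size_t>& r) {
      wrapper_(r.begin(), r.end());
   }

   // merge another body's partial result through the erased join()
   void join(const WrapperBody& rhs) {
      wrapper_.join(rhs.wrapper_);
   }

private:
   ReducerWrapper* split_;    // owned only when created by a split
   ReducerWrapper& wrapper_;
};

} // anonymous namespace

void tbbParallelReduceImpl(std::size_t begin,
                           std::size_t end,
                           ReducerWrapper& wrapper,
                           std::size_t grainSize,
                           int numThreads)
{
   // the removed inline version also installed this stack-size guard
   ThreadStackSizeControl control;

   WrapperBody body(wrapper);
   tbb::task_arena arena(numThreads);
   arena.execute([&] {
      tbb::parallel_reduce(
         tbb::blocked_range<std::size_t>(begin, end, grainSize),
         body);
   });
}

} // namespace RcppParallel
```

With the work split this way, templates compiled by client packages only ever instantiate ReducerWrapper; the TBB templates themselves are instantiated once, inside RcppParallel's own compiled code.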

inst/skeleton/vector-sum.cpp

Lines changed: 11 additions & 11 deletions
@@ -19,37 +19,37 @@ using namespace Rcpp;
 using namespace RcppParallel;
 
 struct Sum : public Worker
-{   
+{
    // source vector
    const RVector<double> input;
-   
+
    // accumulated value
    double value;
-   
+
    // constructors
    Sum(const NumericVector input) : input(input), value(0) {}
    Sum(const Sum& sum, Split) : input(sum.input), value(0) {}
-   
+
    // accumulate just the element of the range I've been asked to
    void operator()(std::size_t begin, std::size_t end) {
       value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
   }
-   
+
    // join my value with that of another Sum
-   void join(const Sum& rhs) { 
-      value += rhs.value; 
+   void join(const Sum& rhs) {
+      value += rhs.value;
   }
 };
 
 // [[Rcpp::export]]
 double parallelVectorSum(NumericVector x) {
-   
-   // declare the SumBody instance 
+
+   // declare the SumBody instance
    Sum sum(x);
-   
+
    // call parallel_reduce to start the work
    parallelReduce(0, x.length(), sum);
-   
+
    // return the computed sum
    return sum.value;
 }
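The Sum worker above follows the split/join protocol that parallelReduce() builds on: each split accumulates over its own sub-range, and partial results are combined with join(). The sketch below walks through that protocol serially, with std::vector<double> standing in for the Rcpp/RcppParallel vector types so it compiles on its own; it illustrates the protocol only and says nothing about how RcppParallel actually schedules the work.

```cpp
// Serial walk-through of the Split / operator() / join protocol used by Sum.
// SumSketch and the local Split tag are stand-ins, not RcppParallel code.
#include <cstddef>
#include <iostream>
#include <numeric>
#include <vector>

struct Split {};   // plays the role of RcppParallel::Split

struct SumSketch {
   const std::vector<double>& input;
   double value;

   explicit SumSketch(const std::vector<double>& input) : input(input), value(0) {}
   SumSketch(const SumSketch& sum, Split) : input(sum.input), value(0) {}

   // accumulate only the assigned sub-range
   void operator()(std::size_t begin, std::size_t end) {
      value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
   }

   // fold another worker's partial result into this one
   void join(const SumSketch& rhs) { value += rhs.value; }
};

int main() {
   std::vector<double> x(100);
   for (std::size_t i = 0; i < x.size(); ++i)
      x[i] = i + 1.0;                   // 1, 2, ..., 100

   SumSketch left(x);                   // original worker
   SumSketch right(left, Split());      // split copy with a fresh accumulator

   left(0, x.size() / 2);               // each worker sums half of the range
   right(x.size() / 2, x.size());

   left.join(right);                    // combine partial sums
   std::cout << left.value << "\n";     // prints 5050
   return 0;
}
```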

0 commit comments
