Skip to content

Commit 3be56de

Browse files
gitamohrpixar-oss
authored andcommitted
work: Provide malloc-tag specific task wrappers. Evidently just
having an extra unused pointer in the task impacts some internal perf tests (Internal change: 2394417)
1 parent 014de02 commit 3be56de

File tree

2 files changed

+127
-16
lines changed

2 files changed

+127
-16
lines changed

pxr/base/work/dispatcher.h

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,17 @@ class Work_Dispatcher
6161

6262
template <class Callable>
6363
inline void Run(Callable &&c) {
64-
_dispatcher.Run(
65-
_InvokerTask<typename std::remove_reference<Callable>::type>(
66-
std::forward<Callable>(c), &_errors));
64+
if (TfMallocTag::IsInitialized()) {
65+
_dispatcher.Run(
66+
_MallocTagsInvokerTask<
67+
typename std::remove_reference<Callable>::type>(
68+
std::forward<Callable>(c), &_errors));
69+
}
70+
else {
71+
_dispatcher.Run(
72+
_InvokerTask<typename std::remove_reference<Callable>::type>(
73+
std::forward<Callable>(c), &_errors));
74+
}
6775
}
6876

6977
template <class Callable, class A0, class ... Args>
@@ -103,20 +111,51 @@ class Work_Dispatcher
103111
template <class Fn>
104112
struct _InvokerTask {
105113
explicit _InvokerTask(Fn &&fn, _ErrorTransports *err)
114+
: _fn(std::move(fn))
115+
, _errors(err) {}
116+
117+
explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
118+
: _fn(fn)
119+
, _errors(err) {}
120+
121+
// Ensure only moves happen, no copies.
122+
_InvokerTask(_InvokerTask &&other) = default;
123+
_InvokerTask(const _InvokerTask &other) = delete;
124+
_InvokerTask &operator=(const _InvokerTask &other) = delete;
125+
126+
void operator()() const {
127+
TfErrorMark m;
128+
_fn();
129+
if (!m.IsClean())
130+
Work_Dispatcher::_TransportErrors(m, _errors);
131+
}
132+
private:
133+
Fn _fn;
134+
_ErrorTransports *_errors;
135+
};
136+
137+
// Function invoker helper that wraps the invocation with an ErrorMark so we
138+
// can transmit errors that occur back to the thread that Wait() s for tasks
139+
// to complete. This version also duplicates the caller's malloc tag stack
140+
// to the callee's thread.
141+
template <class Fn>
142+
struct _MallocTagsInvokerTask {
143+
explicit _MallocTagsInvokerTask(Fn &&fn, _ErrorTransports *err)
106144
: _fn(std::move(fn))
107145
, _errors(err)
108146
, _mallocTagStack(TfMallocTag::GetCurrentStackState())
109147
{}
110148

111-
explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
149+
explicit _MallocTagsInvokerTask(Fn const &fn, _ErrorTransports *err)
112150
: _fn(fn)
113151
, _errors(err)
114152
, _mallocTagStack(TfMallocTag::GetCurrentStackState()) {}
115153

116154
// Ensure only moves happen, no copies.
117-
_InvokerTask(_InvokerTask &&other) = default;
118-
_InvokerTask(const _InvokerTask &other) = delete;
119-
_InvokerTask &operator=(const _InvokerTask &other) = delete;
155+
_MallocTagsInvokerTask(_MallocTagsInvokerTask &&other) = default;
156+
_MallocTagsInvokerTask(const _MallocTagsInvokerTask &other) = delete;
157+
_MallocTagsInvokerTask &
158+
operator=(const _MallocTagsInvokerTask &other) = delete;
120159

121160
void operator()() const {
122161
TfErrorMark m;

pxr/base/work/loops.h

Lines changed: 81 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,31 @@ class Work_LoopsTaskWrapper
3131
Fn &&callback,
3232
Work_ErrorTransports *errors)
3333
: _callback(callback)
34+
, _errors(errors) {}
35+
36+
template <typename ... Args>
37+
void operator()(Args&&... args) const {
38+
TfErrorMark m;
39+
_callback(std::forward<Args>(args)...);
40+
if (!m.IsClean()) {
41+
TfErrorTransport transport = m.Transport();
42+
_errors->grow_by(1)->swap(transport);
43+
}
44+
}
45+
46+
private:
47+
Fn & _callback;
48+
Work_ErrorTransports *_errors;
49+
};
50+
51+
template <class Fn>
52+
class Work_MallocTagsLoopsTaskWrapper
53+
{
54+
public:
55+
Work_MallocTagsLoopsTaskWrapper(
56+
Fn &&callback,
57+
Work_ErrorTransports *errors)
58+
: _callback(callback)
3459
, _errors(errors)
3560
, _mallocTagStack(TfMallocTag::GetCurrentStackState()) {}
3661

@@ -59,6 +84,31 @@ class Work_LoopsForEachTaskWrapper
5984
Fn &&callback,
6085
Work_ErrorTransports *errors)
6186
: _callback(callback)
87+
, _errors(errors) {}
88+
89+
template <typename Arg>
90+
void operator()(Arg &&arg) const {
91+
TfErrorMark m;
92+
_callback(std::forward<Arg>(arg));
93+
if (!m.IsClean()) {
94+
TfErrorTransport transport = m.Transport();
95+
_errors->grow_by(1)->swap(transport);
96+
}
97+
}
98+
99+
private:
100+
Fn & _callback;
101+
Work_ErrorTransports *_errors;
102+
};
103+
104+
template <class Fn>
105+
class Work_MallocTagsLoopsForEachTaskWrapper
106+
{
107+
public:
108+
Work_MallocTagsLoopsForEachTaskWrapper(
109+
Fn &&callback,
110+
Work_ErrorTransports *errors)
111+
: _callback(callback)
62112
, _errors(errors)
63113
, _mallocTagStack(TfMallocTag::GetCurrentStackState()) {}
64114

@@ -124,9 +174,17 @@ WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
124174
if (WorkHasConcurrency()) {
125175
PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
126176
Work_ErrorTransports errorTransports;
127-
Work_LoopsTaskWrapper<Fn>
128-
task(std::forward<Fn>(callback), &errorTransports);
129-
WorkImpl_ParallelForN(n, task, grainSize);
177+
if (TfMallocTag::IsInitialized()) {
178+
Work_MallocTagsLoopsTaskWrapper<Fn>
179+
task(std::forward<Fn>(callback), &errorTransports);
180+
WorkImpl_ParallelForN(n, task, grainSize);
181+
}
182+
else {
183+
Work_LoopsTaskWrapper<Fn>
184+
task(std::forward<Fn>(callback), &errorTransports);
185+
WorkImpl_ParallelForN(n, task, grainSize);
186+
}
187+
130188
for (auto &et: errorTransports) {
131189
et.Post();
132190
}
@@ -179,9 +237,16 @@ WorkParallelForTBBRange(const RangeType &range, Fn &&callback)
179237
// dispatcher.
180238
#if defined WORK_IMPL_HAS_PARALLEL_FOR_TBB_RANGE
181239
Work_ErrorTransports errorTransports;
182-
Work_LoopsTaskWrapper<Fn>
183-
task(std::forward<Fn>(callback), &errorTransports);
184-
WorkImpl_ParallelForTBBRange(range, task);
240+
if (TfMallocTag::IsInitialized()) {
241+
Work_MallocTagsLoopsTaskWrapper<Fn>
242+
task(std::forward<Fn>(callback), &errorTransports);
243+
WorkImpl_ParallelForTBBRange(range, task);
244+
}
245+
else {
246+
Work_LoopsTaskWrapper<Fn>
247+
task(std::forward<Fn>(callback), &errorTransports);
248+
WorkImpl_ParallelForTBBRange(range, task);
249+
}
185250
for (auto &et: errorTransports) {
186251
et.Post();
187252
}
@@ -253,9 +318,16 @@ WorkParallelForEach(
253318
if (WorkHasConcurrency()) {
254319
PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
255320
Work_ErrorTransports errorTransports;
256-
Work_LoopsForEachTaskWrapper<Fn>
257-
task(std::forward<Fn>(fn), &errorTransports);
258-
WorkImpl_ParallelForEach(first, last, task);
321+
if (TfMallocTag::IsInitialized()) {
322+
Work_MallocTagsLoopsForEachTaskWrapper<Fn>
323+
task(std::forward<Fn>(fn), &errorTransports);
324+
WorkImpl_ParallelForEach(first, last, task);
325+
}
326+
else {
327+
Work_LoopsForEachTaskWrapper<Fn>
328+
task(std::forward<Fn>(fn), &errorTransports);
329+
WorkImpl_ParallelForEach(first, last, task);
330+
}
259331
for (auto &et: errorTransports) {
260332
et.Post();
261333
}

0 commit comments

Comments
 (0)