@@ -94,8 +94,8 @@ namespace SampleFlow
9494 *
9595 * The implementation of this class is thread-safe, i.e., its
9696 * consume() member function can be called concurrently and from multiple
97- * threads. However, it ensures that the action is executed only once
98- * at a time.
97+ * threads. A constructor argument determines whether the action is executed
98+ * only once at any given time, or whether it can be executed concurrently .
9999 *
100100 *
101101 * @tparam InputType The C++ type used for the samples $x_k$.
@@ -107,13 +107,19 @@ namespace SampleFlow
107107 /* *
108108 * Constructor. Take the action (a function object) as argument.
109109 *
110- * The second argument (defaulted to ParallelMode::synchronous) indicates
110+ * The second argument determines whether the action needs to be
111+ * protected by a mutex or whether the action can be executed multiple
112+ * times concurrently if there are several samples coming in in short
113+ * succession on different threads.
114+ *
115+ * The third argument (defaulted to ParallelMode::synchronous) indicates
111116 * whether incoming samples should be processed immediately, on the current
112117 * thread, or can be deferred to (possibly out of order) processing on
113118 * a separate thread. See ParallelMode for more information. Whether this
114119 * is possible or not depends on what the `action` function does.
115120 */
116121 Action (const std::function<void (InputType, AuxiliaryData)> &action,
122+ const bool allow_concurrent_action = false ,
117123 const ParallelMode supported_parallel_modes = ParallelMode::synchronous);
118124
119125 /* *
@@ -140,21 +146,25 @@ namespace SampleFlow
140146
141147 private:
142148 /* *
143- * A mutex used to lock access to all member variables when running
144- * on multiple threads .
149+ * A mutex used to synchronize the call to the action function, if so
150+ * desired by the caller .
145151 */
146152 mutable std::mutex mutex;
147153
154+ const bool allow_concurrent_action;
155+
148156 const std::function<void (InputType, AuxiliaryData)> action_function;
149157 };
150158
151159
152160 template <typename InputType>
153161 Action<InputType>::
154162 Action (const std::function<void (InputType, AuxiliaryData)> &action,
163+ const bool allow_concurrent_action,
155164 const ParallelMode supported_parallel_modes)
156165 :
157166 Consumer<InputType>(supported_parallel_modes),
167+ allow_concurrent_action (allow_concurrent_action),
158168 action_function (action)
159169 {}
160170
@@ -173,9 +183,13 @@ namespace SampleFlow
173183 Action<InputType>::
174184 consume (InputType sample, AuxiliaryData aux_data)
175185 {
176- std::lock_guard<std::mutex> lock (mutex);
177-
178- action_function (std::move (sample), std::move (aux_data));
186+ if (allow_concurrent_action)
187+ action_function (std::move (sample), std::move (aux_data));
188+ else
189+ {
190+ std::lock_guard<std::mutex> lock (mutex);
191+ action_function (std::move (sample), std::move (aux_data));
192+ }
179193 }
180194 }
181195}
0 commit comments