Skip to content

Commit 8a2df7c

Browse files
authored
Merge 41c21d7 into sapling-pr-archive-ktf
2 parents 727b768 + 41c21d7 commit 8a2df7c

File tree

1 file changed

+103
-85
lines changed

1 file changed

+103
-85
lines changed

Framework/Core/include/Framework/DataAllocator.h

Lines changed: 103 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,6 @@ namespace o2::framework
5757
{
5858
struct ServiceRegistry;
5959

60-
#define ERROR_STRING \
61-
"data type T not supported by API, " \
62-
"\n specializations available for" \
63-
"\n - trivially copyable, non-polymorphic structures" \
64-
"\n - arrays of those" \
65-
"\n - TObject with additional constructor arguments" \
66-
"\n - std containers of those"
67-
6860
/// Helper to allow framework managed objecs to have a callback
6961
/// when they go out of scope. For example, this could
7062
/// be used to serialize a message into a buffer before the
@@ -130,6 +122,10 @@ struct LifetimeHolder {
130122
}
131123
};
132124

125+
template <typename T>
126+
concept VectorOfMessageableTypes = is_specialization_v<T, std::vector> &&
127+
is_messageable<typename T::value_type>::value;
128+
133129
/// This allocator is responsible to make sure that the messages created match
134130
/// the provided spec and that depending on how many pipelined reader we
135131
/// have, messages get created on the channel for the reader of the current
@@ -163,93 +159,115 @@ class DataAllocator
163159
// and with subspecification 0xdeadbeef.
164160
void cookDeadBeef(const Output& spec);
165161

166-
/// Generic helper to create an object which is owned by the framework and
167-
/// returned as a reference to the own object.
168-
/// Note: decltype(auto) will deduce the return type from the expression and it
169-
/// will be lvalue reference for the framework-owned objects. Instances of local
170-
/// variables like shared_ptr will be returned by value/move/return value optimization.
171-
/// Objects created this way will be sent to the channel specified by @spec
172162
template <typename T, typename... Args>
163+
requires(is_specialization_v<T, UninitializedVector>)
173164
decltype(auto) make(const Output& spec, Args... args)
174165
{
175166
auto& timingInfo = mRegistry.get<TimingInfo>();
176167
auto& context = mRegistry.get<MessageContext>();
177168

178-
if constexpr (is_specialization_v<T, UninitializedVector>) {
179-
auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
180-
// plain buffer as polymorphic spectator std::vector, which does not run constructors / destructors
181-
using ValueType = typename T::value_type;
182-
183-
// Note: initial payload size is 0 and will be set by the context before sending
184-
fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
185-
return context.add<MessageContext::VectorObject<ValueType, MessageContext::ContainerRefObject<std::vector<ValueType, o2::pmr::NoConstructAllocator<ValueType>>>>>(
186-
std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...)
187-
.get();
188-
} else if constexpr (is_specialization_v<T, std::vector> && has_messageable_value_type<T>::value) {
189-
auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
190-
// this catches all std::vector objects with messageable value type before checking if is also
191-
// has a root dictionary, so non-serialized transmission is preferred
192-
using ValueType = typename T::value_type;
193-
194-
// Note: initial payload size is 0 and will be set by the context before sending
195-
fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
196-
return context.add<MessageContext::VectorObject<ValueType>>(std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...).get();
197-
} else if constexpr (has_root_dictionary<T>::value == true && is_messageable<T>::value == false) {
198-
auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
199-
// Extended support for types implementing the Root ClassDef interface, both TObject
200-
// derived types and others
201-
if constexpr (enable_root_serialization<T>::value) {
202-
fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodROOT, 0);
203-
204-
return context.add<typename enable_root_serialization<T>::object_type>(std::move(headerMessage), routeIndex, std::forward<Args>(args)...).get();
205-
} else {
206-
static_assert(enable_root_serialization<T>::value, "Please make sure you include RootMessageContext.h");
207-
}
208-
// Note: initial payload size is 0 and will be set by the context before sending
209-
} else if constexpr (std::is_base_of_v<std::string, T>) {
210-
auto* s = new std::string(args...);
211-
adopt(spec, s);
212-
return *s;
213-
} else if constexpr (requires { static_cast<struct TableBuilder>(std::declval<std::decay_t<T>>()); }) {
214-
auto tb = std::move(LifetimeHolder<TableBuilder>(new std::decay_t<T>(args...)));
215-
adopt(spec, tb);
216-
return tb;
217-
} else if constexpr (requires { static_cast<struct TreeToTable>(std::declval<std::decay_t<T>>()); }) {
218-
auto t2t = std::move(LifetimeHolder<TreeToTable>(new std::decay_t<T>(args...)));
219-
adopt(spec, t2t);
220-
return t2t;
221-
} else if constexpr (sizeof...(Args) == 0) {
222-
if constexpr (is_messageable<T>::value == true) {
223-
return *reinterpret_cast<T*>(newChunk(spec, sizeof(T)).data());
224-
} else {
225-
static_assert(always_static_assert_v<T>, ERROR_STRING);
226-
}
227-
} else if constexpr (sizeof...(Args) == 1) {
228-
using FirstArg = typename std::tuple_element<0, std::tuple<Args...>>::type;
229-
if constexpr (std::is_integral_v<FirstArg>) {
230-
if constexpr (is_messageable<T>::value == true) {
231-
auto [nElements] = std::make_tuple(args...);
232-
auto size = nElements * sizeof(T);
233-
auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
234-
235-
fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, size);
236-
return context.add<MessageContext::SpanObject<T>>(std::move(headerMessage), routeIndex, 0, nElements).get();
237-
}
238-
} else if constexpr (std::is_same_v<FirstArg, std::shared_ptr<arrow::Schema>>) {
239-
if constexpr (std::is_base_of_v<arrow::ipc::RecordBatchWriter, T>) {
240-
auto [schema] = std::make_tuple(args...);
241-
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
242-
create(spec, &writer, schema);
243-
return writer;
244-
}
245-
} else {
246-
static_assert(always_static_assert_v<T>, ERROR_STRING);
247-
}
169+
auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
170+
// plain buffer as polymorphic spectator std::vector, which does not run constructors / destructors
171+
using ValueType = typename T::value_type;
172+
173+
// Note: initial payload size is 0 and will be set by the context before sending
174+
fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
175+
return context.add<MessageContext::VectorObject<ValueType, MessageContext::ContainerRefObject<std::vector<ValueType, o2::pmr::NoConstructAllocator<ValueType>>>>>(
176+
std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...)
177+
.get();
178+
}
179+
180+
template <typename T, typename... Args>
181+
requires VectorOfMessageableTypes<T>
182+
decltype(auto) make(const Output& spec, Args... args)
183+
{
184+
auto& timingInfo = mRegistry.get<TimingInfo>();
185+
auto& context = mRegistry.get<MessageContext>();
186+
187+
auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
188+
// this catches all std::vector objects with messageable value type before checking if is also
189+
// has a root dictionary, so non-serialized transmission is preferred
190+
using ValueType = typename T::value_type;
191+
192+
// Note: initial payload size is 0 and will be set by the context before sending
193+
fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
194+
return context.add<MessageContext::VectorObject<ValueType>>(std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...).get();
195+
}
196+
197+
template <typename T, typename... Args>
198+
requires(!VectorOfMessageableTypes<T> && has_root_dictionary<T>::value == true && is_messageable<T>::value == false)
199+
decltype(auto) make(const Output& spec, Args... args)
200+
{
201+
auto& timingInfo = mRegistry.get<TimingInfo>();
202+
auto& context = mRegistry.get<MessageContext>();
203+
204+
auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
205+
// Extended support for types implementing the Root ClassDef interface, both TObject
206+
// derived types and others
207+
if constexpr (enable_root_serialization<T>::value) {
208+
fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodROOT, 0);
209+
210+
return context.add<typename enable_root_serialization<T>::object_type>(std::move(headerMessage), routeIndex, std::forward<Args>(args)...).get();
248211
} else {
249-
static_assert(always_static_assert_v<T>, ERROR_STRING);
212+
static_assert(enable_root_serialization<T>::value, "Please make sure you include RootMessageContext.h");
250213
}
251214
}
252215

216+
template <typename T, typename... Args>
217+
requires(std::is_base_of_v<std::string, T>)
218+
decltype(auto) make(const Output& spec, Args... args)
219+
{
220+
auto* s = new std::string(args...);
221+
adopt(spec, s);
222+
return *s;
223+
}
224+
225+
template <typename T, typename... Args>
226+
requires(requires { static_cast<struct TableBuilder>(std::declval<std::decay_t<T>>()); })
227+
decltype(auto) make(const Output& spec, Args... args)
228+
{
229+
auto tb = std::move(LifetimeHolder<TableBuilder>(new std::decay_t<T>(args...)));
230+
adopt(spec, tb);
231+
return tb;
232+
}
233+
234+
template <typename T, typename... Args>
235+
requires(requires { static_cast<struct TreeToTable>(std::declval<std::decay_t<T>>()); })
236+
decltype(auto) make(const Output& spec, Args... args)
237+
{
238+
auto t2t = std::move(LifetimeHolder<TreeToTable>(new std::decay_t<T>(args...)));
239+
adopt(spec, t2t);
240+
return t2t;
241+
}
242+
243+
template <typename T, typename... Args>
244+
requires(sizeof...(Args) == 0 && is_messageable<T>::value == true)
245+
decltype(auto) make(const Output& spec, Args... args)
246+
{
247+
return *reinterpret_cast<T*>(newChunk(spec, sizeof(T)).data());
248+
}
249+
250+
template <typename T, typename Arg>
251+
requires(std::is_integral_v<Arg> && is_messageable<T>::value)
252+
decltype(auto) make(const Output& spec, Arg args)
253+
{
254+
auto& timingInfo = mRegistry.get<TimingInfo>();
255+
auto& context = mRegistry.get<MessageContext>();
256+
auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
257+
258+
fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, sizeof(T));
259+
return context.add<MessageContext::SpanObject<T>>(std::move(headerMessage), routeIndex, 0, 1).get();
260+
}
261+
262+
template <typename T, typename Arg>
263+
requires(std::is_same_v<Arg, std::shared_ptr<arrow::Schema>>)
264+
decltype(auto) make(const Output& spec, Arg schema)
265+
{
266+
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
267+
create(spec, &writer, schema);
268+
return writer;
269+
}
270+
253271
/// Adopt a string in the framework and serialize / send
254272
/// it to the consumers of @a spec once done.
255273
void

0 commit comments

Comments
 (0)