Skip to content

Commit 0f2d5c4

Browse files
authored
Merge 5b5620d into sapling-pr-archive-ktf
2 parents 8b7d0c4 + 5b5620d commit 0f2d5c4

File tree

3 files changed

+100
-108
lines changed

3 files changed

+100
-108
lines changed

DataFormats/Headers/include/Headers/DataHeader.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,8 @@ struct BaseHeader {
373373
uint32_t flags;
374374
struct {
375375
uint32_t flagsNextHeader : 1, // do we have a next header after this one?
376-
flagsReserved : 15, // reserved for future use
376+
flagsReserved : 14, // reserved for future use. MUST be filled with 0s.
377+
flagsDisabled : 1, // header should be ignored if this is 1
377378
flagsDerivedHeader : 16; // reserved for usage by the derived header
378379
};
379380
};
@@ -467,15 +468,20 @@ auto get(const std::byte* buffer, size_t /*len*/ = 0)
467468
// otherwise, we keep the code related to the exception outside the header file.
468469
// Note: Can not check on size because the O2 data model requires variable size headers
469470
// to be supported.
470-
if (current->sanityCheck(HeaderValueType::sVersion)) {
471+
if (current->sanityCheck(HeaderValueType::sVersion) && current->flagsDisabled == 0) {
472+
// If the first header matches and it's enabled, we return it
473+
// otherwise we look for more.
471474
return reinterpret_cast<HeaderConstPtrType>(current);
472475
}
473476
}
474477
auto* prev = current;
475478
while ((current = current->next())) {
476479
prev = current;
477480
if (current->description == HeaderValueType::sHeaderType) {
478-
if (current->sanityCheck(HeaderValueType::sVersion)) {
481+
// This is needed to allow disabling some headers from being picked up
482+
// even if they are matching. This is handy to have a quick
483+
// way to disable sub headers without having to drop them.
484+
if (current->sanityCheck(HeaderValueType::sVersion) && current->flagsDisabled == 0) {
479485
return reinterpret_cast<HeaderConstPtrType>(current);
480486
}
481487
}

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 88 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -221,136 +221,124 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
221221
}
222222
}
223223

224-
static auto toBeForwardedHeader = [](void* header) -> bool {
225-
// If is now possible that the record is not complete when
226-
// we forward it, because of a custom completion policy.
227-
// this means that we need to skip the empty entries in the
228-
// record for being forwarded.
229-
if (header == nullptr) {
230-
return false;
231-
}
232-
auto sih = o2::header::get<SourceInfoHeader*>(header);
233-
if (sih) {
234-
return false;
235-
}
236-
237-
auto dih = o2::header::get<DomainInfoHeader*>(header);
238-
if (dih) {
239-
return false;
240-
}
241-
242-
auto dh = o2::header::get<header::DataHeader*>(header);
243-
if (!dh) {
244-
return false;
245-
}
246-
auto dph = o2::header::get<DataProcessingHeader*>(header);
247-
if (!dph) {
248-
return false;
249-
}
250-
return true;
251-
};
252-
253-
static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
254-
FairMQDeviceProxy& proxy,
255-
std::unique_ptr<fair::mq::Message>& header,
256-
std::unique_ptr<fair::mq::Message>& payload,
257-
size_t total,
258-
bool consume) {
259-
if (header.get() == nullptr) {
260-
// Missing an header is not an error anymore.
261-
// it simply means that we did not receive the
262-
// given input, but we were asked to
263-
// consume existing, so we skip it.
264-
return false;
265-
}
266-
if (payload.get() == nullptr && consume == true) {
267-
// If the payload is not there, it means we already
268-
// processed it with ConsumeExisiting. Therefore we
269-
// need to do something only if this is the last consume.
270-
header.reset(nullptr);
271-
return false;
272-
}
273-
274-
auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
275-
if (fdph == nullptr) {
276-
LOG(error) << "Data is missing DataProcessingHeader";
277-
return false;
278-
}
279-
auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
280-
if (fdh == nullptr) {
281-
LOG(error) << "Data is missing DataHeader";
282-
return false;
283-
}
284-
285-
// We need to find the forward route only for the first
286-
// part of a split payload. All the others will use the same.
287-
// but always check if we have a sequence of multiple payloads
288-
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
289-
proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
290-
}
291-
return cachedForwardingChoices.empty() == false;
292-
};
293-
294-
std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
295-
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
224+
auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot,
225+
std::vector<MessageSet>& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice,
226+
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
296227
{
297228
// we collect all messages per forward in a map and send them together
298229
std::vector<fair::mq::Parts> forwardedParts;
299230
forwardedParts.resize(proxy.getNumForwards());
300-
std::vector<ChannelIndex> cachedForwardingChoices{};
231+
std::vector<ChannelIndex> forwardingChoices{};
301232
O2_SIGNPOST_ID_GENERATE(sid, forwarding);
302233
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
303-
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
234+
slot.index, oldestTimeslice.timeslice.value, copyByDefault ? "with copy" : "", copyByDefault && consume ? " and " : "", consume ? "with consume" : "");
304235

305236
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
306237
auto& messageSet = currentSetOfInputs[ii];
307-
// In case the messageSet is empty, there is nothing to be done.
308-
if (messageSet.size() == 0) {
309-
continue;
310-
}
311-
if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
312-
continue;
313-
}
314-
cachedForwardingChoices.clear();
315238

316-
for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
317-
auto& messageSet = currentSetOfInputs[ii];
239+
for (size_t pi = 0; pi < messageSet.size(); ++pi) {
318240
auto& header = messageSet.header(pi);
241+
242+
// It is now possible that the record is not complete when
243+
// we forward it, because of a custom completion policy.
244+
// this means that we need to skip the empty entries in the
245+
// record for being forwarded.
246+
if (header->GetData() == nullptr) {
247+
continue;
248+
}
249+
250+
auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
251+
auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
252+
253+
if (dph == nullptr || dh == nullptr) {
254+
// Complain only if this is not an out-of-band message
255+
auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
256+
auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
257+
if (dih == nullptr || sih == nullptr) {
258+
LOGP(error, "Data is missing {}{}{}",
259+
dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
260+
}
261+
continue;
262+
}
263+
319264
auto& payload = messageSet.payload(pi);
320-
auto total = messageSet.getNumberOfPayloads(pi);
321265

322-
if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
266+
if (payload.get() == nullptr && consume == true) {
267+
// If the payload is not there, it means we already
268+
// processed it with ConsumeExisting. Therefore we
269+
// need to do something only if this is the last consume.
270+
header.reset(nullptr);
323271
continue;
324272
}
325273

326-
// In case of more than one forward route, we need to copy the message.
327-
// This will eventually use the same mamory if running with the same backend.
328-
if (cachedForwardingChoices.size() > 1) {
329-
copy = true;
274+
// We need to find the forward route only for the first
275+
// part of a split payload. All the others will use the same.
276+
// Therefore, we reset and recompute the forwarding choice:
277+
//
278+
// - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
279+
// which is actually always created and handled together
280+
// - If the message is not a multipart (splitPayloadParts 0) or has only one part
281+
// - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
282+
// we will already use the same choice in the for loop below.
283+
if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) {
284+
forwardingChoices.clear();
285+
proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
286+
}
287+
288+
if (forwardingChoices.empty()) {
289+
// Nothing to forward for this part: move on to the next one.
290+
continue;
330291
}
331-
auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
332-
auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
333292

334-
if (copy) {
335-
for (auto& cachedForwardingChoice : cachedForwardingChoices) {
293+
// In case of more than one forward route, we need to copy the message.
294+
// This will eventually use the same memory if running with the same backend.
295+
if (copyByDefault || forwardingChoices.size()) {
296+
for (auto& choice : forwardingChoices) {
336297
auto&& newHeader = header->GetTransport()->CreateMessage();
337298
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
338-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
299+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
339300
newHeader->Copy(*header);
340-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
301+
auto dih = o2::header::get<DomainInfoHeader*>(newHeader->GetData());
302+
if (dih) {
303+
const_cast<DomainInfoHeader*>(dih)->flagsDisabled = 1;
304+
}
305+
auto sih = o2::header::get<SourceInfoHeader*>(newHeader->GetData());
306+
if (sih) {
307+
const_cast<SourceInfoHeader*>(sih)->flagsDisabled = 1;
308+
}
309+
forwardedParts[choice.value].AddPart(std::move(newHeader));
341310

342311
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
343312
auto&& newPayload = header->GetTransport()->CreateMessage();
344313
newPayload->Copy(*messageSet.payload(pi, payloadIndex));
345-
forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
314+
forwardedParts[choice.value].AddPart(std::move(newPayload));
346315
}
347316
}
348317
} else {
349318
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
350-
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
351-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
319+
fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
320+
auto dih = o2::header::get<DomainInfoHeader*>(messageSet.header(pi)->GetData());
321+
auto sih = o2::header::get<SourceInfoHeader*>(messageSet.header(pi)->GetData());
322+
// We need to copy the header if it has extra timeframe accounting
323+
// information attached to it, so that we can disable it without having
324+
// a race condition in shared memory.
325+
if (dih || sih) {
326+
auto&& newHeader = header->GetTransport()->CreateMessage();
327+
newHeader->Copy(*header);
328+
auto dih = o2::header::get<DomainInfoHeader*>(newHeader->GetData());
329+
if (dih) {
330+
const_cast<DomainInfoHeader*>(dih)->flagsDisabled = 1;
331+
}
332+
auto sih = o2::header::get<SourceInfoHeader*>(newHeader->GetData());
333+
if (sih) {
334+
const_cast<SourceInfoHeader*>(sih)->flagsDisabled = 1;
335+
}
336+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(newHeader));
337+
} else {
338+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
339+
}
352340
for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
353-
forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
341+
forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
354342
}
355343
}
356344
}

Framework/Core/test/test_ForwardInputs.cxx

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS")
162162

163163
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
164164
REQUIRE(result.size() == 1); // One route
165-
REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2
165+
REQUIRE(result[0].Size() == 2);
166166
// Correct behavior below:
167167
// REQUIRE(result[0].Size() == 2);
168168
// REQUIRE(o2::header::get<SourceInfoHeader*>(result[0].At(0)->GetData()) == nullptr);
@@ -223,10 +223,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible")
223223

224224
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
225225
REQUIRE(result.size() == 1); // One route
226-
REQUIRE(result[0].Size() == 0); // FIXME: this is actually wrong
227-
// FIXME: actually correct behavior below
228-
// REQUIRE(result[0].Size() == 2); // Two messages
229-
// REQUIRE(o2::header::get<DomainInfoHeader*>(result[0].At(0)->GetData()) == nullptr); // it should not have the end of stream
226+
REQUIRE(result[0].Size() == 2);
227+
REQUIRE(o2::header::get<DomainInfoHeader*>(result[0].At(0)->GetData()) == nullptr); // it should not have the end of stream
230228
}
231229

232230
TEST_CASE("ForwardInputsSingleMessageMultipleRoutes")

0 commit comments

Comments
 (0)