@@ -221,104 +221,170 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
221221 }
222222}
223223
224- auto DataProcessingHelpers::routeForwardedMessages (FairMQDeviceProxy& proxy,
225- std::vector<MessageSet>& currentSetOfInputs,
226- const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
224+ void DataProcessingHelpers::routeForwardedMessages (FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& messages, std::vector<fair::mq::Parts>& forwardedParts,
225+ const bool copyByDefault, bool consume)
226+ {
227+ O2_SIGNPOST_ID_GENERATE (sid, forwarding);
228+ std::vector<ChannelIndex> forwardingChoices{};
229+ size_t pi = 0 ;
230+ while (pi < messages.size ()) {
231+ auto & header = messages[pi];
232+
233+ // If is now possible that the record is not complete when
234+ // we forward it, because of a custom completion policy.
235+ // this means that we need to skip the empty entries in the
236+ // record for being forwarded.
237+ if (header->GetData () == nullptr ) {
238+ pi += 2 ;
239+ continue ;
240+ }
241+ auto dih = o2::header::get<DomainInfoHeader*>(header->GetData ());
242+ if (dih) {
243+ pi += 2 ;
244+ continue ;
245+ }
246+ auto sih = o2::header::get<SourceInfoHeader*>(header->GetData ());
247+ if (sih) {
248+ pi += 2 ;
249+ continue ;
250+ }
251+
252+ auto dph = o2::header::get<DataProcessingHeader*>(header->GetData ());
253+ auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData ());
254+
255+ if (dph == nullptr || dh == nullptr ) {
256+ // Complain only if this is not an out-of-band message
257+ LOGP (error, " Data is missing {}{}{}" ,
258+ dph ? " DataProcessingHeader" : " " , dph || dh ? " and" : " " , dh ? " DataHeader" : " " );
259+ pi += 2 ;
260+ continue ;
261+ }
262+
263+ // At least one payload.
264+ auto & payload = messages[pi + 1 ];
265+ // Calculate the number of messages which should be handled together
266+ // all in one go.
267+ size_t numberOfMessages = 0 ;
268+ if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex ) {
269+ // Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
270+ numberOfMessages = dh->splitPayloadParts ;
271+ } else {
272+ // Sequence of splitPayloadParts (header, payload) pairs belonging together.
273+ // In case splitPayloadParts = 0, we consider this as a single message pair
274+ numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1 ) * 2 ;
275+ }
276+
277+ if (payload.get () == nullptr && consume == true ) {
278+ // If the payload is not there, it means we already
279+ // processed it with ConsumeExisiting. Therefore we
280+ // need to do something only if this is the last consume.
281+ header.reset (nullptr );
282+ pi += numberOfMessages;
283+ continue ;
284+ }
285+
286+ // We need to find the forward route only for the first
287+ // part of a split payload. All the others will use the same.
288+ // Therefore, we reset and recompute the forwarding choice:
289+ //
290+ // - If this is the first payload of a [header0][payload0][header0][payload1]... sequence,
291+ // which is actually always created and handled together. Notice that in this
292+ // case we have splitPayloadParts == splitPayloadIndex
293+ // - If this is the first payload of a [header0][payload0][header1][payload1]... sequence
294+ // belonging to the same multipart message (and therefore we are guaranteed that they
295+ // need to be routed together).
296+ // - If the message is not a multipart (splitPayloadParts 0) or has only one part
297+ // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
298+ // we will already use the same choice in the for loop below.
299+ //
300+
301+ forwardingChoices.clear ();
302+ proxy.getMatchingForwardChannelIndexes (forwardingChoices, *dh, dph->startTime );
303+
304+ if (forwardingChoices.empty ()) {
305+ // Nothing to forward go to the next messageset
306+ pi += numberOfMessages;
307+ continue ;
308+ }
309+
310+ // In case of more than one forward route, we need to copy the message.
311+ // This will eventually use the same memory if running with the same backend.
312+ if (copyByDefault || forwardingChoices.size () > 1 ) {
313+ for (auto & choice : forwardingChoices) {
314+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding a copy of %{public}s to route %d." ,
315+ fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), choice.value );
316+
317+ for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
318+ auto && newMsg = header->GetTransport ()->CreateMessage ();
319+ newMsg->Copy (*messages[ppi]);
320+ forwardedParts[choice.value ].AddPart (std::move (newMsg));
321+ }
322+ }
323+ } else {
324+ O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding %{public}s to route %d." ,
325+ fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), forwardingChoices.back ().value );
326+ for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
327+ forwardedParts[forwardingChoices.back ().value ].AddPart (std::move (messages[ppi]));
328+ }
329+ }
330+ pi += numberOfMessages;
331+ }
332+ }
333+
334+ auto DataProcessingHelpers::routeForwardedMessageSet (FairMQDeviceProxy& proxy,
335+ std::vector<MessageSet>& currentSetOfInputs,
336+ const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
227337{
228338 // we collect all messages per forward in a map and send them together
229339 std::vector<fair::mq::Parts> forwardedParts;
230340 forwardedParts.resize (proxy.getNumForwards ());
231341 std::vector<ChannelIndex> forwardingChoices{};
232- O2_SIGNPOST_ID_GENERATE (sid, forwarding);
233342
234343 for (size_t ii = 0 , ie = currentSetOfInputs.size (); ii < ie; ++ii) {
235- auto & messageSet = currentSetOfInputs[ii];
236-
237- for (size_t pi = 0 ; pi < messageSet.size (); ++pi) {
238- auto & header = messageSet.header (pi);
344+ auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages );
345+ routeForwardedMessages (proxy, span, forwardedParts, copyByDefault, consume);
346+ }
347+ return forwardedParts;
348+ }
239349
240- // If is now possible that the record is not complete when
241- // we forward it, because of a custom completion policy.
242- // this means that we need to skip the empty entries in the
243- // record for being forwarded.
244- if (header-> GetData () == nullptr ) {
245- continue ;
246- }
350+ void DataProcessingHelpers::cleanForwardedMessageSet (std::vector<MessageSet>& currentSetOfInputs)
351+ {
352+ for ( size_t ii = 0 , ie = currentSetOfInputs. size (); ii < ie; ++ii) {
353+ auto messages = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]. messages );
354+ size_t pi = 0 ;
355+ while (pi < messages. size ()) {
356+ auto & header = messages[pi];
247357 auto dih = o2::header::get<DomainInfoHeader*>(header->GetData ());
248- if (dih) {
249- continue ;
250- }
251358 auto sih = o2::header::get<SourceInfoHeader*>(header->GetData ());
252- if (sih) {
253- continue ;
254- }
255-
256359 auto dph = o2::header::get<DataProcessingHeader*>(header->GetData ());
257360 auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData ());
258-
259- if (dph == nullptr || dh == nullptr ) {
260- // Complain only if this is not an out-of-band message
261- LOGP (error, " Data is missing {}{}{}" ,
262- dph ? " DataProcessingHeader" : " " , dph || dh ? " and" : " " , dh ? " DataHeader" : " " );
361+ if (header->GetData () == nullptr || sih || dih || dph == nullptr || dh == nullptr ) {
362+ pi += 2 ;
263363 continue ;
264364 }
265365
266- auto & payload = messageSet.payload (pi);
366+ // At least one payload.
367+ auto & payload = messages[pi + 1 ];
267368
268- if (payload.get () == nullptr && consume == true ) {
369+ if (payload.get () == nullptr ) {
269370 // If the payload is not there, it means we already
270371 // processed it with ConsumeExisiting. Therefore we
271372 // need to do something only if this is the last consume.
272373 header.reset (nullptr );
273- continue ;
274374 }
275375
276- // We need to find the forward route only for the first
277- // part of a split payload. All the others will use the same.
278- // Therefore, we reset and recompute the forwarding choice:
279- //
280- // - If this is the first payload of a [header0][payload0][header0][payload1] sequence,
281- // which is actually always created and handled together
282- // - If the message is not a multipart (splitPayloadParts 0) or has only one part
283- // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
284- // we will already use the same choice in the for loop below.
285- if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads (pi) > 0 ) {
286- forwardingChoices.clear ();
287- proxy.getMatchingForwardChannelIndexes (forwardingChoices, *dh, dph->startTime );
288- }
289-
290- if (forwardingChoices.empty ()) {
291- // Nothing to forward go to the next messageset
292- continue ;
293- }
294-
295- // In case of more than one forward route, we need to copy the message.
296- // This will eventually use the same memory if running with the same backend.
297- if (copyByDefault || forwardingChoices.size () > 1 ) {
298- for (auto & choice : forwardingChoices) {
299- auto && newHeader = header->GetTransport ()->CreateMessage ();
300- O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding a copy of %{public}s to route %d." ,
301- fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), choice.value );
302- newHeader->Copy (*header);
303- forwardedParts[choice.value ].AddPart (std::move (newHeader));
304-
305- for (size_t payloadIndex = 0 ; payloadIndex < messageSet.getNumberOfPayloads (pi); ++payloadIndex) {
306- auto && newPayload = header->GetTransport ()->CreateMessage ();
307- newPayload->Copy (*messageSet.payload (pi, payloadIndex));
308- forwardedParts[choice.value ].AddPart (std::move (newPayload));
309- }
310- }
376+ // Calculate the number of messages which should be handled together
377+ // all in one go.
378+ if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex ) {
379+ // Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
380+ pi += dh->splitPayloadParts ;
311381 } else {
312- O2_SIGNPOST_EVENT_EMIT (forwarding, sid, " forwardInputs" , " Forwarding %{public}s to route %d." ,
313- fmt::format (" {}/{}/{}@timeslice:{} tfCounter:{}" , dh->dataOrigin , dh->dataDescription , dh->subSpecification , dph->startTime , dh->tfCounter ).c_str (), forwardingChoices.back ().value );
314- forwardedParts[forwardingChoices.back ().value ].AddPart (std::move (messageSet.header (pi)));
315- for (size_t payloadIndex = 0 ; payloadIndex < messageSet.getNumberOfPayloads (pi); ++payloadIndex) {
316- forwardedParts[forwardingChoices.back ().value ].AddPart (std::move (messageSet.payload (pi, payloadIndex)));
317- }
382+ // Sequence of splitPayloadParts (header, payload) pairs belonging together.
383+ // In case splitPayloadParts = 0, we consider this as a single message pair
384+ pi += (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1 ) * 2 ;
318385 }
319386 }
320387 }
321- return forwardedParts;
322- };
388+ }
323389
324390} // namespace o2::framework
0 commit comments