@@ -1061,7 +1061,11 @@ void DataProcessingDevice::InitTask()
10611061 // Whenever we InitTask, we consider as if the previous iteration
10621062 // was successful, so that even if there is no timer or receiving
10631063 // channel, we can still start an enumeration.
1064- mWasActive = true ;
1064+ DataProcessorContext* initialContext = nullptr ;
1065+ bool idle = state.lastActiveDataProcessor .compare_exchange_strong (initialContext, (DataProcessorContext*)-1 );
1066+ if (!idle) {
1067+ LOG (error) << " DataProcessor " << state.lastActiveDataProcessor .load ()->spec ->name << " was unexpectedly active" ;
1068+ }
10651069
10661070 // We should be ready to run here. Therefore we copy all the
10671071 // required parts in the DataProcessorContext. Eventually we should
@@ -1093,8 +1097,6 @@ void DataProcessingDevice::InitTask()
10931097
10941098void DataProcessingDevice::fillContext (DataProcessorContext& context, DeviceContext& deviceContext)
10951099{
1096- context.wasActive = &mWasActive ;
1097-
10981100 context.isSink = false ;
10991101 // If nothing is a sink, the rate limiting simply does not trigger.
11001102 bool enableRateLimiting = std::stoi (fConfig ->GetValue <std::string>(" timeframes-rate-limit" ));
@@ -1308,14 +1310,19 @@ void DataProcessingDevice::Run()
13081310 {
13091311 ServiceRegistryRef ref{mServiceRegistry };
13101312 ref.get <DriverClient>().flushPending (mServiceRegistry );
1311- auto shouldNotWait = (mWasActive &&
1313+ DataProcessorContext* lastActive = state.lastActiveDataProcessor .load ();
1314+ // Reset to zero unless some other DataProcessorContext completed in the meantime.
1315+ // In that case we will take care of it at the next iteration.
1316+ state.lastActiveDataProcessor .compare_exchange_strong (lastActive, nullptr );
1317+
1318+ auto shouldNotWait = (lastActive != nullptr &&
13121319 (state.streaming != StreamingState::Idle) && (state.activeSignals .empty ())) ||
13131320 (state.streaming == StreamingState::EndOfStreaming);
13141321 if (firstLoop) {
13151322 shouldNotWait = true ;
13161323 firstLoop = false ;
13171324 }
1318- if (mWasActive ) {
1325+ if (lastActive != nullptr ) {
13191326 state.loopReason |= DeviceState::LoopReason::PREVIOUSLY_ACTIVE;
13201327 }
13211328 if (NewStatePending ()) {
@@ -1485,10 +1492,7 @@ void DataProcessingDevice::Run()
14851492 } else {
14861493 auto ref = ServiceRegistryRef{mServiceRegistry };
14871494 ref.get <ComputingQuotaEvaluator>().handleExpired (reportExpiredOffer);
1488- mWasActive = false ;
14891495 }
1490- } else {
1491- mWasActive = false ;
14921496 }
14931497 }
14941498
@@ -1510,7 +1514,6 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
15101514 O2_SIGNPOST_ID_FROM_POINTER (dpid, device, &context);
15111515 O2_SIGNPOST_START (device, dpid, " do_prepare" , " Starting DataProcessorContext::doPrepare." );
15121516
1513- *context.wasActive = false ;
15141517 {
15151518 ref.get <CallbackService>().call <CallbackService::Id::ClockTick>();
15161519 }
@@ -1669,7 +1672,10 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
16691672 socket.Events (&info.hasPendingEvents );
16701673 if (info.hasPendingEvents ) {
16711674 info.readPolled = false ;
1672- *context.wasActive |= newMessages;
1675+ // In case there were messages, we consider it activity.
1676+ if (newMessages) {
1677+ state.lastActiveDataProcessor .store (&context);
1678+ }
16731679 }
16741680 O2_SIGNPOST_END (device, cid, " channels" , " Done processing channel %{public}s (%d)." ,
16751681 channelSpec.name .c_str (), info.id .value );
@@ -1693,24 +1699,29 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
16931699 auto & spec = ref.get <DeviceSpec const >();
16941700
16951701 if (state.streaming == StreamingState::Idle) {
1696- *context.wasActive = false ;
16971702 return ;
16981703 }
16991704
17001705 context.completed .clear ();
17011706 context.completed .reserve (16 );
1702- *context.wasActive |= DataProcessingDevice::tryDispatchComputation (ref, context.completed );
1707+ if (DataProcessingDevice::tryDispatchComputation (ref, context.completed )) {
1708+ state.lastActiveDataProcessor .store (&context);
1709+ }
17031710 DanglingContext danglingContext{*context.registry };
17041711
17051712 context.preDanglingCallbacks (danglingContext);
1706- if (*context. wasActive == false ) {
1713+ if (state. lastActiveDataProcessor . load () == nullptr ) {
17071714 ref.get <CallbackService>().call <CallbackService::Id::Idle>();
17081715 }
17091716 auto activity = ref.get <DataRelayer>().processDanglingInputs (context.expirationHandlers , *context.registry , true );
1710- *context.wasActive |= activity.expiredSlots > 0 ;
1717+ if (activity.expiredSlots > 0 ) {
1718+ state.lastActiveDataProcessor = &context;
1719+ }
17111720
17121721 context.completed .clear ();
1713- *context.wasActive |= DataProcessingDevice::tryDispatchComputation (ref, context.completed );
1722+ if (DataProcessingDevice::tryDispatchComputation (ref, context.completed )) {
1723+ state.lastActiveDataProcessor = &context;
1724+ }
17141725
17151726 context.postDanglingCallbacks (danglingContext);
17161727
@@ -1720,7 +1731,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
17201731 // framework itself.
17211732 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
17221733 switchState (StreamingState::EndOfStreaming);
1723- *context. wasActive = true ;
1734+ state. lastActiveDataProcessor = &context ;
17241735 }
17251736
17261737 if (state.streaming == StreamingState::EndOfStreaming) {
@@ -1766,7 +1777,10 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
17661777 // This is needed because the transport is deleted before the device.
17671778 relayer.clear ();
17681779 switchState (StreamingState::Idle);
1769- *context.wasActive = shouldProcess;
1780+ // In case we should process, note the data processor responsible for it
1781+ if (shouldProcess) {
1782+ state.lastActiveDataProcessor = &context;
1783+ }
17701784 // On end of stream we shut down all output pollers.
17711785 O2_SIGNPOST_EVENT_EMIT (device, dpid, " state" , " Shutting down output pollers." );
17721786 for (auto & poller : state.activeOutputPollers ) {
@@ -1834,6 +1848,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18341848 O2_SIGNPOST_ID_FROM_POINTER (cid, device, &info);
18351849 auto ref = ServiceRegistryRef{*context.registry };
18361850 auto & stats = ref.get <DataProcessingStats>();
1851+ auto & state = ref.get <DeviceState>();
18371852 auto & parts = info.parts ;
18381853 stats.updateStats ({(int )ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t )parts.Size ()});
18391854
@@ -1856,14 +1871,14 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
18561871 O2_SIGNPOST_EVENT_EMIT (device, cid, " handle_data" , " Got SourceInfoHeader with state %d" , (int )sih->state );
18571872 info.state = sih->state ;
18581873 insertInputInfo (pi, 2 , InputType::SourceInfo, info.id );
1859- *context. wasActive = true ;
1874+ state. lastActiveDataProcessor = &context ;
18601875 continue ;
18611876 }
18621877 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
18631878 if (dih) {
18641879 O2_SIGNPOST_EVENT_EMIT (device, cid, " handle_data" , " Got DomainInfoHeader with oldestPossibleTimeslice %d" , (int )dih->oldestPossibleTimeslice );
18651880 insertInputInfo (pi, 2 , InputType::DomainInfo, info.id );
1866- *context. wasActive = true ;
1881+ state. lastActiveDataProcessor = &context ;
18671882 continue ;
18681883 }
18691884 auto dh = o2::header::get<DataHeader*>(headerData);
@@ -1925,6 +1940,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
19251940
19261941 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const & inputInfos) {
19271942 auto & relayer = ref.get <DataRelayer>();
1943+ auto & state = ref.get <DeviceState>();
19281944 static WaitBackpressurePolicy policy;
19291945 auto & parts = info.parts ;
19301946 // We relay execution to make sure we have a complete set of parts
@@ -2012,7 +2028,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
20122028 case InputType::SourceInfo: {
20132029 LOGP (detail, " Received SourceInfo" );
20142030 auto & context = ref.get <DataProcessorContext>();
2015- *context. wasActive = true ;
2031+ state. lastActiveDataProcessor = &context ;
20162032 auto headerIndex = input.position ;
20172033 auto payloadIndex = input.position + 1 ;
20182034 assert (payloadIndex < parts.Size ());
@@ -2030,7 +2046,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
20302046 // / We have back pressure, therefore we do not process DomainInfo anymore.
20312047 // / until the previous message are processed.
20322048 auto & context = ref.get <DataProcessorContext>();
2033- *context. wasActive = true ;
2049+ state. lastActiveDataProcessor = &context ;
20342050 auto headerIndex = input.position ;
20352051 auto payloadIndex = input.position + 1 ;
20362052 assert (payloadIndex < parts.Size ());
@@ -2058,7 +2074,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
20582074 auto & context = ref.get <DataProcessorContext>();
20592075 context.domainInfoUpdatedCallback (*context.registry , oldestPossibleTimeslice, info.id );
20602076 ref.get <CallbackService>().call <CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry , (size_t )oldestPossibleTimeslice, (ChannelIndex)info.id );
2061- *context. wasActive = true ;
2077+ state. lastActiveDataProcessor = &context ;
20622078 }
20632079 auto it = std::remove_if (parts.fParts .begin (), parts.fParts .end (), [](auto & msg) -> bool { return msg.get () == nullptr ; });
20642080 parts.fParts .erase (it, parts.end ());
0 commit comments