Skip to content

Commit e074490

Browse files
authored
Merge 9610896 into sapling-pr-archive-ktf
2 parents e347ff6 + 9610896 commit e074490

File tree

2 files changed

+48
-40
lines changed

2 files changed

+48
-40
lines changed

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -141,20 +141,34 @@ void on_transition_requested_expired(uv_timer_t* handle)
141141
state.transitionHandling = TransitionHandlingState::Expired;
142142
}
143143

144+
auto switchState(ServiceRegistryRef& ref, StreamingState newState) -> void
145+
{
146+
auto& state = ref.get<DeviceState>();
147+
auto& context = ref.get<DataProcessorContext>();
148+
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
149+
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
150+
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
151+
state.streaming = newState;
152+
ref.get<ControlService>().notifyStreamingState(state.streaming);
153+
};
154+
144155
void on_data_processing_expired(uv_timer_t* handle)
145156
{
146157
auto* ref = (ServiceRegistryRef*)handle->data;
147158
auto& state = ref->get<DeviceState>();
159+
auto& spec = ref->get<DeviceSpec const>();
148160
state.loopReason |= DeviceState::TIMER_EXPIRED;
149161

150162
// Check if this is a source device
151163
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
152164

153-
// Source devices should never end up in this callback, since the exitTransitionTimeout should
154-
// be reset to the dataProcessingTimeout and the timers cohalesced.
155-
assert(hasOnlyGenerated(ref->get<DeviceSpec const>()) == false);
156-
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
157-
state.allowedProcessing = DeviceState::CalibrationOnly;
165+
if (hasOnlyGenerated(spec)) {
166+
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
167+
switchState(*ref, StreamingState::EndOfStreaming);
168+
} else {
169+
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
170+
state.allowedProcessing = DeviceState::CalibrationOnly;
171+
}
158172
}
159173

160174
void on_communication_requested(uv_async_t* s)
@@ -1236,7 +1250,7 @@ void DataProcessingDevice::PreRun()
12361250
O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
12371251
O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
12381252
state.quitRequested = false;
1239-
state.streaming = StreamingState::Streaming;
1253+
switchState(ref, StreamingState::Streaming);
12401254
state.allowedProcessing = DeviceState::Any;
12411255
for (auto& info : state.inputChannelInfos) {
12421256
if (info.state != InputChannelState::Pull) {
@@ -1365,14 +1379,7 @@ void DataProcessingDevice::Run()
13651379
// Check if we only have timers
13661380
auto& spec = ref.get<DeviceSpec const>();
13671381
if (hasOnlyTimers(spec)) {
1368-
state.streaming = StreamingState::EndOfStreaming;
1369-
}
1370-
1371-
// If this is a source device, dataTransitionTimeout and dataProcessingTimeout are effectively
1372-
// the same (because source devices are not allowed to produce any calibration).
1373-
// should be the same.
1374-
if (hasOnlyGenerated(spec) && deviceContext.dataProcessingTimeout > 0) {
1375-
deviceContext.exitTransitionTimeout = deviceContext.dataProcessingTimeout;
1382+
switchState(ref, StreamingState::EndOfStreaming);
13761383
}
13771384

13781385
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
@@ -1385,12 +1392,14 @@ void DataProcessingDevice::Run()
13851392
state.transitionHandling = TransitionHandlingState::Requested;
13861393
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
13871394
uv_update_time(state.loop);
1388-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout);
1395+
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1396+
deviceContext.exitTransitionTimeout);
13891397
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1390-
if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1391-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout);
1398+
int timeout = hasOnlyGenerated(spec) ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1399+
if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1400+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
13921401
} else {
1393-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before switching to READY state.", (int)deviceContext.exitTransitionTimeout);
1402+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before switching to READY state.", timeout);
13941403
}
13951404
} else {
13961405
state.transitionHandling = TransitionHandlingState::Expired;
@@ -1728,15 +1737,6 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
17281737
{
17291738
auto& context = ref.get<DataProcessorContext>();
17301739
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1731-
auto switchState = [ref](StreamingState newState) {
1732-
auto& state = ref.get<DeviceState>();
1733-
auto& context = ref.get<DataProcessorContext>();
1734-
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1735-
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
1736-
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
1737-
state.streaming = newState;
1738-
ref.get<ControlService>().notifyStreamingState(state.streaming);
1739-
};
17401740
auto& state = ref.get<DeviceState>();
17411741
auto& spec = ref.get<DeviceSpec const>();
17421742

@@ -1772,7 +1772,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
17721772
// dependent on the callback, not something which is controlled by the
17731773
// framework itself.
17741774
if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1775-
switchState(StreamingState::EndOfStreaming);
1775+
switchState(ref, StreamingState::EndOfStreaming);
17761776
state.lastActiveDataProcessor = &context;
17771777
}
17781778

@@ -1818,7 +1818,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18181818
}
18191819
// This is needed because the transport is deleted before the device.
18201820
relayer.clear();
1821-
switchState(StreamingState::Idle);
1821+
switchState(ref, StreamingState::Idle);
18221822
// In case we should process, note the data processor responsible for it
18231823
if (shouldProcess) {
18241824
state.lastActiveDataProcessor = &context;
@@ -2328,13 +2328,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
23282328
}
23292329
};
23302330

2331-
auto switchState = [ref](StreamingState newState) {
2332-
auto& control = ref.get<ControlService>();
2333-
auto& state = ref.get<DeviceState>();
2334-
state.streaming = newState;
2335-
control.notifyStreamingState(state.streaming);
2336-
};
2337-
23382331
ref.get<DataRelayer>().getReadyToProcess(completed);
23392332
if (completed.empty() == true) {
23402333
LOGP(debug, "No computations available for dispatching.");
@@ -2510,7 +2503,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
25102503
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
25112504
} else {
25122505
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2513-
state.streaming = StreamingState::Idle;
2506+
switchState(ref, StreamingState::Idle);
25142507
}
25152508
if (shouldProcess(action)) {
25162509
auto& timingInfo = ref.get<TimingInfo>();
@@ -2598,7 +2591,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
25982591
for (auto& channel : spec.outputChannels) {
25992592
DataProcessingHelpers::sendEndOfStream(ref, channel);
26002593
}
2601-
switchState(StreamingState::Idle);
2594+
switchState(ref, StreamingState::Idle);
26022595
}
26032596

26042597
return true;

Framework/Core/src/DataRelayer.cxx

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
// In applying this license CERN does not waive the privileges and immunities
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
11+
#include "Framework/DeviceState.h"
1112
#include "Framework/RootSerializationSupport.h"
1213
#include "Framework/DataRelayer.h"
1314
#include "Framework/DataProcessingStats.h"
@@ -17,6 +18,7 @@
1718
#include "Framework/DataDescriptorMatcher.h"
1819
#include "Framework/DataSpecUtils.h"
1920
#include "Framework/DataProcessingHeader.h"
21+
#include "Framework/DataProcessingContext.h"
2022
#include "Framework/DataRef.h"
2123
#include "Framework/InputRecord.h"
2224
#include "Framework/InputSpan.h"
@@ -42,11 +44,11 @@
4244
#include <Monitoring/Metric.h>
4345
#include <Monitoring/Monitoring.h>
4446

47+
#include <fairlogger/Logger.h>
4548
#include <fairmq/Channel.h>
4649
#include <fmt/format.h>
4750
#include <fmt/ostream.h>
4851
#include <gsl/span>
49-
#include <numeric>
5052
#include <string>
5153

5254
using namespace o2::framework::data_matcher;
@@ -55,6 +57,8 @@ using DataProcessingHeader = o2::framework::DataProcessingHeader;
5557
using Verbosity = o2::monitoring::Verbosity;
5658

5759
O2_DECLARE_DYNAMIC_LOG(data_relayer);
60+
// Stream which keeps track of the calibration lifetime logic
61+
O2_DECLARE_DYNAMIC_LOG(calibration);
5862

5963
namespace o2::framework
6064
{
@@ -333,7 +337,11 @@ void DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex chan
333337
if (element.size() == 0) {
334338
auto& state = mContext.get<DeviceState>();
335339
if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
336-
LOGP(warning, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
340+
if (state.allowedProcessing == DeviceState::CalibrationOnly) {
341+
LOGP(info, "Missing {} (lifetime:{}) while dropping non-calibration data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
342+
} else {
343+
LOGP(warn, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
344+
}
337345
} else {
338346
LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
339347
}
@@ -480,6 +488,13 @@ DataRelayer::RelayChoice
480488
// We are in calibration mode and the data does not have the calibration bit set.
481489
// We do not store it.
482490
if (services.get<DeviceState>().allowedProcessing == DeviceState::ProcessingType::CalibrationOnly && !isCalibrationData(messages[mi])) {
491+
O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, &services.get<DataProcessorContext>());
492+
O2_SIGNPOST_EVENT_EMIT(calibration, cid, "calibration",
493+
"Dropping incoming %zu messages because they are data processing.", nPayloads);
494+
// Actually dropping messages.
495+
for (size_t i = mi; i < mi + nPayloads + 1; i++) {
496+
auto discard = std::move(messages[i]);
497+
}
483498
mi += nPayloads;
484499
continue;
485500
}

0 commit comments

Comments
 (0)