Skip to content

Commit f63bf2e

Browse files
authored
Merge b33b8bc into sapling-pr-archive-ktf
2 parents 8d84e15 + b33b8bc commit f63bf2e

File tree

14 files changed

+452
-146
lines changed

14 files changed

+452
-146
lines changed

DataFormats/simulation/src/DigitizationContext.cxx

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ void DigitizationContext::applyMaxCollisionFilter(std::vector<std::tuple<int, in
577577
if (indices_old_to_new.find(lastindex) != indices_old_to_new.end()) {
578578
std::get<1>(tf_indices) = indices_old_to_new[lastindex]; // end;
579579
} else {
580-
std::get<1>(tf_indices) = newrecords.size(); // end;
580+
std::get<1>(tf_indices) = newrecords.size() - 1; // end; -1 since index inclusif
581581
}
582582
if (indices_old_to_new.find(previndex) != indices_old_to_new.end()) {
583583
std::get<2>(tf_indices) = indices_old_to_new[previndex]; // previous or "early" index
@@ -591,11 +591,6 @@ void DigitizationContext::applyMaxCollisionFilter(std::vector<std::tuple<int, in
591591
std::vector<std::tuple<int, int, int>> DigitizationContext::calcTimeframeIndices(long startOrbit, long orbitsPerTF, double orbitsEarly) const
592592
{
593593
auto timeframeindices = getTimeFrameBoundaries(mEventRecords, startOrbit, orbitsPerTF, orbitsEarly);
594-
LOG(info) << "Fixed " << timeframeindices.size() << " timeframes ";
595-
for (auto p : timeframeindices) {
596-
LOG(info) << std::get<0>(p) << " " << std::get<1>(p) << " " << std::get<2>(p);
597-
}
598-
599594
return timeframeindices;
600595
}
601596

@@ -708,7 +703,7 @@ DigitizationContext DigitizationContext::extractSingleTimeframe(int timeframeid,
708703
auto tf_ranges = timeframeindices.at(timeframeid);
709704

710705
auto startindex = std::get<0>(tf_ranges);
711-
auto endindex = std::get<1>(tf_ranges) + 1; // +1 due to endindex being "including"
706+
auto endindex = std::get<1>(tf_ranges) + 1;
712707
auto earlyindex = std::get<2>(tf_ranges);
713708

714709
if (earlyindex >= 0) {

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ add_executable(o2-test-framework-core
240240
test/test_OverrideLabels.cxx
241241
test/test_O2DataModelHelpers.cxx
242242
test/test_RootConfigParamHelpers.cxx
243+
test/test_ResourcesMonitoringHelpers.cxx
243244
test/test_Services.cxx
244245
test/test_StringHelpers.cxx
245246
test/test_StaticFor.cxx

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ struct ForwardChannelState;
2121
struct OutputChannelInfo;
2222
struct OutputChannelSpec;
2323
struct OutputChannelState;
24+
struct ProcessingPolicies;
25+
struct DeviceSpec;
26+
enum struct StreamingState;
27+
enum struct TransitionHandlingState;
2428

2529
/// Generic helpers for DataProcessing releated functions.
2630
struct DataProcessingHelpers {
@@ -35,6 +39,12 @@ struct DataProcessingHelpers {
3539
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const& ref, OutputChannelInfo const& info, OutputChannelState& state, size_t timeslice);
3640
/// Broadcast the oldest possible timeslice to all channels in output
3741
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const& ref, size_t timeslice);
42+
/// change the device StreamingState to newState
43+
static void switchState(ServiceRegistryRef const& ref, StreamingState newState);
44+
/// check if spec is a source devide
45+
static bool hasOnlyGenerated(DeviceSpec const& spec);
46+
/// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
47+
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
3848
};
3949

4050
} // namespace o2::framework

Framework/Core/include/Framework/DriverInfo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ struct DriverInfo {
149149
std::string uniqueWorkflowId = "";
150150
/// Metrics gathering interval
151151
unsigned short resourcesMonitoringInterval = 0;
152+
/// Where to dump the metrics
153+
std::string resourcesMonitoringFilename = "performanceMetrics.json";
152154
/// Metrics gathering dump to disk interval
153155
unsigned short resourcesMonitoringDumpInterval = 0;
154156
/// Port used by the websocket control. 0 means not initialised.

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 7 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -125,63 +125,6 @@ void on_idle_timer(uv_timer_t* handle)
125125
state->loopReason |= DeviceState::TIMER_EXPIRED;
126126
}
127127

128-
bool hasOnlyTimers(DeviceSpec const& spec)
129-
{
130-
return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
131-
}
132-
133-
bool hasOnlyGenerated(DeviceSpec const& spec)
134-
{
135-
return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
136-
}
137-
138-
void on_transition_requested_expired(uv_timer_t* handle)
139-
{
140-
auto* ref = (ServiceRegistryRef*)handle->data;
141-
auto& state = ref->get<DeviceState>();
142-
state.loopReason |= DeviceState::TIMER_EXPIRED;
143-
// Check if this is a source device
144-
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
145-
auto& spec = ref->get<DeviceSpec const>();
146-
std::string messageOnExpire = hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
147-
if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
148-
O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
149-
} else {
150-
O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
151-
}
152-
state.transitionHandling = TransitionHandlingState::Expired;
153-
}
154-
155-
auto switchState(ServiceRegistryRef& ref, StreamingState newState) -> void
156-
{
157-
auto& state = ref.get<DeviceState>();
158-
auto& context = ref.get<DataProcessorContext>();
159-
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
160-
O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
161-
O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
162-
state.streaming = newState;
163-
ref.get<ControlService>().notifyStreamingState(state.streaming);
164-
};
165-
166-
void on_data_processing_expired(uv_timer_t* handle)
167-
{
168-
auto* ref = (ServiceRegistryRef*)handle->data;
169-
auto& state = ref->get<DeviceState>();
170-
auto& spec = ref->get<DeviceSpec const>();
171-
state.loopReason |= DeviceState::TIMER_EXPIRED;
172-
173-
// Check if this is a source device
174-
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
175-
176-
if (hasOnlyGenerated(spec)) {
177-
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
178-
switchState(*ref, StreamingState::EndOfStreaming);
179-
} else {
180-
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
181-
state.allowedProcessing = DeviceState::CalibrationOnly;
182-
}
183-
}
184-
185128
void on_communication_requested(uv_async_t* s)
186129
{
187130
auto* state = (DeviceState*)s->data;
@@ -1267,7 +1210,7 @@ void DataProcessingDevice::PreRun()
12671210
O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
12681211
O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
12691212
state.quitRequested = false;
1270-
switchState(ref, StreamingState::Streaming);
1213+
DataProcessingHelpers::switchState(ref, StreamingState::Streaming);
12711214
state.allowedProcessing = DeviceState::Any;
12721215
for (auto& info : state.inputChannelInfos) {
12731216
if (info.state != InputChannelState::Pull) {
@@ -1338,58 +1281,6 @@ void DataProcessingDevice::Reset()
13381281
ref.get<CallbackService>().call<CallbackService::Id::Reset>();
13391282
}
13401283

1341-
TransitionHandlingState updateStateTransition(ServiceRegistryRef& ref, ProcessingPolicies const& policies)
1342-
{
1343-
auto& state = ref.get<DeviceState>();
1344-
auto& deviceProxy = ref.get<FairMQDeviceProxy>();
1345-
if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
1346-
return state.transitionHandling;
1347-
}
1348-
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1349-
auto& deviceContext = ref.get<DeviceContext>();
1350-
// Check if we only have timers
1351-
auto& spec = ref.get<DeviceSpec const>();
1352-
if (hasOnlyTimers(spec)) {
1353-
switchState(ref, StreamingState::EndOfStreaming);
1354-
}
1355-
1356-
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1357-
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1358-
uv_update_time(state.loop);
1359-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1360-
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1361-
}
1362-
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1363-
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1364-
uv_update_time(state.loop);
1365-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1366-
deviceContext.exitTransitionTimeout);
1367-
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1368-
bool onlyGenerated = hasOnlyGenerated(spec);
1369-
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1370-
if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1371-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1372-
} else {
1373-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1374-
"New state requested. Waiting for %d seconds before %{public}s",
1375-
timeout,
1376-
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1377-
}
1378-
return TransitionHandlingState::Requested;
1379-
} else {
1380-
if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
1381-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1382-
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
1383-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1384-
} else if (policies.termination == TerminationPolicy::QUIT) {
1385-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1386-
} else {
1387-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1388-
}
1389-
return TransitionHandlingState::Expired;
1390-
}
1391-
}
1392-
13931284
void DataProcessingDevice::Run()
13941285
{
13951286
ServiceRegistryRef ref{mServiceRegistry};
@@ -1442,7 +1333,7 @@ void DataProcessingDevice::Run()
14421333
shouldNotWait = true;
14431334
state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING;
14441335
}
1445-
state.transitionHandling = updateStateTransition(ref, ref.get<DeviceContext>().processingPolicies);
1336+
state.transitionHandling = DataProcessingHelpers::updateStateTransition(ref, ref.get<DeviceContext>().processingPolicies);
14461337
// If we are Idle, we can then consider the transition to be expired.
14471338
if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
14481339
O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
@@ -1828,7 +1719,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18281719
// dependent on the callback, not something which is controlled by the
18291720
// framework itself.
18301721
if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1831-
switchState(ref, StreamingState::EndOfStreaming);
1722+
DataProcessingHelpers::switchState(ref, StreamingState::EndOfStreaming);
18321723
state.lastActiveDataProcessor = &context;
18331724
}
18341725

@@ -1841,7 +1732,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18411732
/// timers as they do not need to be further processed.
18421733
auto& relayer = ref.get<DataRelayer>();
18431734

1844-
bool shouldProcess = hasOnlyGenerated(spec) == false;
1735+
bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;
18451736

18461737
while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
18471738
relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
@@ -1874,7 +1765,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
18741765
}
18751766
// This is needed because the transport is deleted before the device.
18761767
relayer.clear();
1877-
switchState(ref, StreamingState::Idle);
1768+
DataProcessingHelpers::switchState(ref, StreamingState::Idle);
18781769
// In case we should process, note the data processor responsible for it
18791770
if (shouldProcess) {
18801771
state.lastActiveDataProcessor = &context;
@@ -2567,7 +2458,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
25672458
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
25682459
} else {
25692460
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2570-
switchState(ref, StreamingState::Idle);
2461+
DataProcessingHelpers::switchState(ref, StreamingState::Idle);
25712462
}
25722463
if (shouldProcess(action)) {
25732464
auto& timingInfo = ref.get<TimingInfo>();
@@ -2655,7 +2546,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
26552546
for (auto& channel : spec.outputChannels) {
26562547
DataProcessingHelpers::sendEndOfStream(ref, channel);
26572548
}
2658-
switchState(ref, StreamingState::Idle);
2549+
DataProcessingHelpers::switchState(ref, StreamingState::Idle);
26592550
}
26602551

26612552
return true;

0 commit comments

Comments
 (0)