Skip to content

Commit aa88464

Browse files
authored
DPL: introduce a --data-processing-timeout (AliceO2Group#13542)
Used to force-stop the data processing and start the calibration. If it is larger than the exit-transition-timeout, the two will be coalesced into a single transition. For the time being, the only side effects are setting DeviceState::allowedProcessing and terminating the source devices earlier than the data processing ones. The next PR will make sure that only calibrations are treated when DeviceState::allowedProcessing is set to ProcessingType::CalibrationOnly.
1 parent 813cce4 commit aa88464

File tree

7 files changed

+87
-13
lines changed

7 files changed

+87
-13
lines changed

Framework/Core/include/Framework/DeviceContext.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ struct ComputingQuotaStats;
2828
struct DeviceContext {
2929
ComputingQuotaStats* quotaStats = nullptr;
3030
uv_timer_t* gracePeriodTimer = nullptr;
31+
uv_timer_t* dataProcessingGracePeriodTimer = nullptr;
3132
uv_signal_t* sigusr1Handle = nullptr;
3233
int expectedRegionCallbacks = 0;
3334
int exitTransitionTimeout = 0;
35+
int dataProcessingTimeout = 0;
3436
};
3537

3638
} // namespace o2::framework

Framework/Core/include/Framework/DeviceState.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,18 @@ struct DeviceState {
6868
STREAM_CONTEXT_LOG = 1 << 4, // Log for the StreamContext callbacks
6969
};
7070

71+
enum ProcessingType : int {
72+
Any, // Any kind of processing is allowed
73+
CalibrationOnly, // Only calibrations are allowed to be processed / produced
74+
};
75+
7176
std::vector<InputChannelInfo> inputChannelInfos;
7277
StreamingState streaming = StreamingState::Streaming;
78+
// What kind of processing is allowed. By default we allow any.
79+
// If we are past the data processing timeout, this will be
80+
// CalibrationOnly. We need to reset it at every start.
81+
ProcessingType allowedProcessing = ProcessingType::Any;
82+
7383
bool quitRequested = false;
7484
std::atomic<int64_t> cleanupCount = -1;
7585

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,28 @@ void on_transition_requested_expired(uv_timer_t* handle)
142142
if (hasOnlyGenerated(spec)) {
143143
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting.");
144144
} else {
145-
O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "Grace period for data / calibration expired. Exiting.");
145+
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for %{public}s expired. Exiting.",
146+
state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration");
146147
}
147148
state.transitionHandling = TransitionHandlingState::Expired;
148149
}
149150

151+
void on_data_processing_expired(uv_timer_t* handle)
152+
{
153+
auto* ref = (ServiceRegistryRef*)handle->data;
154+
auto& state = ref->get<DeviceState>();
155+
state.loopReason |= DeviceState::TIMER_EXPIRED;
156+
157+
// Check if this is a source device
158+
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
159+
160+
// Source devices should never end up in this callback, since the exitTransitionTimeout should
161+
// be reset to the dataProcessingTimeout and the timers coalesced.
162+
assert(hasOnlyGenerated(ref->get<DeviceSpec const>()) == false);
163+
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
164+
state.allowedProcessing = DeviceState::CalibrationOnly;
165+
}
166+
150167
void on_communication_requested(uv_async_t* s)
151168
{
152169
auto* state = (DeviceState*)s->data;
@@ -949,6 +966,10 @@ void DataProcessingDevice::startPollers()
949966
deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
950967
deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
951968
uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
969+
970+
deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
971+
deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
972+
uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
952973
}
953974

954975
void DataProcessingDevice::stopPollers()
@@ -980,6 +1001,11 @@ void DataProcessingDevice::stopPollers()
9801001
delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
9811002
free(deviceContext.gracePeriodTimer);
9821003
deviceContext.gracePeriodTimer = nullptr;
1004+
1005+
uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
1006+
delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
1007+
free(deviceContext.dataProcessingGracePeriodTimer);
1008+
deviceContext.dataProcessingGracePeriodTimer = nullptr;
9831009
}
9841010

9851011
void DataProcessingDevice::InitTask()
@@ -1015,6 +1041,7 @@ void DataProcessingDevice::InitTask()
10151041

10161042
deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
10171043
deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
1044+
deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
10181045

10191046
for (auto& channel : GetChannels()) {
10201047
channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
@@ -1209,6 +1236,7 @@ void DataProcessingDevice::PreRun()
12091236
O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
12101237
state.quitRequested = false;
12111238
state.streaming = StreamingState::Streaming;
1239+
state.allowedProcessing = DeviceState::Any;
12121240
for (auto& info : state.inputChannelInfos) {
12131241
if (info.state != InputChannelState::Pull) {
12141242
info.state = InputChannelState::Running;
@@ -1339,6 +1367,19 @@ void DataProcessingDevice::Run()
13391367
state.streaming = StreamingState::EndOfStreaming;
13401368
}
13411369

1370+
// If this is a source device, exitTransitionTimeout and dataProcessingTimeout are effectively
1371+
// the same (because source devices are not allowed to produce any calibration),
1372+
// so a non-zero dataProcessingTimeout replaces the exitTransitionTimeout.
1373+
if (hasOnlyGenerated(spec) && deviceContext.dataProcessingTimeout > 0) {
1374+
deviceContext.exitTransitionTimeout = deviceContext.dataProcessingTimeout;
1375+
}
1376+
1377+
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1378+
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1379+
uv_update_time(state.loop);
1380+
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1381+
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1382+
}
13421383
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
13431384
state.transitionHandling = TransitionHandlingState::Requested;
13441385
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,6 +1537,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
15371537
realOdesc.add_options()("child-driver", bpo::value<std::string>());
15381538
realOdesc.add_options()("rate", bpo::value<std::string>());
15391539
realOdesc.add_options()("exit-transition-timeout", bpo::value<std::string>());
1540+
realOdesc.add_options()("data-processing-timeout", bpo::value<std::string>());
15401541
realOdesc.add_options()("expected-region-callbacks", bpo::value<std::string>());
15411542
realOdesc.add_options()("timeframes-rate-limit", bpo::value<std::string>());
15421543
realOdesc.add_options()("environment", bpo::value<std::string>());
@@ -1723,6 +1724,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
17231724
("control-port", bpo::value<std::string>(), "Utility port to be used by O2 Control") //
17241725
("rate", bpo::value<std::string>(), "rate for a data source device (Hz)") //
17251726
("exit-transition-timeout", bpo::value<std::string>(), "timeout before switching to READY state") //
1727+
("data-processing-timeout", bpo::value<std::string>(), "timeout after which only calibration can happen") //
17261728
("expected-region-callbacks", bpo::value<std::string>(), "region callbacks to expect before starting") //
17271729
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframes can be in fly") //
17281730
("shm-monitor", bpo::value<std::string>(), "whether to use the shared memory monitor") //

Framework/Core/src/O2ControlHelpers.cxx

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,8 @@ void dumpCommand(std::ostream& dumpOut, const DeviceExecution& execution, std::s
262262
dumpOut << indLevel << indScheme << "- \"-b\"\n";
263263
dumpOut << indLevel << indScheme << "- \"--exit-transition-timeout\"\n";
264264
dumpOut << indLevel << indScheme << "- \"'{{ exit_transition_timeout }}'\"\n";
265+
dumpOut << indLevel << indScheme << "- \"--data-processing-timeout\"\n";
266+
dumpOut << indLevel << indScheme << "- \"'{{ data_processing_timeout }}'\"\n";
265267
dumpOut << indLevel << indScheme << "- \"--monitoring-backend\"\n";
266268
dumpOut << indLevel << indScheme << "- \"'{{ monitoring_dpl_url }}'\"\n";
267269
dumpOut << indLevel << indScheme << "- \"--session\"\n";
@@ -393,15 +395,20 @@ void dumpTask(std::ostream& dumpOut, const DeviceSpec& spec, const DeviceExecuti
393395
dumpOut << indLevel << "defaults:\n";
394396
dumpOut << indLevel << indScheme << "log_task_stdout: none\n";
395397
dumpOut << indLevel << indScheme << "log_task_stderr: none\n";
396-
std::string exitTransitionTimeout = "15";
398+
std::string exitTransitionTimeout = "15"; // Allow 15 seconds to finish processing and calibrations
399+
std::string dataProcessingTimeout = "10"; // Allow only ten seconds to finish processing
397400
if (execution.args.size() > 2) {
398401
for (size_t i = 0; i < execution.args.size() - 1; ++i) {
399402
if (strcmp(execution.args[i], "--exit-transition-timeout") == 0) {
400403
exitTransitionTimeout = execution.args[i + 1];
401404
}
405+
if (strcmp(execution.args[i], "--data-processing-timeout") == 0) {
406+
dataProcessingTimeout = execution.args[i + 1];
407+
}
402408
}
403409
}
404410
dumpOut << indLevel << indScheme << "exit_transition_timeout: " << exitTransitionTimeout << "\n";
411+
dumpOut << indLevel << indScheme << "data_processing_timeout: " << dataProcessingTimeout << "\n";
405412

406413
if (bfs::path(execution.args[0]).filename().string() != execution.args[0]) {
407414
LOG(warning) << "The workflow template generation was started with absolute or relative executables paths."

Framework/Core/src/runDataProcessing.cxx

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
10361036
// declared in the workflow definition are allowed.
10371037
runner.AddHook<fair::mq::hooks::SetCustomCmdLineOptions>([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner& r) {
10381038
std::string defaultExitTransitionTimeout = "0";
1039+
std::string defaultDataProcessingTimeout = "0";
10391040
std::string defaultInfologgerMode = "";
10401041
o2::framework::DeploymentMode deploymentMode = o2::framework::DefaultsHelpers::deploymentMode();
10411042
if (deploymentMode == o2::framework::DeploymentMode::OnlineDDS) {
@@ -1047,15 +1048,16 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
10471048
boost::program_options::options_description optsDesc;
10481049
ConfigParamsHelper::populateBoostProgramOptions(optsDesc, spec.options, gHiddenDeviceOptions);
10491050
char const* defaultSignposts = getenv("DPL_SIGNPOSTS");
1050-
optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
1051-
("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
1052-
("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
1053-
("dpl-tracing-flags", bpo::value<std::string>()->default_value(""), "pipe `|` separate list of events to be traced") //
1054-
("signposts", bpo::value<std::string>()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") //
1055-
("expected-region-callbacks", bpo::value<std::string>()->default_value("0"), "how many region callbacks we are expecting") //
1056-
("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
1057-
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") //
1058-
("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
1051+
optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
1052+
("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
1053+
("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
1054+
("dpl-tracing-flags", bpo::value<std::string>()->default_value(""), "pipe `|` separate list of events to be traced") //
1055+
("signposts", bpo::value<std::string>()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") //
1056+
("expected-region-callbacks", bpo::value<std::string>()->default_value("0"), "how many region callbacks we are expecting") //
1057+
("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
1058+
("data-processing-timeout", bpo::value<std::string>()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") //
1059+
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") //
1060+
("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
10591061
("infologger-mode", bpo::value<std::string>()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override");
10601062
r.fConfig.AddToCmdLineOptions(optsDesc, true);
10611063
});

Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
#include "../src/DeviceSpecHelpers.h"
1616
#include "../src/SimpleResourceManager.h"
1717
#include "../src/ComputingResourceHelpers.h"
18-
#include "Framework/DataAllocator.h"
1918
#include "Framework/DeviceControl.h"
2019
#include "Framework/DeviceSpec.h"
21-
#include "Framework/ProcessingContext.h"
2220
#include "Framework/WorkflowSpec.h"
2321
#include "Framework/DriverConfig.h"
2422
#include "Framework/O2ControlParameters.h"
@@ -141,6 +139,7 @@ const std::vector expectedTasks{
141139
log_task_stdout: none
142140
log_task_stderr: none
143141
exit_transition_timeout: 15
142+
data_processing_timeout: 10
144143
_module_cmdline: >-
145144
source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&
146145
{{ dpl_command }} | bcsadc/foo
@@ -173,6 +172,8 @@ const std::vector expectedTasks{
173172
- "-b"
174173
- "--exit-transition-timeout"
175174
- "'{{ exit_transition_timeout }}'"
175+
- "--data-processing-timeout"
176+
- "'{{ data_processing_timeout }}'"
176177
- "--monitoring-backend"
177178
- "'{{ monitoring_dpl_url }}'"
178179
- "--session"
@@ -236,6 +237,7 @@ const std::vector expectedTasks{
236237
log_task_stdout: none
237238
log_task_stderr: none
238239
exit_transition_timeout: 15
240+
data_processing_timeout: 10
239241
_module_cmdline: >-
240242
source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&
241243
{{ dpl_command }} | foo
@@ -270,6 +272,8 @@ const std::vector expectedTasks{
270272
- "-b"
271273
- "--exit-transition-timeout"
272274
- "'{{ exit_transition_timeout }}'"
275+
- "--data-processing-timeout"
276+
- "'{{ data_processing_timeout }}'"
273277
- "--monitoring-backend"
274278
- "'{{ monitoring_dpl_url }}'"
275279
- "--session"
@@ -333,6 +337,7 @@ const std::vector expectedTasks{
333337
log_task_stdout: none
334338
log_task_stderr: none
335339
exit_transition_timeout: 15
340+
data_processing_timeout: 10
336341
_module_cmdline: >-
337342
source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&
338343
{{ dpl_command }} | foo
@@ -367,6 +372,8 @@ const std::vector expectedTasks{
367372
- "-b"
368373
- "--exit-transition-timeout"
369374
- "'{{ exit_transition_timeout }}'"
375+
- "--data-processing-timeout"
376+
- "'{{ data_processing_timeout }}'"
370377
- "--monitoring-backend"
371378
- "'{{ monitoring_dpl_url }}'"
372379
- "--session"
@@ -430,6 +437,7 @@ const std::vector expectedTasks{
430437
log_task_stdout: none
431438
log_task_stderr: none
432439
exit_transition_timeout: 15
440+
data_processing_timeout: 10
433441
_module_cmdline: >-
434442
source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&
435443
{{ dpl_command }} | foo
@@ -461,6 +469,8 @@ const std::vector expectedTasks{
461469
- "-b"
462470
- "--exit-transition-timeout"
463471
- "'{{ exit_transition_timeout }}'"
472+
- "--data-processing-timeout"
473+
- "'{{ data_processing_timeout }}'"
464474
- "--monitoring-backend"
465475
- "'{{ monitoring_dpl_url }}'"
466476
- "--session"

0 commit comments

Comments
 (0)