Skip to content

Commit 0f629dd

Browse files
authored
Merge 2541d62 into sapling-pr-archive-ktf
2 parents 06b5843 + 2541d62 commit 0f629dd

File tree

6 files changed

+63
-64
lines changed

6 files changed

+63
-64
lines changed

Framework/Core/include/Framework/CommonServices.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ struct CommonServices {
5656
return [](InitContext&, void* service) -> void* { return service; };
5757
}
5858

59-
static ServiceSpec deviceContextSpec();
6059
static ServiceSpec dataProcessorContextSpec();
6160
static ServiceSpec driverClientSpec();
6261
static ServiceSpec monitoringSpec();

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ struct DeviceConfigurationHelpers {
7777
class DataProcessingDevice : public fair::mq::Device
7878
{
7979
public:
80-
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&, ProcessingPolicies& policies);
80+
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&);
8181
void Init() final;
8282
void InitTask() final;
8383
void PreRun() final;

Framework/Core/include/Framework/DeviceContext.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ typedef struct uv_signal_s uv_signal_t;
2121
namespace o2::framework
2222
{
2323
struct ComputingQuotaStats;
24+
struct ProcessingPolicies;
2425

2526
/// Stucture which holds the whole runtime context
2627
/// of a running device which is not stored as
@@ -33,6 +34,7 @@ struct DeviceContext {
3334
int expectedRegionCallbacks = 0;
3435
int exitTransitionTimeout = 0;
3536
int dataProcessingTimeout = 0;
37+
ProcessingPolicies &processingPolicies;
3638
};
3739

3840
} // namespace o2::framework

Framework/Core/src/CommonServices.cxx

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,17 +1233,6 @@ o2::framework::ServiceSpec CommonServices::dataProcessorContextSpec()
12331233
.kind = ServiceKind::Serial};
12341234
}
12351235

1236-
o2::framework::ServiceSpec CommonServices::deviceContextSpec()
1237-
{
1238-
return ServiceSpec{
1239-
.name = "device-context",
1240-
.init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1241-
return ServiceHandle{TypeIdHelpers::uniqueId<DeviceContext>(), new DeviceContext()};
1242-
},
1243-
.configure = noConfiguration(),
1244-
.kind = ServiceKind::Serial};
1245-
}
1246-
12471236
o2::framework::ServiceSpec CommonServices::dataAllocatorSpec()
12481237
{
12491238
return ServiceSpec{

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 58 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "Framework/DataProcessor.h"
1818
#include "Framework/DataSpecUtils.h"
1919
#include "Framework/DeviceState.h"
20+
#include "Framework/DeviceStateEnums.h"
2021
#include "Framework/DispatchPolicy.h"
2122
#include "Framework/DispatchControl.h"
2223
#include "Framework/DanglingContext.h"
@@ -196,11 +197,10 @@ struct locked_execution {
196197
~locked_execution() { ref.unlock(); }
197198
};
198199

199-
DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry, ProcessingPolicies& policies)
200+
DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry)
200201
: mRunningDevice{running},
201202
mConfigRegistry{nullptr},
202-
mServiceRegistry{registry},
203-
mProcessingPolicies{policies}
203+
mServiceRegistry{registry}
204204
{
205205
GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
206206
if (key == "cleanup") {
@@ -247,6 +247,7 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
247247
mHandles.resize(1);
248248

249249
ServiceRegistryRef ref{mServiceRegistry};
250+
250251
mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
251252
auto& state = ref.get<DeviceState>();
252253
assert(state.loop);
@@ -1330,6 +1331,58 @@ void DataProcessingDevice::Reset()
13301331
ref.get<CallbackService>().call<CallbackService::Id::Reset>();
13311332
}
13321333

1334+
TransitionHandlingState updateStateTransition(ServiceRegistryRef& ref, ProcessingPolicies const& policies)
1335+
{
1336+
auto& state = ref.get<DeviceState>();
1337+
auto& deviceProxy = ref.get<FairMQDeviceProxy>();
1338+
if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
1339+
return state.transitionHandling;
1340+
}
1341+
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1342+
auto& deviceContext = ref.get<DeviceContext>();
1343+
// Check if we only have timers
1344+
auto& spec = ref.get<DeviceSpec const>();
1345+
if (hasOnlyTimers(spec)) {
1346+
switchState(ref, StreamingState::EndOfStreaming);
1347+
}
1348+
1349+
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1350+
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1351+
uv_update_time(state.loop);
1352+
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1353+
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1354+
}
1355+
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1356+
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1357+
uv_update_time(state.loop);
1358+
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1359+
deviceContext.exitTransitionTimeout);
1360+
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1361+
bool onlyGenerated = hasOnlyGenerated(spec);
1362+
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1363+
if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1364+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1365+
} else {
1366+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1367+
"New state requested. Waiting for %d seconds before %{public}s",
1368+
timeout,
1369+
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1370+
}
1371+
return TransitionHandlingState::Requested;
1372+
} else {
1373+
if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
1374+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1375+
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
1376+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1377+
} else if (policies.termination == TerminationPolicy::QUIT) {
1378+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1379+
} else {
1380+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1381+
}
1382+
return TransitionHandlingState::Expired;
1383+
}
1384+
}
1385+
13331386
void DataProcessingDevice::Run()
13341387
{
13351388
ServiceRegistryRef ref{mServiceRegistry};
@@ -1382,51 +1435,7 @@ void DataProcessingDevice::Run()
13821435
shouldNotWait = true;
13831436
state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING;
13841437
}
1385-
if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
1386-
state.transitionHandling = TransitionHandlingState::Requested;
1387-
auto& deviceContext = ref.get<DeviceContext>();
1388-
// Check if we only have timers
1389-
auto& spec = ref.get<DeviceSpec const>();
1390-
if (hasOnlyTimers(spec)) {
1391-
switchState(ref, StreamingState::EndOfStreaming);
1392-
}
1393-
1394-
// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1395-
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1396-
uv_update_time(state.loop);
1397-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1398-
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1399-
}
1400-
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1401-
state.transitionHandling = TransitionHandlingState::Requested;
1402-
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1403-
uv_update_time(state.loop);
1404-
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1405-
deviceContext.exitTransitionTimeout);
1406-
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1407-
bool onlyGenerated = hasOnlyGenerated(spec);
1408-
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1409-
if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1410-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1411-
} else {
1412-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1413-
"New state requested. Waiting for %d seconds before %{public}s",
1414-
timeout,
1415-
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1416-
}
1417-
} else {
1418-
state.transitionHandling = TransitionHandlingState::Expired;
1419-
if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1420-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1421-
} else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) {
1422-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1423-
} else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1424-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1425-
} else {
1426-
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1427-
}
1428-
}
1429-
}
1438+
state.transitionHandling = updateStateTransition(ref, mProcessingPolicies);
14301439
// If we are Idle, we can then consider the transition to be expired.
14311440
if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
14321441
O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
@@ -1560,7 +1569,7 @@ void DataProcessingDevice::Run()
15601569
}
15611570
}
15621571

1563-
O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
1572+
O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
15641573
auto& spec = ref.get<DeviceSpec const>();
15651574
/// Cleanup messages which are still pending on exit.
15661575
for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {

Framework/Core/src/runDataProcessing.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,13 +1091,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
10911091
quotaEvaluator = std::make_unique<ComputingQuotaEvaluator>(serviceRef);
10921092
serviceRef.registerService(ServiceRegistryHelpers::handleForService<ComputingQuotaEvaluator>(quotaEvaluator.get()));
10931093

1094-
deviceContext = std::make_unique<DeviceContext>();
1094+
deviceContext = std::make_unique<DeviceContext>(DeviceContext{.processingPolicies = processingPolicies});
10951095
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceSpec const>(&spec));
10961096
serviceRef.registerService(ServiceRegistryHelpers::handleForService<RunningWorkflowInfo const>(&runningWorkflow));
10971097
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceContext>(deviceContext.get()));
10981098
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
10991099

1100-
auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry, processingPolicies);
1100+
auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry);
11011101

11021102
serviceRef.get<RawDeviceService>().setDevice(device.get());
11031103
r.fDevice = std::move(device);

0 commit comments

Comments
 (0)