Skip to content

Commit 253c99b

Browse files
authored
DPL: drop incoming non-calibration data when we process calibration only (AliceO2Group#13544)
1 parent 1de0de3 commit 253c99b

File tree

4 files changed

+30
-13
lines changed

4 files changed

+30
-13
lines changed

Framework/Core/include/Framework/DeviceState.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
#include <vector>
1919
#include <string>
20-
#include <map>
21-
#include <utility>
2220
#include <atomic>
2321

2422
typedef struct uv_loop_s uv_loop_t;

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
// or submit itself to any jurisdiction.
1111
#include "Framework/AsyncQueue.h"
1212
#include "Framework/DataProcessingDevice.h"
13-
#include "Framework/ChannelMatching.h"
1413
#include "Framework/ControlService.h"
1514
#include "Framework/ComputingQuotaEvaluator.h"
1615
#include "Framework/DataProcessingHeader.h"
@@ -28,7 +27,6 @@
2827
#include "ConfigurationOptionsRetriever.h"
2928
#include "Framework/FairMQDeviceProxy.h"
3029
#include "Framework/CallbackService.h"
31-
#include "Framework/TMessageSerializer.h"
3230
#include "Framework/InputRecord.h"
3331
#include "Framework/InputSpan.h"
3432
#if defined(__APPLE__) || defined(NDEBUG)
@@ -37,23 +35,20 @@
3735
#include "Framework/Signpost.h"
3836
#include "Framework/TimingHelpers.h"
3937
#include "Framework/SourceInfoHeader.h"
40-
#include "Framework/Logger.h"
4138
#include "Framework/DriverClient.h"
42-
#include "Framework/Monitoring.h"
4339
#include "Framework/TimesliceIndex.h"
4440
#include "Framework/VariableContextHelpers.h"
4541
#include "Framework/DataProcessingContext.h"
42+
#include "Framework/DataProcessingHeader.h"
4643
#include "Framework/DeviceContext.h"
4744
#include "Framework/RawDeviceService.h"
4845
#include "Framework/StreamContext.h"
4946
#include "Framework/DefaultsHelpers.h"
47+
#include "Framework/ServiceRegistryRef.h"
5048

51-
#include "PropertyTreeHelpers.h"
52-
#include "DataProcessingStatus.h"
5349
#include "DecongestionService.h"
5450
#include "Framework/DataProcessingHelpers.h"
5551
#include "DataRelayerHelpers.h"
56-
#include "ProcessingPoliciesHelpers.h"
5752
#include "Headers/DataHeader.h"
5853
#include "Headers/DataHeaderHelpers.h"
5954

@@ -66,6 +61,7 @@
6661
#include <fairmq/ProgOptions.h>
6762
#include <Configuration/ConfigurationInterface.h>
6863
#include <Configuration/ConfigurationFactory.h>
64+
#include <Monitoring/Monitoring.h>
6965
#include <TMessage.h>
7066
#include <TClonesArray.h>
7167

@@ -74,7 +70,6 @@
7470
#include <vector>
7571
#include <numeric>
7672
#include <memory>
77-
#include <unordered_map>
7873
#include <uv.h>
7974
#include <execinfo.h>
8075
#include <sstream>

Framework/Core/src/DataRelayer.cxx

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,12 @@ void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop)
402402
pruneCache(slot);
403403
}
404404

405+
bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
406+
{
407+
auto* dh = o2::header::get<DataHeader*>(first->GetData());
408+
return dh->flagsDerivedHeader & DataProcessingHeader::KEEP_AT_EOS_FLAG;
409+
}
410+
405411
DataRelayer::RelayChoice
406412
DataRelayer::relay(void const* rawHeader,
407413
std::unique_ptr<fair::mq::Message>* messages,
@@ -456,7 +462,7 @@ DataRelayer::RelayChoice
456462
&nPayloads,
457463
&cache = mCache,
458464
&services = mContext,
459-
numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) {
465+
numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
460466
O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
461467
O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
462468
fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
@@ -468,11 +474,20 @@ DataRelayer::RelayChoice
468474
// TODO: make sure that multiple parts can only be added within the same call of
469475
// DataRelayer::relay
470476
assert(nPayloads > 0);
477+
size_t saved = 0;
471478
for (size_t mi = 0; mi < nMessages; ++mi) {
472479
assert(mi + nPayloads < nMessages);
480+
// We are in calibration mode and the data does not have the calibration bit set.
481+
// We do not store it.
482+
if (services.get<DeviceState>().allowedProcessing == DeviceState::ProcessingType::CalibrationOnly && !isCalibrationData(messages[mi])) {
483+
mi += nPayloads;
484+
continue;
485+
}
473486
target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
474487
mi += nPayloads;
488+
saved += nPayloads;
475489
}
490+
return saved;
476491
};
477492

478493
auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
@@ -551,7 +566,10 @@ DataRelayer::RelayChoice
551566
this->pruneCache(slot, onDrop);
552567
mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
553568
}
554-
saveInSlot(timeslice, input, slot, info);
569+
size_t saved = saveInSlot(timeslice, input, slot, info);
570+
if (saved == 0) {
571+
return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
572+
}
555573
index.publishSlot(slot);
556574
index.markAsDirty(slot, true);
557575
stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
@@ -633,7 +651,10 @@ DataRelayer::RelayChoice
633651
// cache still holds the old data, so we prune it.
634652
this->pruneCache(slot, onDrop);
635653
mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
636-
saveInSlot(timeslice, input, slot, info);
654+
size_t saved = saveInSlot(timeslice, input, slot, info);
655+
if (saved == 0) {
656+
return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
657+
}
637658
index.publishSlot(slot);
638659
index.markAsDirty(slot, true);
639660
return RelayChoice{.type = RelayChoice::Type::WillRelay};

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include <catch_amalgamated.hpp>
13+
#include "Framework/DeviceState.h"
1314
#include "Headers/DataHeader.h"
1415
#include "Headers/Stack.h"
1516
#include "MemoryResources/MemoryResources.h"
@@ -60,10 +61,12 @@ TEST_CASE("DataRelayer")
6061
stats.registerMetric(spec);
6162
}
6263

64+
DeviceState state;
6365
ref.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
6466
ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
6567
ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&states));
6668
ref.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
69+
ref.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&state));
6770
// A simple test where an input is provided
6871
// and the subsequent InputRecord is immediately requested.
6972
SECTION("TestNoWait")

0 commit comments

Comments
 (0)