Skip to content

Commit 9c82299

Browse files
committed
DPL: allow customising DataProcessingStats intervals
In particular, this allows setting a minimum value for online metrics.
1 parent b3c6c34 commit 9c82299

File tree

8 files changed

+35
-19
lines changed

8 files changed

+35
-19
lines changed

Framework/Core/include/Framework/DataProcessingStats.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <atomic>
1717
#include <cstdint>
1818
#include <array>
19+
#include <memory>
1920
#include <numeric>
2021
#include <mutex>
2122
#include <utility>
@@ -69,8 +70,16 @@ enum struct ProcessingStatsId : short {
6970

7071
/// Helper struct to hold statistics about the data processing happening.
7172
struct DataProcessingStats {
73+
// Parameters for the default behaviour
74+
struct DefaultConfig {
75+
int64_t minOnlinePublishInterval = 0;
76+
};
77+
78+
DefaultConfig config = {};
79+
7280
DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase,
73-
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp);
81+
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp,
82+
DefaultConfig config);
7483

7584
constexpr static ServiceKind service_kind = ServiceKind::Global;
7685
constexpr static unsigned short MAX_METRICS = 1 << 15;

Framework/Core/src/CommonServices.cxx

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -715,14 +715,14 @@ o2::framework::ServiceSpec
715715
O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.",
716716
(uint64_t)oldestPossibleOutput.timeslice.value);
717717
AsyncQueueHelpers::post(
718-
queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
719-
.id = decongestion.oldestPossibleTimesliceTask,
718+
queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
719+
.id = decongestion.oldestPossibleTimesliceTask,
720720
.debounce = -1, .callback = decongestionCallback}
721721
.user<DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
722722

723723
if (decongestion.orderedCompletionPolicyActive) {
724724
AsyncQueueHelpers::post(
725-
queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
725+
queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
726726
.callback = decongestionCallbackOrdered}
727727
.user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
728728
} },
@@ -867,8 +867,11 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats()
867867
clock_gettime(CLOCK_REALTIME, &now);
868868
uv_update_time(state.loop);
869869
uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
870+
DataProcessingStats::DefaultConfig config = {
871+
.minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>("dpl-stats-min-online-publishing-interval").c_str()) * 1000};
870872
auto* stats = new DataProcessingStats(TimingHelpers::defaultRealtimeBaseConfigurator(offset, state.loop),
871-
TimingHelpers::defaultCPUTimeConfigurator(state.loop));
873+
TimingHelpers::defaultCPUTimeConfigurator(state.loop),
874+
config);
872875
auto& runningWorkflow = services.get<RunningWorkflowInfo const>();
873876

874877
// It makes no sense to update the stats more often than every 5s

Framework/Core/src/DataProcessingStats.cxx

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,20 @@
1111

1212
#include "Framework/DataProcessingStats.h"
1313
#include "Framework/RuntimeError.h"
14-
#include "Framework/ServiceRegistryRef.h"
15-
#include "Framework/DeviceState.h"
1614
#include "Framework/Logger.h"
1715
#include <uv.h>
18-
#include <iostream>
1916
#include <atomic>
20-
#include <utility>
17+
#include <thread>
2118

2219
namespace o2::framework
2320
{
2421

2522
DataProcessingStats::DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase_,
26-
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp_)
23+
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp_,
24+
DefaultConfig config_)
2725
: getTimestamp(getTimestamp_),
28-
getRealtimeBase(getRealtimeBase_)
26+
getRealtimeBase(getRealtimeBase_),
27+
config(config_)
2928
{
3029
getRealtimeBase(realTimeBase, initialTimeOffset);
3130
}
@@ -269,6 +268,9 @@ void DataProcessingStats::registerMetric(MetricSpec const& spec)
269268
metricSpecs[spec.metricId] = spec;
270269
metricsNames[spec.metricId] = spec.name;
271270
metrics[spec.metricId] = spec.defaultValue;
271+
if (metricSpecs[spec.metricId].scope == Scope::Online) {
272+
metricSpecs[spec.metricId].minPublishInterval = std::max(metricSpecs[spec.metricId].minPublishInterval, config.minOnlinePublishInterval);
273+
}
272274
int64_t currentTime = getTimestamp(realTimeBase, initialTimeOffset);
273275
updateInfos[spec.metricId] = UpdateInfo{currentTime, currentTime};
274276
updated[spec.metricId] = spec.sendInitialValue;

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1753,6 +1753,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
17531753
("configuration,cfg", bpo::value<std::string>(), "configuration connection string") //
17541754
("driver-client-backend", bpo::value<std::string>(), "driver connection string") //
17551755
("monitoring-backend", bpo::value<std::string>(), "monitoring connection string") //
1756+
("dpl-stats-min-online-publishing-interval", bpo::value<std::string>(), "minimum flushing interval for online metrics (in s)") //
17561757
("infologger-mode", bpo::value<std::string>(), "O2_INFOLOGGER_MODE override") //
17571758
("infologger-severity", bpo::value<std::string>(), "minimun FairLogger severity which goes to info logger") //
17581759
("dpl-tracing-flags", bpo::value<std::string>(), "pipe separated list of events to trace") //

Framework/Core/src/runDataProcessing.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
10491049
ConfigParamsHelper::populateBoostProgramOptions(optsDesc, spec.options, gHiddenDeviceOptions);
10501050
char const* defaultSignposts = getenv("DPL_SIGNPOSTS");
10511051
optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
1052+
("dpl-stats-min-online-publishing-interval", bpo::value<std::string>()->default_value("0"), "minimum flushing interval for online metrics (in s)") //
10521053
("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
10531054
("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
10541055
("dpl-tracing-flags", bpo::value<std::string>()->default_value(""), "pipe `|` separate list of events to be traced") //

Framework/Core/test/test_ComputingQuotaEvaluator.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ TEST_CASE("TestComputingQuotaEvaluator")
7575
};
7676

7777
DataProcessingStats stats(TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
78-
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
78+
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
7979

8080
ServiceRegistry registry;
8181
ServiceRegistryRef ref(registry);

Framework/Core/test/test_DataProcessingStats.cxx

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ using namespace o2::framework;
3030
TEST_CASE("DataProcessingStats")
3131
{
3232
DataProcessingStats stats(TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
33-
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
33+
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
3434

3535
o2::framework::clean_all_runtime_errors();
3636
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric});
@@ -190,7 +190,7 @@ TEST_CASE("DataProcessingStatsOutOfOrder")
190190
int64_t value[] = {0, 1000, 999, 998};
191191
return base + value[count++] - offset;
192192
};
193-
DataProcessingStats stats(realtimeTime, cpuTime);
193+
DataProcessingStats stats(realtimeTime, cpuTime, {});
194194
// Notice this will consume one value in the cpuTime.
195195
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric});
196196
stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 2});
@@ -222,7 +222,7 @@ TEST_CASE("DataProcessingStatsInstantaneousRate")
222222

223223
// I want to push deltas since the last update and have the immediate time
224224
// averaged being stored.
225-
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator);
225+
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {});
226226
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate});
227227
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 0);
228228
REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 0);
@@ -265,7 +265,7 @@ TEST_CASE("DataProcessingStatsCumulativeRate")
265265

266266
// I want to push deltas since the last update and have the immediate time
267267
// averaged being stored.
268-
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator);
268+
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {});
269269
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate});
270270
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
271271
REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
@@ -310,7 +310,7 @@ TEST_CASE("DataProcessingStatsPublishing")
310310

311311
// I want to push deltas since the last update and have the immediate time
312312
// averaged being stored.
313-
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp);
313+
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {});
314314
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 5000});
315315
stats.registerMetric({.name = "dummy_metric2", .metricId = DummyMetric2, .minPublishInterval = 2000});
316316
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
@@ -355,7 +355,7 @@ TEST_CASE("DataProcessingStatsPublishingRepeated")
355355

356356
// I want to push deltas since the last update and have the immediate time
357357
// averaged being stored.
358-
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp);
358+
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {});
359359
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 3000, .maxRefreshLatency = 9000});
360360
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
361361
REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ TEST_CASE("DataRelayer")
4848
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
4949
DataProcessingStats stats(
5050
TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
51-
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
51+
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
5252
int quickUpdateInterval = 1;
5353
using MetricSpec = DataProcessingStats::MetricSpec;
5454
std::vector<MetricSpec> specs{

0 commit comments

Comments
 (0)