Skip to content

Commit 6731bca

Browse files
committed
feat: separate sensor / cpu streams more cleanly
1 parent b68b4f3 commit 6731bca

File tree

4 files changed

+63
-25
lines changed

4 files changed

+63
-25
lines changed

backend/src/main/java/net/laprun/sustainability/power/sensors/AbstractPowerSensor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ public Measures update(Long tick, Map<String, Double> cpuShares) {
106106
return measures;
107107
}
108108

109+
@Override
110+
public Measures update(long tick) {
111+
return update(tick, Map.of());
112+
}
113+
109114
protected long lastUpdateEpoch() {
110115
return lastUpdateEpoch;
111116
}

backend/src/main/java/net/laprun/sustainability/power/sensors/MapMeasures.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,9 @@ public void record(RegisteredPID pid, SensorMeasure sensorMeasure) {
4848
public SensorMeasure getOrDefault(RegisteredPID pid) {
4949
return measures.getOrDefault(pid, SensorMeasure.missing);
5050
}
51+
52+
@Override
53+
public String toString() {
54+
return measures.toString();
55+
}
5156
}

backend/src/main/java/net/laprun/sustainability/power/sensors/PowerSensor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ default void stop() {
7878
*/
7979
Measures update(Long tick, Map<String, Double> cpuShares);
8080

81+
Measures update(long tick);
82+
8183
/**
8284
* Unregisters the specified {@link RegisteredPID} with this sensor thus signaling that clients are not interested in
8385
* tracking the consumption of the associated process anymore

backend/src/main/java/net/laprun/sustainability/power/sensors/SamplingMeasurer.java

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import net.laprun.sustainability.power.ProcessUtils;
2222
import net.laprun.sustainability.power.SensorMeasure;
2323
import net.laprun.sustainability.power.SensorMetadata;
24+
import net.laprun.sustainability.power.measures.ExternalCPUShareSensorMeasure;
2425
import net.laprun.sustainability.power.persistence.Persistence;
2526
import net.laprun.sustainability.power.sensors.cpu.CPUShare;
2627

@@ -37,7 +38,7 @@ public class SamplingMeasurer {
3738
@ConfigProperty(name = "power-server.sampling-period", defaultValue = DEFAULT_SAMPLING_PERIOD)
3839
Duration samplingPeriod;
3940

40-
private Multi<Measures> periodicSensorCheck;
41+
private Multi<Tuple2<Measures, Map<String, Double>>> periodicSensorCheck;
4142
private final Map<Long, Cancellable> manuallyTrackedProcesses = new ConcurrentHashMap<>();
4243

4344
public PowerSensor sensor() {
@@ -51,7 +52,19 @@ public Multi<SensorMeasure> stream(String pid) throws Exception {
5152

5253
public Multi<SensorMeasure> uncheckedStream(long pid) throws Exception {
5354
final var registeredPID = track(pid);
54-
return periodicSensorCheck.map(measures -> measures.getOrDefault(registeredPID));
55+
return periodicSensorCheck.map(combined -> withExternalCPUShareIfAvailable(registeredPID, combined));
56+
}
57+
58+
private SensorMeasure withExternalCPUShareIfAvailable(RegisteredPID pid, Tuple2<Measures, Map<String, Double>> tuple) {
59+
final var measure = tuple.getItem1().getOrDefault(pid);
60+
final var cpuShares = tuple.getItem2();
61+
if (!cpuShares.isEmpty()) {
62+
final var cpuShare = cpuShares.get(pid.pidAsString());
63+
if (cpuShare != null && cpuShare > 0) {
64+
return new ExternalCPUShareSensorMeasure(measure, cpuShare);
65+
}
66+
}
67+
return measure;
5568
}
5669

5770
@SuppressWarnings("UnusedReturnValue")
@@ -67,19 +80,34 @@ public Cancellable startTrackingApp(String appName, long pid, String session) th
6780
RegisteredPID track(long pid) throws Exception {
6881
final var registeredPID = sensor.register(pid);
6982

83+
startSamplingIfNeeded();
84+
85+
periodicSensorCheck = periodicSensorCheck.onCancellation().invoke(() -> sensor.unregister(registeredPID));
86+
return registeredPID;
87+
}
88+
89+
private void startSamplingIfNeeded() throws Exception {
7090
if (!sensor.isStarted()) {
71-
final var adjusted = sensor.adjustSamplingPeriodIfNeeded(samplingPeriod.toMillis());
72-
Log.infof("%s sensor adjusted its sampling period to %dms", sensor.getClass().getSimpleName(), adjusted);
73-
sensor.start();
91+
// check if sensor wants a different sampling period
92+
final var samplingPeriodMillis = samplingPeriod.toMillis();
93+
final var adjusted = sensor.adjustSamplingPeriodIfNeeded(samplingPeriodMillis);
94+
if (adjusted != samplingPeriodMillis) {
95+
Log.infof("%s sensor adjusted its sampling period to %dms", sensor.getClass().getSimpleName(), adjusted);
96+
}
7497

75-
final var samplingTicks = Multi.createFrom().ticks().every(samplingPeriod);
98+
// start sensor
99+
sensor.start();
76100

101+
// manage external CPU share sampling
102+
final var overSamplingFactor = 3;
103+
final Multi<Map<String, Double>> cpuSharesMulti;
104+
final var cpuSharesTicks = Multi.createFrom().ticks()
105+
// over sample but over a shorter period to ensure we have an average that covers most of the sampling period
106+
.every(samplingPeriod.minus(50, ChronoUnit.MILLIS).dividedBy(overSamplingFactor))
107+
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
77108
if (sensor.wantsCPUShareSamplingEnabled()) {
78-
final var overSamplingFactor = 3;
79-
final var cpuSharesMulti = Multi.createFrom().ticks()
80-
// over sample but over a shorter period to ensure we have an average that covers most of the sampling period
81-
.every(samplingPeriod.minus(50, ChronoUnit.MILLIS).dividedBy(overSamplingFactor))
82-
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
109+
// if enabled, record a cpu share for each tick, group by the over sampling factor and average over these aggregates to produce one value for the power measure interval
110+
cpuSharesMulti = cpuSharesTicks
83111
.map(tick -> CPUShare.cpuSharesFor(sensor.getRegisteredPIDs()))
84112
.group()
85113
.intoLists()
@@ -99,29 +127,27 @@ RegisteredPID track(long pid) throws Exception {
99127
values.stream().mapToDouble(Double::doubleValue).average().orElse(0)));
100128
return averages;
101129
});
102-
periodicSensorCheck = Multi.createBy()
103-
.combining()
104-
.streams(samplingTicks, cpuSharesMulti)
105-
.asTuple()
106-
.map(this::updateSensor);
107130
} else {
108-
periodicSensorCheck = samplingTicks
109-
.map(tick -> sensor.update(tick, Map.of()));
131+
// otherwise, only emit an empty map to signify no external cpu share was recorded
132+
cpuSharesMulti = cpuSharesTicks.map(unused -> Map.of());
110133
}
111134

112-
periodicSensorCheck = periodicSensorCheck
135+
// manage periodic power sampling, measuring sensor values over the sampling period
136+
final var sensorSamplerMulti = Multi.createFrom().ticks()
137+
.every(samplingPeriod)
138+
.map(sensor::update)
113139
.broadcast()
114140
.withCancellationAfterLastSubscriberDeparture()
115141
.toAtLeast(1)
116142
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
117-
}
118143

119-
periodicSensorCheck = periodicSensorCheck.onCancellation().invoke(() -> sensor.unregister(registeredPID));
120-
return registeredPID;
121-
}
144+
// combine both multis
145+
periodicSensorCheck = Multi.createBy()
146+
.combining()
147+
.streams(sensorSamplerMulti, cpuSharesMulti)
148+
.asTuple();
122149

123-
private Measures updateSensor(Tuple2<Long, Map<String, Double>> tuple) {
124-
return sensor.update(tuple.getItem1(), tuple.getItem2());
150+
}
125151
}
126152

127153
/**

0 commit comments

Comments
 (0)