Skip to content

Commit 804eff6

Browse files
committed
feat(?): attempt to model stream of measures as Multi
1 parent 430b719 commit 804eff6

File tree

14 files changed

+212
-117
lines changed

14 files changed

+212
-117
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package net.laprun.sustainability.power.nuprocess;
2+
3+
import java.io.InputStream;
4+
import java.nio.ByteBuffer;
5+
6+
import com.zaxxer.nuprocess.NuProcess;
7+
8+
import io.quarkus.logging.Log;
9+
import io.smallrye.mutiny.subscription.MultiEmitter;
10+
import net.laprun.sustainability.power.sensors.macos.powermetrics.ProcessWrapper;
11+
12+
public class EmitterProcessHandler extends BaseProcessHandler {
13+
private final boolean debug = true;
14+
private MultiEmitter<? super InputStream> emitter;
15+
private long start;
16+
private short ignoreCount = 2;
17+
18+
public EmitterProcessHandler(long periodInMilliSecondsAsString) {
19+
super(ProcessWrapper.preparePowermetricsCommandVarArgs("cpu_power,tasks",
20+
"--show-process-samp-norm", "-b", "65536", "--show-process-gpu", "-i",
21+
"" + periodInMilliSecondsAsString));
22+
}
23+
24+
public void setEmitter(MultiEmitter<? super InputStream> emitter) {
25+
this.emitter = emitter;
26+
}
27+
28+
@Override
29+
public void onPreStart(NuProcess nuProcess) {
30+
start = System.currentTimeMillis();
31+
super.onPreStart(nuProcess);
32+
}
33+
34+
@Override
35+
public void onStdout(ByteBuffer buffer, boolean closed) {
36+
if (ignoreCount-- > 0) {
37+
return;
38+
}
39+
final var now = System.currentTimeMillis();
40+
if (buffer.hasRemaining() && !closed) {
41+
final var remaining = buffer.remaining();
42+
43+
final var readOnlyBuffer = buffer.asReadOnlyBuffer();
44+
final var first3Chars = new byte[4];
45+
readOnlyBuffer.get(first3Chars);
46+
Log.infof("1st 4 bytes: '%s'", new String(first3Chars));
47+
final var last3Chars = new byte[4];
48+
readOnlyBuffer.get(remaining - 4, last3Chars, 0, 3);
49+
Log.infof("last 4 bytes: '%s'", new String(last3Chars));
50+
51+
if (debug) {
52+
var bytes = new byte[remaining];
53+
buffer.duplicate().get(bytes);
54+
Log.infof("=== read %d after %d:\n\n'%s'\n\n===\n", remaining, now - start,
55+
new String(bytes));
56+
start = now;
57+
}
58+
final var shouldBuffer = remaining > 16384;
59+
if (!shouldBuffer) {
60+
return;
61+
}
62+
// log("read", remaining);
63+
// stdOutBuffer.put(buffer);
64+
// Log.warnf("=== read %s", new String(stdOutBuffer.array()));
65+
//emitter.emit(new ByteArrayInputStream(bytes));
66+
// stdOutBuffer.clear();
67+
}
68+
69+
if (closed) {
70+
log("closed", 0);
71+
//emitter.complete();
72+
}
73+
}
74+
75+
private void log(String op, int remaining) {
76+
final var end = System.currentTimeMillis();
77+
Log.infof("%s after %dms, size: %d", op, end - start, remaining);
78+
start = end;
79+
}
80+
}

backend/src/main/java/net/laprun/sustainability/power/sensors/AbstractPowerSensor.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55
import org.eclipse.microprofile.config.inject.ConfigProperty;
66

77
import io.quarkus.logging.Log;
8+
import io.smallrye.mutiny.Multi;
9+
import io.smallrye.mutiny.infrastructure.Infrastructure;
810
import net.laprun.sustainability.power.SensorMetadata;
911

10-
public abstract class AbstractPowerSensor implements PowerSensor {
11-
protected final Measures measures = new MapMeasures();
12+
public abstract class AbstractPowerSensor<T> implements PowerSensor {
1213
private final PIDRegistry registry = new PIDRegistry();
14+
private final Measures current = new MapMeasures();
1315
private long lastUpdateEpoch;
1416
private boolean started;
17+
private Multi<Measures> measures;
1518
@ConfigProperty(name = "power-server.enable-cpu-share-sampling", defaultValue = "false")
1619
protected boolean cpuSharesEnabled;
1720
private SensorMetadata metadata;
@@ -65,38 +68,52 @@ public Set<String> registeredPIDsAsStrings() {
6568
}
6669

6770
@Override
68-
public void start() throws Exception {
71+
public Multi<Measures> start() throws Exception {
6972
if (!started) {
7073
lastUpdateEpoch = System.currentTimeMillis();
7174
started = true;
72-
doStart();
75+
measures = doStart()
76+
.broadcast()
77+
.withCancellationAfterLastSubscriberDeparture()
78+
.toAtLeast(1)
79+
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
80+
.onItem()
81+
.transform(this::update);
7382
}
83+
return measures;
7484
}
7585

76-
protected abstract void doStart();
77-
7886
@Override
7987
public boolean isStarted() {
8088
return started;
8189
}
8290

8391
@Override
8492
public void stop() {
85-
PowerSensor.super.stop();
86-
started = false;
93+
if (started) {
94+
PowerSensor.super.stop();
95+
started = false;
96+
}
8797
}
8898

89-
public Measures update(long tick) {
99+
public Measures update(T tick) {
100+
// reset revolving measure so that we don't get values for pids that are not tracked anymore
101+
current.clear();
90102
final long newUpdateStartEpoch = System.currentTimeMillis();
91-
Log.debugf("Sensor update last called: %dms ago", newUpdateStartEpoch - lastUpdateEpoch);
92-
final var measures = doUpdate(lastUpdateEpoch, newUpdateStartEpoch);
103+
Log.infof("Sensor update last called: %dms ago", newUpdateStartEpoch - lastUpdateEpoch);
104+
Log.infof("input %s", tick);
105+
// extract current values into revolving measure
106+
doUpdate(tick, current, lastUpdateEpoch, newUpdateStartEpoch);
107+
Log.infof("Last recorded measure: %s", current);
93108
lastUpdateEpoch = newUpdateStartEpoch;
94-
return measures;
109+
return current;
95110
}
96111

112+
protected abstract Multi<T> doStart();
113+
97114
protected long lastUpdateEpoch() {
98115
return lastUpdateEpoch;
99116
}
100117

101-
abstract protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch);
118+
abstract protected void doUpdate(T tick, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch);
102119
}

backend/src/main/java/net/laprun/sustainability/power/sensors/PowerSensor.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.Set;
44

5+
import io.smallrye.mutiny.Multi;
56
import net.laprun.sustainability.power.SensorMetadata;
67

78
/**
@@ -52,7 +53,7 @@ default void stop() {
5253
*
5354
* @throws Exception if the sensor couldn't be started for some reason
5455
*/
55-
void start() throws Exception;
56+
Multi<Measures> start() throws Exception;
5657

5758
/**
5859
* Registers the provided process identifier (pid) with the sensor in case it can provide per-process measures. For sensors
@@ -64,15 +65,6 @@ default void stop() {
6465
*/
6566
RegisteredPID register(long pid);
6667

67-
/**
68-
* Updates the ongoing {@link Measures} being recorded by this sensor for the given tick
69-
*
70-
* @param tick an ordinal value tracking the number of recorded measures being taken by the sensor since it started
71-
* measuring power consumption
72-
* @return the {@link Measures} object recording the measures this sensor has taken since it started measuring
73-
*/
74-
Measures update(long tick);
75-
7668
/**
7769
* Unregisters the specified {@link RegisteredPID} with this sensor thus signaling that clients are not interested in
7870
* tracking the consumption of the associated process anymore

backend/src/main/java/net/laprun/sustainability/power/sensors/SamplingMeasurer.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,16 +95,16 @@ private void startSamplingIfNeeded() throws Exception {
9595
Log.infof("%s sensor adjusted its sampling period to %dms", sensor.getClass().getSimpleName(), adjusted);
9696
}
9797

98-
// start sensor
99-
sensor.start();
98+
// start periodic power sampling, measuring sensor values over the sampling period
99+
final var sensorSamplerMulti = sensor.start();
100100

101101
// manage external CPU share sampling
102102
final var overSamplingFactor = 3;
103103
final Multi<Map<String, Double>> cpuSharesMulti;
104104
final var cpuSharesTicks = Multi.createFrom().ticks()
105105
// over sample but over a shorter period to ensure we have an average that covers most of the sampling period
106106
.every(samplingPeriod.minus(50, ChronoUnit.MILLIS).dividedBy(overSamplingFactor))
107-
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
107+
.emitOn(Infrastructure.getDefaultWorkerPool());
108108
if (sensor.wantsCPUShareSamplingEnabled()) {
109109
// if enabled, record a cpu share for each tick, group by the over sampling factor and average over these aggregates to produce one value for the power measure interval
110110
cpuSharesMulti = cpuSharesTicks
@@ -132,15 +132,6 @@ private void startSamplingIfNeeded() throws Exception {
132132
cpuSharesMulti = cpuSharesTicks.map(unused -> Map.of());
133133
}
134134

135-
// manage periodic power sampling, measuring sensor values over the sampling period
136-
final var sensorSamplerMulti = Multi.createFrom().ticks()
137-
.every(samplingPeriod)
138-
.map(sensor::update)
139-
.broadcast()
140-
.withCancellationAfterLastSubscriberDeparture()
141-
.toAtLeast(1)
142-
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
143-
144135
// combine both multis
145136
periodicSensorCheck = Multi.createBy()
146137
.combining()

backend/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensor.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
import java.nio.file.Files;
66
import java.nio.file.Path;
7+
import java.time.Duration;
78
import java.util.*;
89
import java.util.function.BiConsumer;
910

1011
import io.quarkus.logging.Log;
12+
import io.smallrye.mutiny.Multi;
1113
import net.laprun.sustainability.power.SensorMetadata;
1214
import net.laprun.sustainability.power.measures.NoDurationSensorMeasure;
1315
import net.laprun.sustainability.power.sensors.AbstractPowerSensor;
@@ -16,11 +18,12 @@
1618
/**
1719
* A sensor using Intel's RAPL accessed via Linux' powercap system.
1820
*/
19-
public class IntelRAPLSensor extends AbstractPowerSensor {
21+
public class IntelRAPLSensor extends AbstractPowerSensor<Long> {
2022
private final RAPLFile[] raplFiles;
2123
private final int rawOffset;
2224
private SensorMetadata nativeMetadata;
2325
private final long[] lastMeasuredSensorValues;
26+
private Duration samplingPeriod;
2427

2528
/**
2629
* Initializes the RAPL sensor
@@ -101,9 +104,17 @@ private static boolean addFileIfReadable(String raplFileAsString, SortedMap<Stri
101104
}
102105

103106
@Override
104-
public void doStart() {
107+
public long adjustSamplingPeriodIfNeeded(long requestedSamplingPeriodInMillis) {
108+
samplingPeriod = Duration.ofMillis(requestedSamplingPeriodInMillis);
109+
return super.adjustSamplingPeriodIfNeeded(requestedSamplingPeriodInMillis);
110+
}
111+
112+
@Override
113+
public Multi<Long> doStart() {
105114
// perform an initial measure to prime the data
106115
readAndRecordSensor(null, lastUpdateEpoch());
116+
return Multi.createFrom().ticks()
117+
.every(samplingPeriod);
107118
}
108119

109120
/**
@@ -138,7 +149,7 @@ protected SensorMetadata nativeMetadata() {
138149
}
139150

140151
@Override
141-
protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) {
152+
protected void doUpdate(Long tick, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch) {
142153
final var measure = new double[metadata().componentCardinality()];
143154
readAndRecordSensor((value, index) -> {
144155
measure[index] = computePowerInMilliWatts(index, value, newUpdateStartEpoch);
@@ -147,9 +158,7 @@ protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) {
147158
newUpdateStartEpoch);
148159

149160
final var single = new NoDurationSensorMeasure(measure, lastUpdateEpoch, newUpdateStartEpoch);
150-
registeredPIDs().forEach(pid -> measures.record(pid, single));
151-
152-
return measures;
161+
registeredPIDs().forEach(pid -> current.record(pid, single));
153162
}
154163

155164
protected void readAndRecordSensor(BiConsumer<Long, Integer> onReadingSensorValueAtIndex, long newUpdateStartEpoch) {

backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/FileMacOSPowermetricsSensor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import java.io.FileInputStream;
55
import java.io.InputStream;
66

7+
import io.smallrye.mutiny.Multi;
8+
79
/**
810
* The aim of this sensor is to only perform one long measure and then read the power information from it once done,
911
*/
@@ -15,7 +17,11 @@ public FileMacOSPowermetricsSensor(File file) {
1517
}
1618

1719
@Override
18-
protected InputStream getInputStream() {
20+
protected Multi<InputStream> getInputStream() {
21+
return Multi.createFrom().item(fromFile());
22+
}
23+
24+
private FileInputStream fromFile() {
1925
try {
2026
return new FileInputStream(file);
2127
} catch (Exception e) {
@@ -26,7 +32,7 @@ protected InputStream getInputStream() {
2632
@Override
2733
public void stop() {
2834
// need to defer reading metadata until we know the file has been populated
29-
initMetadata(getInputStream());
35+
initMetadata(fromFile());
3036
super.stop();
3137
}
3238
}

backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensor.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.InputStream;
44

55
import io.quarkus.logging.Log;
6+
import io.smallrye.mutiny.Multi;
67
import net.laprun.sustainability.power.SensorMetadata;
78
import net.laprun.sustainability.power.sensors.AbstractPowerSensor;
89
import net.laprun.sustainability.power.sensors.Measures;
@@ -12,7 +13,7 @@
1213
/**
1314
* A macOS powermetrics based {@link PowerSensor} implementation.
1415
*/
15-
public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor {
16+
public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor<InputStream> {
1617
/**
1718
* The Central Processing Unit component name
1819
*/
@@ -42,7 +43,7 @@ public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor {
4243
public static final String CPU_SHARE = "cpuShare";
4344

4445
private CPU cpu;
45-
private long lastCalled;
46+
private long lastCalled = System.currentTimeMillis();
4647

4748
@Override
4849
public boolean supportsProcessAttribution() {
@@ -59,30 +60,26 @@ protected SensorMetadata nativeMetadata() {
5960
}
6061

6162
@Override
62-
protected void doStart() {
63-
// nothing to do here by default
64-
if (Log.isDebugEnabled()) {
65-
lastCalled = System.currentTimeMillis();
66-
}
63+
protected Multi<InputStream> doStart() {
64+
return getInputStream();
6765
}
6866

69-
Measures extractPowerMeasure(InputStream powerMeasureInput, long lastUpdateEpoch, long newUpdateEpoch) {
67+
void extractPowerMeasure(InputStream powerMeasureInput, Measures current, long lastUpdateEpoch, long newUpdateEpoch) {
7068
if (Log.isDebugEnabled()) {
7169
final var start = System.currentTimeMillis();
7270
Log.debugf("powermetrics measure extraction last called %dms ago", (start - lastCalled));
7371
lastCalled = start;
7472
}
75-
PowerMetricsParser.extractPowerMeasure(powerMeasureInput, measures, lastUpdateEpoch, newUpdateEpoch, registeredPIDs(),
73+
PowerMetricsParser.extractPowerMeasure(powerMeasureInput, current, lastUpdateEpoch, newUpdateEpoch, registeredPIDs(),
7674
metadata(), cpu);
77-
return measures;
7875
}
7976

8077
@Override
81-
protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) {
82-
return extractPowerMeasure(getInputStream(), lastUpdateEpoch, newUpdateStartEpoch);
78+
protected void doUpdate(InputStream inputStream, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch) {
79+
extractPowerMeasure(inputStream, current, lastUpdateEpoch, newUpdateStartEpoch);
8380
}
8481

85-
protected abstract InputStream getInputStream();
82+
protected abstract Multi<InputStream> getInputStream();
8683

8784
@Override
8885
public void unregister(RegisteredPID registeredPID) {

0 commit comments

Comments
 (0)