From bc53adc84ca4839394b7d2e1ed56ba6491466acc Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 29 Jan 2025 22:00:19 +0100 Subject: [PATCH 1/2] fix: better synchronization --- .../power/measure/OngoingPowerMeasure.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/measure/src/main/java/net/laprun/sustainability/power/measure/OngoingPowerMeasure.java b/measure/src/main/java/net/laprun/sustainability/power/measure/OngoingPowerMeasure.java index 0e219ae..298e63a 100644 --- a/measure/src/main/java/net/laprun/sustainability/power/measure/OngoingPowerMeasure.java +++ b/measure/src/main/java/net/laprun/sustainability/power/measure/OngoingPowerMeasure.java @@ -21,8 +21,13 @@ public class OngoingPowerMeasure extends ProcessorAware implements PowerMeasure private final List syntheticComponents; private int samples; private long[] timestamps; + private long samplePeriod; public OngoingPowerMeasure(SensorMetadata metadata, SyntheticComponent... syntheticComponents) { + this(metadata, -1, syntheticComponents); + } + + public OngoingPowerMeasure(SensorMetadata metadata, long samplePeriod, SyntheticComponent... syntheticComponents) { super(Processors.empty); startedAt = System.currentTimeMillis(); @@ -30,6 +35,7 @@ public OngoingPowerMeasure(SensorMetadata metadata, SyntheticComponent... synthe measures = new double[numComponents][DEFAULT_SIZE]; nonZeroComponents = new BitSet(numComponents); timestamps = new long[DEFAULT_SIZE]; + this.samplePeriod = samplePeriod; if (syntheticComponents != null) { final var builder = SensorMetadata.from(metadata); @@ -47,7 +53,7 @@ public OngoingPowerMeasure(SensorMetadata metadata, SyntheticComponent... synthe } @Override - public int numberOfSamples() { + public synchronized int numberOfSamples() { return samples; } @@ -57,19 +63,20 @@ public SensorMetadata metadata() { } public void recordMeasure(double[] components) { - samples++; ensureArraysSize(); final var timestamp = System.currentTimeMillis(); - timestamps[samples - 1] = timestamp; - for (int component = 0; component < components.length; component++) { - final var componentValue = components[component]; - // record that the value is not zero - if (componentValue != 0) { - nonZeroComponents.set(component); + synchronized (this) { + timestamps[samples - 1] = timestamp; + for (int component = 0; component < components.length; component++) { + final var componentValue = components[component]; + // record that the value is not zero + if (componentValue != 0) { + nonZeroComponents.set(component); + } + measures[component][samples - 1] = componentValue; } - measures[component][samples - 1] = componentValue; } final var processors = processors(); @@ -82,7 +89,8 @@ public void recordMeasure(double[] components) { } } - private void ensureArraysSize() { + private synchronized void ensureArraysSize() { + samples++; final int currentSize = timestamps.length; if (currentSize <= samples) { final var newSize = currentSize * 2; @@ -123,7 +131,7 @@ private static boolean targetComponentExistsAndIsRecorder(int component, Registe } @Override - public TimestampedMeasures getNthTimestampedMeasures(int n) { + public synchronized TimestampedMeasures getNthTimestampedMeasures(int n) { n = Math.min(n, samples - 1); final var result = new double[measures.length]; for (int i = 0; i < measures.length; i++) { From 626cff8997f0ea071c0f697059eac5e2ef5d1e88 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 29 Jan 2025 22:01:24 +0100 Subject: [PATCH 2/2] feat: add cursor notion to retrieve measures based on timestamp --- .../sustainability/power/measure/Cursor.java | 109 ++++++++++++++++++ .../power/measure/OngoingPowerMeasure.java | 4 + .../power/measure/CursorTest.java | 54 +++++++++ 3 files changed, 167 insertions(+) create mode 100644 measure/src/main/java/net/laprun/sustainability/power/measure/Cursor.java create mode 100644 measure/src/test/java/net/laprun/sustainability/power/measure/CursorTest.java diff --git a/measure/src/main/java/net/laprun/sustainability/power/measure/Cursor.java b/measure/src/main/java/net/laprun/sustainability/power/measure/Cursor.java new file mode 100644 index 0000000..9da34d3 --- /dev/null +++ b/measure/src/main/java/net/laprun/sustainability/power/measure/Cursor.java @@ -0,0 +1,109 @@ +package net.laprun.sustainability.power.measure; + +import java.time.Duration; +import java.util.Arrays; + +public enum Cursor { + ; + + public static PartialCursor cursorOver(long[] timestamps, long timestamp, Duration duration, long initialOffset, + long averagePeriodHint) { + // adjusted timestamp for modding + System.out.println(Arrays.toString(timestamps)); + System.out.println("timestamp = " + timestamp); + final var timestampForDiv = timestamp - initialOffset; + final var durationAsMs = duration.toMillis(); + System.out.println("durationAsMs = " + durationAsMs); + + // cannot find an interval for a timestamp that is before the recording started + if (timestampForDiv < 0) { + return PartialCursor.empty; + } + + if (timestamps.length < 2) { + // if we don't have a sample period, use the full measure + double ratio = 1.0; + if (averagePeriodHint > 0) { + ratio = (double) durationAsMs / averagePeriodHint; + } + return new PartialCursor(0, 0, ratio, ratio); + } + + // estimate sample period based on 2 samples interval + if (averagePeriodHint <= 0) { + averagePeriodHint = timestamps[1] - timestamps[0]; + } + + // first, find potential first sample based on timestamp + int startIndex = (int) Math.floorDiv(timestampForDiv, averagePeriodHint); + System.out.println("startIndex = " + startIndex); + int endIndex = (int) Math.floorDiv(timestampForDiv + durationAsMs, averagePeriodHint); + System.out.println("endIndex = " + endIndex); + + if (startIndex == endIndex) { + final long previousTimestamp = startIndex == 0 ? initialOffset : timestamps[startIndex - 1]; + final long slotDuration = timestamps[startIndex] - previousTimestamp; + var ratio = (double) durationAsMs / slotDuration; + return new PartialCursor(startIndex, endIndex, ratio, -1); + } + + // get the index with the timestamp right after the one we're looking for since what we're interested in is the portion of the measure that gets recorded after the timestamp we want + long afterTimestamp = timestamps[startIndex]; + final long startOffset = afterTimestamp - timestamp; + double startRatio = 0; + if (startOffset > 0) { + startRatio = (double) startOffset / (afterTimestamp - timestamps[startIndex - 1]); + } + + // look for the index that records the first timestamp that's after the one we're looking for added to the duration + afterTimestamp = timestamps[endIndex]; + final long slotDuration = afterTimestamp - timestamps[endIndex - 1]; + final long endOffset = slotDuration - (afterTimestamp - timestamp - durationAsMs); + double endRatio = 0; + if (endOffset > 0) { + endRatio = (double) endOffset / slotDuration; + } + + return new PartialCursor(startIndex, endIndex, startRatio, endRatio); + } + + public record PartialCursor(int startIndex, int endIndex, double firstMeasureRatio, double lastMeasureRatio) { + + public static final PartialCursor empty = new PartialCursor(-1, -1, 0.0, 0.0); + + public double sum(double[] values) { + if (values == null || values.length == 0 || this == empty || values.length < startIndex + endIndex) { + return 0.0; + } + + if (startIndex == endIndex) { + return values[startIndex] * firstMeasureRatio; + } + + double sum = values[startIndex] * firstMeasureRatio; + for (int i = startIndex + 1; i < endIndex; i++) { + sum += values[i]; + } + sum += values[endIndex] * lastMeasureRatio; + + return sum; + } + + public double[] viewOf(double[] values) { + if (values == null || values.length == 0 || this == empty || values.length < startIndex + endIndex) { + return new double[0]; + } + + if (startIndex == endIndex) { + return new double[] { values[startIndex] * firstMeasureRatio }; + } + + final int len = endIndex - startIndex + 1; + final double[] view = new double[len]; + view[0] = values[startIndex] * firstMeasureRatio; + System.arraycopy(values, startIndex + 1, view, 1, len - 1 - 1); + view[len - 1] = values[endIndex] * lastMeasureRatio; + return view; + } + } +} diff --git a/measure/src/main/java/net/laprun/sustainability/power/measure/OngoingPowerMeasure.java b/measure/src/main/java/net/laprun/sustainability/power/measure/OngoingPowerMeasure.java index 298e63a..fb23065 100644 --- a/measure/src/main/java/net/laprun/sustainability/power/measure/OngoingPowerMeasure.java +++ b/measure/src/main/java/net/laprun/sustainability/power/measure/OngoingPowerMeasure.java @@ -139,4 +139,8 @@ public synchronized TimestampedMeasures getNthTimestampedMeasures(int n) { } return new TimestampedMeasures(timestamps[n], result); } + + public Cursor.PartialCursor getCursorOver(long timestamp, Duration duration) { + return Cursor.cursorOver(timestamps, timestamp, duration, startedAt, samplePeriod); + } } diff --git a/measure/src/test/java/net/laprun/sustainability/power/measure/CursorTest.java b/measure/src/test/java/net/laprun/sustainability/power/measure/CursorTest.java new file mode 100644 index 0000000..b55f3e1 --- /dev/null +++ b/measure/src/test/java/net/laprun/sustainability/power/measure/CursorTest.java @@ -0,0 +1,54 @@ +package net.laprun.sustainability.power.measure; + +import static org.junit.jupiter.api.Assertions.*; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class CursorTest { + + @ParameterizedTest + @ValueSource(longs = { -1, 0, 100, 98, 105 }) + void cursorOverSimple(long periodHint) { + final var timestamps = new long[] { 100, 200, 300, 400, 500, 600, 700, 800, 900 }; + + final var cursor = Cursor.cursorOver(timestamps, 225, Duration.ofMillis(540 - 225), 0, + periodHint); + + assertEquals(2, cursor.startIndex()); + assertEquals(5, cursor.endIndex()); + assertEquals(0.75, cursor.firstMeasureRatio(), 0.0001); + assertEquals(0.4, cursor.lastMeasureRatio(), 0.0001); + + final var measures = new double[] { 100, 200, 300, 400, 500, 600, 700, 800, 900 }; + assertEquals(300 * 0.75 + 400 + 500 + 0.4 * 600, cursor.sum(measures), 0.0001); + final double[] view = cursor.viewOf(measures); + assertEquals(4, view.length); + assertEquals(300 * 0.75, view[0], 0.0001); + assertEquals(400, view[1], 0.0001); + assertEquals(500, view[2], 0.0001); + assertEquals(600 * 0.4, view[3], 0.0001); + } + + @Test + void cursorOverOneMeasure() { + final var timestamps = new long[] { 100, 200, 300, 400, 500, 600, 700, 800, 900 }; + + final var cursor = Cursor.cursorOver(timestamps, 1, Duration.ofMillis(10), 0, + 100); + + assertEquals(0, cursor.startIndex()); + assertEquals(0, cursor.endIndex()); + assertEquals(0.1, cursor.firstMeasureRatio(), 0.0001); + assertEquals(-1, cursor.lastMeasureRatio(), 0.0001); + + final var measures = new double[] { 100, 200, 300, 400, 500, 600, 700, 800, 900 }; + assertEquals(100 * 0.1, cursor.sum(measures), 0.0001); + final double[] view = cursor.viewOf(measures); + assertEquals(1, view.length); + assertEquals(100 * 0.1, view[0], 0.0001); + } +}