diff --git a/analysis/pom.xml b/analysis/pom.xml index 1353595..3cc99c5 100644 --- a/analysis/pom.xml +++ b/analysis/pom.xml @@ -5,7 +5,7 @@ net.laprun.sustainability power-server-parent - 0.2.4-SNAPSHOT + 0.3.0-SNAPSHOT power-server-analysis diff --git a/if-manifest-export/pom.xml b/if-manifest-export/pom.xml index c7a2392..b0c2056 100644 --- a/if-manifest-export/pom.xml +++ b/if-manifest-export/pom.xml @@ -5,7 +5,7 @@ net.laprun.sustainability power-server-parent - 0.2.4-SNAPSHOT + 0.3.0-SNAPSHOT power-server-if-manifest-export diff --git a/measure/pom.xml b/measure/pom.xml index 00ec07b..4cdb5de 100644 --- a/measure/pom.xml +++ b/measure/pom.xml @@ -5,7 +5,7 @@ net.laprun.sustainability power-server-parent - 0.2.4-SNAPSHOT + 0.3.0-SNAPSHOT power-server-measure diff --git a/metadata/pom.xml b/metadata/pom.xml index 6219961..05579a2 100644 --- a/metadata/pom.xml +++ b/metadata/pom.xml @@ -5,7 +5,7 @@ net.laprun.sustainability power-server-parent - 0.2.4-SNAPSHOT + 0.3.0-SNAPSHOT power-server-metadata diff --git a/metadata/src/main/java/net/laprun/sustainability/power/SensorMeasure.java b/metadata/src/main/java/net/laprun/sustainability/power/SensorMeasure.java index c1a01e7..c86059c 100644 --- a/metadata/src/main/java/net/laprun/sustainability/power/SensorMeasure.java +++ b/metadata/src/main/java/net/laprun/sustainability/power/SensorMeasure.java @@ -6,11 +6,12 @@ * with the sensor. * * @param components an array recording the power consumption reported by each component of this sensor - * @param tick the ordinal tick associated with this measure + * @param startMs the start timestamp in milliseconds for this measure + * @param endMs the end timestamp in milliseconds for this measure */ -public record SensorMeasure(double[] components, long tick, long timestamp, long duration) { +public record SensorMeasure(double[] components, long startMs, long endMs) { /** * Represents an invalid or somehow missed measure. */ - public static final SensorMeasure missing = new SensorMeasure(new double[] { -1.0 }, -1, -1, -1); + public static final SensorMeasure missing = new SensorMeasure(new double[] { -1.0 }, -1, -1); } diff --git a/pom.xml b/pom.xml index 50a09d2..f2298cf 100644 --- a/pom.xml +++ b/pom.xml @@ -1,15 +1,14 @@ - + 4.0.0 net.laprun.sustainability power-server-parent - 0.2.4-SNAPSHOT + 0.3.0-SNAPSHOT pom power-server : parent An application allowing to retrieve power consumption and associated metadata on a per-process basis, - via a RESTful endpoint - + via a RESTful endpoint https://github.com/metacosm/power-server @@ -184,6 +183,14 @@ true + true + true + true + true + false + true + true + false UNICODE true true @@ -212,10 +219,8 @@ - ${project.build.directory}/${project.build.finalName}-runner - - org.jboss.logmanager.LogManager - + ${project.build.directory}/${project.build.finalName}-runner + org.jboss.logmanager.LogManager ${maven.home} @@ -326,12 +331,8 @@ - - {{artifactsDir}}/{{distributionName}}-{{projectVersion}}-linux-x86_64.tar.gz - - - artifacts/{{distributionName}}-{{projectEffectiveVersion}}-linux-x86_64.tar.gz - + {{artifactsDir}}/{{distributionName}}-{{projectVersion}}-linux-x86_64.tar.gz + artifacts/{{distributionName}}-{{projectEffectiveVersion}}-linux-x86_64.tar.gz linux-x86_64 - - {{artifactsDir}}/{{distributionName}}-{{projectVersion}}-osx-x86_64.tar.gz - - - artifacts/{{distributionName}}-{{projectEffectiveVersion}}-osx-x86_64.tar.gz - + {{artifactsDir}}/{{distributionName}}-{{projectVersion}}-osx-x86_64.tar.gz + artifacts/{{distributionName}}-{{projectEffectiveVersion}}-osx-x86_64.tar.gz osx-x86_64 - - {{artifactsDir}}/{{distributionName}}-{{projectVersion}}-osx-aarch_64.tar.gz - - - artifacts/{{distributionName}}-{{projectEffectiveVersion}}-osx-aarch_64.tar.gz - + {{artifactsDir}}/{{distributionName}}-{{projectVersion}}-osx-aarch_64.tar.gz + artifacts/{{distributionName}}-{{projectEffectiveVersion}}-osx-aarch_64.tar.gz osx-aarch_64 @@ -396,12 +389,8 @@ true - - true - - - true - + true + true ${project.url} @@ -438,12 +427,8 @@ true - - true - - - true - + true + true ${project.url} diff --git a/server/pom.xml b/server/pom.xml index 3195226..e48eeeb 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -4,7 +4,7 @@ net.laprun.sustainability power-server-parent - 0.2.4-SNAPSHOT + 0.3.0-SNAPSHOT power-server power-server : server @@ -32,6 +32,19 @@ quarkus-rest-client test + + io.quarkus + quarkus-hibernate-orm-panache + + + io.quarkus + quarkus-scheduler + + + io.quarkiverse.jdbc + quarkus-jdbc-sqlite4j + 0.0.5 + diff --git a/server/src/main/java/net/laprun/sustainability/power/PowerMeasurer.java b/server/src/main/java/net/laprun/sustainability/power/PowerMeasurer.java index 4e2ed6c..32acad6 100644 --- a/server/src/main/java/net/laprun/sustainability/power/PowerMeasurer.java +++ b/server/src/main/java/net/laprun/sustainability/power/PowerMeasurer.java @@ -9,8 +9,10 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.infrastructure.Infrastructure; +import net.laprun.sustainability.power.persistence.Persistence; import net.laprun.sustainability.power.sensors.Measures; import net.laprun.sustainability.power.sensors.PowerSensor; +import net.laprun.sustainability.power.sensors.RegisteredPID; @ApplicationScoped public class PowerMeasurer { @@ -24,7 +26,16 @@ public class PowerMeasurer { private Multi periodicSensorCheck; - public Multi startTracking(String pid) throws Exception { + public Multi stream(String pid) throws Exception { + final var registeredPID = track(pid); + return periodicSensorCheck.map(measures -> measures.getOrDefault(registeredPID)); + } + + public void startTrackingApp(String appName, String pid) throws Exception { + stream(pid).subscribe().with(m -> Persistence.save(m, appName)); + } + + private RegisteredPID track(String pid) throws Exception { // first make sure that the process with that pid exists final var parsedPID = validPIDOrFail(pid); @@ -41,9 +52,8 @@ public Multi startTracking(String pid) throws Exception { final var registeredPID = sensor.register(parsedPID); // todo: the timing of things could make it so that the pid has been removed before the map operation occurs so // currently return -1 instead of null but this needs to be properly addressed - return periodicSensorCheck - .map(measures -> measures.getOrDefault(registeredPID)) - .onCancellation().invoke(() -> sensor.unregister(registeredPID)); + periodicSensorCheck = periodicSensorCheck.onCancellation().invoke(() -> sensor.unregister(registeredPID)); + return registeredPID; } protected long validPIDOrFail(String pid) { diff --git a/server/src/main/java/net/laprun/sustainability/power/PowerResource.java b/server/src/main/java/net/laprun/sustainability/power/PowerResource.java index 0c0d2cc..b768327 100644 --- a/server/src/main/java/net/laprun/sustainability/power/PowerResource.java +++ b/server/src/main/java/net/laprun/sustainability/power/PowerResource.java @@ -1,10 +1,15 @@ package net.laprun.sustainability.power; import java.time.Duration; +import java.util.List; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; -import jakarta.ws.rs.*; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; import jakarta.ws.rs.core.MediaType; import org.jboss.resteasy.reactive.RestStreamElementType; @@ -12,6 +17,7 @@ import io.quarkus.logging.Log; import io.quarkus.runtime.StartupEvent; import io.smallrye.mutiny.Multi; +import net.laprun.sustainability.power.persistence.Measure; @Path("/power") public class PowerResource { @@ -25,10 +31,20 @@ public void onStartup(@Observes StartupEvent event) { @GET @RestStreamElementType(MediaType.APPLICATION_JSON) - @Path("{pid}") - public Multi powerFor(@PathParam("pid") String pid) throws Exception { + @Path("stream/{pid}") + public Multi streamMeasuresFor(@PathParam("pid") String pid) throws Exception { try { - return measurer.startTracking(pid); + return measurer.stream(pid); + } catch (IllegalArgumentException e) { + throw new NotFoundException("Unknown process: " + pid); + } + } + + @POST + @Path("start/{appName}/{pid}") + public void startMeasure(@PathParam("appName") String appName, @PathParam("pid") String pid) throws Exception { + try { + measurer.startTrackingApp(appName, pid); } catch (IllegalArgumentException e) { throw new NotFoundException("Unknown process: " + pid); } @@ -45,4 +61,10 @@ public SensorMetadata metadata() { public Duration samplingPeriod() { return measurer.getSamplingPeriod(); } + + @GET + @Path("measures/{appName}") + public List measures(@PathParam("appName") String appName) throws Exception { + return Measure.forApplication(appName).stream().map(Measure::asSensorMeasure).toList(); + } } diff --git a/server/src/main/java/net/laprun/sustainability/power/persistence/Measure.java b/server/src/main/java/net/laprun/sustainability/power/persistence/Measure.java new file mode 100644 index 0000000..3e1792f --- /dev/null +++ b/server/src/main/java/net/laprun/sustainability/power/persistence/Measure.java @@ -0,0 +1,28 @@ +package net.laprun.sustainability.power.persistence; + +import java.util.List; + +import jakarta.persistence.Entity; + +import io.quarkus.hibernate.orm.panache.PanacheEntity; +import net.laprun.sustainability.power.SensorMeasure; + +@Entity +public class Measure extends PanacheEntity { + public String appName; + public long startTime; + public long endTime; + public double[] components; + + public static List forApplication(String appName) { + return find("appName", appName).list(); + } + + public static List all() { + return Measure.findAll().list(); + } + + public SensorMeasure asSensorMeasure() { + return new SensorMeasure(components, startTime, endTime); + } +} diff --git a/server/src/main/java/net/laprun/sustainability/power/persistence/Persistence.java b/server/src/main/java/net/laprun/sustainability/power/persistence/Persistence.java new file mode 100644 index 0000000..625eaeb --- /dev/null +++ b/server/src/main/java/net/laprun/sustainability/power/persistence/Persistence.java @@ -0,0 +1,20 @@ +package net.laprun.sustainability.power.persistence; + +import jakarta.transaction.Transactional; + +import net.laprun.sustainability.power.SensorMeasure; + +public enum Persistence { + ; + + @Transactional + public static Measure save(SensorMeasure measure, String appName) { + final var persisted = new Measure(); + persisted.components = measure.components(); + persisted.appName = appName; + persisted.startTime = measure.startMs(); + persisted.endTime = measure.endMs(); + persisted.persist(); + return persisted; + } +} diff --git a/server/src/main/java/net/laprun/sustainability/power/persistence/SQLiteFilePersister.java b/server/src/main/java/net/laprun/sustainability/power/persistence/SQLiteFilePersister.java new file mode 100644 index 0000000..3940ef9 --- /dev/null +++ b/server/src/main/java/net/laprun/sustainability/power/persistence/SQLiteFilePersister.java @@ -0,0 +1,80 @@ +package net.laprun.sustainability.power.persistence; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.sql.DataSource; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.quarkus.logging.Log; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.scheduler.Scheduled; + +@Singleton +public class SQLiteFilePersister { + private final AtomicBoolean executing = new AtomicBoolean(false); + @ConfigProperty(name = "quarkus.datasource.jdbc.url") + String jdbcUrl; + @Inject + DataSource dataSource; + private Path dbFile; + private Path backupDBFile; + + @PostConstruct + void init() { + int prefixLength = "jdbc:sqlite:".length(); + int queryParamsIdx = jdbcUrl.indexOf('?'); + int length = (queryParamsIdx != -1) ? queryParamsIdx : jdbcUrl.length(); + var dbFileName = jdbcUrl.substring(prefixLength, length); + dbFile = Paths.get(dbFileName); + backupDBFile = dbFile.toAbsolutePath().getParent().resolve(dbFile.getFileName() + "_backup"); + } + + // Periodical backup + @Scheduled(delay = 5, delayUnit = TimeUnit.MINUTES, every = "${power-server.db.backup.period}") + void scheduled() { + backup(); + } + + // Execute a backup during shutdown + public void onShutdown(@Observes ShutdownEvent event) { + backup(); + } + + void backup() { + if (executing.compareAndSet(false, true)) { + try { + Log.trace("Starting DB backup for file: " + dbFile); + try (var conn = dataSource.getConnection(); + var stmt = conn.createStatement()) { + // Execute the backup + stmt.executeUpdate("backup to " + backupDBFile); + // Atomically substitute the DB file with its backup + Files.move(backupDBFile, dbFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } catch (SQLException e) { + throw new RuntimeException("Failed to backup the database", e); + } catch (IOException e) { + throw new RuntimeException("Failed to create backup files or folders", e); + } + Log.info("Backup of " + dbFile + " completed"); + } finally { + executing.set(false); + } + } else { + Log.trace("Skipping backup as one is already in progress"); + } + } + +} diff --git a/server/src/main/java/net/laprun/sustainability/power/sensors/MapMeasures.java b/server/src/main/java/net/laprun/sustainability/power/sensors/MapMeasures.java index added16..4c1fbc9 100644 --- a/server/src/main/java/net/laprun/sustainability/power/sensors/MapMeasures.java +++ b/server/src/main/java/net/laprun/sustainability/power/sensors/MapMeasures.java @@ -27,7 +27,7 @@ public Set trackedPIDs() { } @Override - public int numberOfTrackerPIDs() { + public int numberOfTrackedPIDs() { return measures.size(); } diff --git a/server/src/main/java/net/laprun/sustainability/power/sensors/Measures.java b/server/src/main/java/net/laprun/sustainability/power/sensors/Measures.java index 7c830bd..e33ffde 100644 --- a/server/src/main/java/net/laprun/sustainability/power/sensors/Measures.java +++ b/server/src/main/java/net/laprun/sustainability/power/sensors/Measures.java @@ -39,7 +39,7 @@ public interface Measures { * * @return the number of tracked processes */ - int numberOfTrackerPIDs(); + int numberOfTrackedPIDs(); /** * Records the specified measure and associates it to the specified tracked process, normally called once per tick diff --git a/server/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensor.java b/server/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensor.java index 3d61cd7..a436044 100644 --- a/server/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensor.java +++ b/server/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensor.java @@ -141,8 +141,7 @@ public Measures update(Long tick) { measure[i] = newComponentValue; lastMeasuredSensorValues[i] = newComponentValue; } - final long timestamp = System.currentTimeMillis(); - measures.singleMeasure(new SensorMeasure(measure, tick, timestamp, timestamp - start)); + measures.singleMeasure(new SensorMeasure(measure, start, System.currentTimeMillis())); return measures; } } diff --git a/server/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/SingleMeasureMeasures.java b/server/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/SingleMeasureMeasures.java index a69c08b..ddbad79 100644 --- a/server/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/SingleMeasureMeasures.java +++ b/server/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/SingleMeasureMeasures.java @@ -33,7 +33,7 @@ public Set trackedPIDs() { } @Override - public int numberOfTrackerPIDs() { + public int numberOfTrackedPIDs() { return trackedPIDs.size(); } diff --git a/server/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensor.java b/server/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensor.java index d938030..dc7c625 100644 --- a/server/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensor.java +++ b/server/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensor.java @@ -130,7 +130,7 @@ Measures extractPowerMeasure(InputStream powerMeasureInput, Long tick) { // copy the pids so that we can remove them as soon as we've processed them final var pidsToProcess = new HashSet<>(measures.trackedPIDs()); // start measure - final var pidMeasures = new HashMap(measures.numberOfTrackerPIDs()); + final var pidMeasures = new HashMap(measures.numberOfTrackedPIDs()); final var metadata = cpu.metadata(); final var powerComponents = new HashMap(metadata.componentCardinality()); while ((line = input.readLine()) != null) { @@ -197,8 +197,7 @@ Measures extractPowerMeasure(InputStream powerMeasureInput, Long tick) { } }); - final long timestamp = System.currentTimeMillis(); - measures.record(pid, new SensorMeasure(measure, tick, timestamp, timestamp - start)); + measures.record(pid, new SensorMeasure(measure, start, System.currentTimeMillis())); }); } catch (Exception exception) { throw new RuntimeException(exception); @@ -217,7 +216,7 @@ public Measures update(Long tick) { public void unregister(RegisteredPID registeredPID) { super.unregister(registeredPID); // if we're not tracking any processes anymore, stop powermetrics as well - if (measures.numberOfTrackerPIDs() == 0) { + if (measures.numberOfTrackedPIDs() == 0) { stop(); } } diff --git a/server/src/main/java/net/laprun/sustainability/power/sensors/test/TestPowerSensor.java b/server/src/main/java/net/laprun/sustainability/power/sensors/test/TestPowerSensor.java index 8219f1a..30c6749 100644 --- a/server/src/main/java/net/laprun/sustainability/power/sensors/test/TestPowerSensor.java +++ b/server/src/main/java/net/laprun/sustainability/power/sensors/test/TestPowerSensor.java @@ -48,7 +48,7 @@ public void start(long samplingFrequencyInMillis) { @Override public Measures update(Long tick) { measures.trackedPIDs().forEach(pid -> measures.record(pid, - new SensorMeasure(new double[] { Math.random() }, tick, System.currentTimeMillis(), 0))); + new SensorMeasure(new double[] { Math.random() }, System.currentTimeMillis(), System.currentTimeMillis()))); return measures; } } diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index 505f341..bd45d4c 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -1 +1,9 @@ -quarkus.http.port=20432 \ No newline at end of file +quarkus.http.port=20432 +quarkus.datasource.jdbc.url=jdbc:sqlite:power-server.db +quarkus.datasource.db-kind=sqlite +quarkus.datasource.jdbc.min-size=1 + +power-server.db.backup.period=5m + +# Only use this property once to create the initial database +# quarkus.hibernate-orm.database.generation=create diff --git a/server/src/test/java/net/laprun/sustainability/power/CIPowerResourceTest.java b/server/src/test/java/net/laprun/sustainability/power/CIPowerResourceTest.java index 949d520..ac3e42b 100644 --- a/server/src/test/java/net/laprun/sustainability/power/CIPowerResourceTest.java +++ b/server/src/test/java/net/laprun/sustainability/power/CIPowerResourceTest.java @@ -1,5 +1,9 @@ package net.laprun.sustainability.power; +import static io.restassured.RestAssured.given; + +import org.junit.jupiter.api.Test; + import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; @@ -15,4 +19,13 @@ protected long getPid() { public void testLinuxMetadataEndpoint() { // overridden to disable as with the profile activation, the MockPowerSensor implementation is picked up, which is a macOS-specific implementation } + + @Test + public void testDBBasedEndpoint() { + final var pid = getPid(); + given() + .when().post("/power/start/cipowerresourcetest/" + pid) + .then() + .statusCode(204); + } } diff --git a/server/src/test/java/net/laprun/sustainability/power/PowerResourceTest.java b/server/src/test/java/net/laprun/sustainability/power/PowerResourceTest.java index a0026f8..1642b0d 100644 --- a/server/src/test/java/net/laprun/sustainability/power/PowerResourceTest.java +++ b/server/src/test/java/net/laprun/sustainability/power/PowerResourceTest.java @@ -5,8 +5,17 @@ import java.net.URI; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import jakarta.ws.rs.client.ClientBuilder; +import jakarta.ws.rs.sse.SseEventSource; + +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.condition.EnabledOnOs; @@ -24,7 +33,24 @@ public class PowerResourceTest { @Test public void testPowerEndpoint() throws Exception { final var pid = getPid(); - StreamChecker.checkPowerForPID(uri, pid); + try (final var client = ClientBuilder.newClient(); + final var eventSource = SseEventSource + .target(client.target(uri) + .path("power/stream/{pid}") + .resolveTemplate("pid", pid)) + .build()) { + CompletableFuture> res = new CompletableFuture<>(); + List collect = Collections.synchronizedList(new ArrayList<>()); + eventSource.register(inboundSseEvent -> { + collect.add(inboundSseEvent.readData()); + // stop after one event + eventSource.close(); + }, + res::completeExceptionally, + () -> res.complete(collect)); + eventSource.open(); + Assertions.assertThat(res.get(5, TimeUnit.SECONDS)).hasSize(1); + } } @Test @@ -104,4 +130,13 @@ public void testLinuxMetadataEndpoint() { assertTrue(metadata.documentation().contains("RAPL")); } + @Test + public void testDBBackedEndpoint() { + final var pid = getPid(); + given() + .when().post("/power/start/powerresourcetest/" + pid) + .then() + .statusCode(204); + } + } diff --git a/server/src/test/java/net/laprun/sustainability/power/StreamChecker.java b/server/src/test/java/net/laprun/sustainability/power/StreamChecker.java deleted file mode 100644 index 9b4e52e..0000000 --- a/server/src/test/java/net/laprun/sustainability/power/StreamChecker.java +++ /dev/null @@ -1,35 +0,0 @@ -package net.laprun.sustainability.power; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import jakarta.ws.rs.client.ClientBuilder; -import jakarta.ws.rs.sse.SseEventSource; - -import org.assertj.core.api.Assertions; - -public class StreamChecker { - - static void checkPowerForPID(URI uri, long pid) throws Exception { - final var powerForPid = ClientBuilder.newClient().target(uri.resolve("power")) - .path("{pid}").resolveTemplate("pid", pid); - - try (final var eventSource = SseEventSource.target(powerForPid).build()) { - CompletableFuture> res = new CompletableFuture<>(); - List collect = Collections.synchronizedList(new ArrayList<>()); - eventSource.register(inboundSseEvent -> { - collect.add(inboundSseEvent.readData()); - // stop after one event - eventSource.close(); - }, - res::completeExceptionally, - () -> res.complete(collect)); - eventSource.open(); - Assertions.assertThat(res.get(5, TimeUnit.SECONDS)).hasSize(1); - } - } -}