Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
<version>3.14.0</version>
<configuration>
<source>17</source>
<target>17</target>
Expand Down Expand Up @@ -167,7 +167,7 @@
'core.autocrlf' to true -->
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.44.2</version>
<version>2.44.3</version>
<configuration>
<java>
<includes>
Expand Down Expand Up @@ -247,7 +247,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.114.Final</version>
<version>4.1.115.Final</version>
</dependency>
<!-- Added because of conflicts -->
<dependency>
Expand All @@ -259,7 +259,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.4</version>
<version>3.25.5</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
Expand All @@ -274,12 +274,12 @@
<dependency>
<groupId>io.deephaven</groupId>
<artifactId>deephaven-java-client-barrage-dagger</artifactId>
<version>0.37.6</version>
<version>0.38.0</version>
</dependency>
<dependency>
<groupId>io.deephaven</groupId>
<artifactId>deephaven-log-to-slf4j</artifactId>
<version>0.37.6</version>
<version>0.38.0</version>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/deephaven/benchmark/api/Bench.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.api;

import java.io.Closeable;
Expand Down Expand Up @@ -69,13 +69,14 @@ static public Bench create(Object testInst) {
final BenchLog runLog;
final List<Future<Metrics>> futures = new ArrayList<>();
final List<Closeable> closeables = new ArrayList<>();
final Session session = new Session();
private boolean isClosed = false;

Bench(Class<?> testInst) {
this.testInst = testInst;
this.result = new BenchResult(outputDir);
this.metrics = new BenchMetrics(outputDir);
this.platform = new BenchPlatform(outputDir);
this.platform = new BenchPlatform(this, outputDir);
this.queryLog = new QueryLog(outputDir, testInst);
this.runLog = new BenchLog(outputDir, testInst);
}
Expand Down Expand Up @@ -245,6 +246,7 @@ public void close() {
platform.commit();
runLog.close();
queryLog.close();
session.close();
}

Metrics awaitCompletion(Future<Metrics> future) {
Expand Down
14 changes: 6 additions & 8 deletions src/main/java/io/deephaven/benchmark/api/BenchPlatform.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.api;

import java.io.BufferedWriter;
Expand All @@ -23,6 +23,7 @@
public class BenchPlatform {
static final Map<String, Property> properties = new LinkedHashMap<>();
static boolean hasBeenCommitted = false;
final Bench api;
final Path platformFile;
final Properties profileProps;

Expand All @@ -32,8 +33,8 @@ public class BenchPlatform {
*
* @param parent the parent directory of the platform file
*/
BenchPlatform(Path parent) {
this(parent, Bench.platformFileName, Bench.profile.getProperties());
BenchPlatform(Bench api, Path parent) {
this(api, parent, Bench.platformFileName, Bench.profile.getProperties());
}

/**
Expand All @@ -42,7 +43,8 @@ public class BenchPlatform {
* @param parent the parent directory of the platform file
* @param platformFileName the name the file to store platform properties
*/
BenchPlatform(Path parent, String platformFileName, Properties profileProps) {
BenchPlatform(Bench api, Path parent, String platformFileName, Properties profileProps) {
this.api = api;
this.platformFile = parent.resolve(platformFileName);
this.profileProps = profileProps;
}
Expand Down Expand Up @@ -87,14 +89,10 @@ void commit() {
* @return a cached result table containing properties
*/
protected ResultTable fetchResult(String query) {
Bench api = new Bench(Bench.class);
api.setName("# Write Platform Details"); // # means skip adding to results file

var tbl = new AtomicReference<ResultTable>();
api.query(query).fetchAfter("bench_api_platform", table -> {
tbl.set(table);
}).execute();
api.close();

return tbl.get();
}
Expand Down
40 changes: 24 additions & 16 deletions src/main/java/io/deephaven/benchmark/api/BenchQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import io.deephaven.benchmark.connect.Connector;
import io.deephaven.benchmark.connect.ConnectorFactory;
import io.deephaven.benchmark.connect.ResultTable;
import io.deephaven.benchmark.metric.Metrics;
Expand All @@ -26,12 +25,13 @@ final public class BenchQuery implements Closeable {
final Map<String, Consumer<ResultTable>> snapshotFetchers = new LinkedHashMap<>();
final Map<String, Function<ResultTable, Boolean>> tickingFetchers = new LinkedHashMap<>();
final Properties props = new Properties();
private Connector session = null;
final Session session;

BenchQuery(Bench bench, String logic, QueryLog queryLog) {
this.bench = bench;
this.logic = logic;
this.queryLog = queryLog;
this.session = bench.session;
}

/**
Expand All @@ -40,7 +40,7 @@ final public class BenchQuery implements Closeable {
*
* @param table a table name present in the query logic
* @param tableHandler a consumer that receives a non-live snapshot of the table
* @return a query configuration instance
* @return this bench query instance
*/
public BenchQuery fetchAfter(String table, Consumer<ResultTable> tableHandler) {
snapshotFetchers.put(table, tableHandler);
Expand All @@ -53,7 +53,7 @@ public BenchQuery fetchAfter(String table, Consumer<ResultTable> tableHandler) {
*
* @param table a table name present in the query logic
* @param tableHandler a function that receives non-live snapshot of the table
* @return a query configuration instance
* @return this bench query instance
*/
public BenchQuery fetchDuring(String table, Function<ResultTable, Boolean> tableHandler) {
tickingFetchers.put(table, tableHandler);
Expand All @@ -62,8 +62,15 @@ public BenchQuery fetchDuring(String table, Function<ResultTable, Boolean> table

/**
* Add properties to be passed to the <code>Connector</code> used in the query
*
* @param name the name of the property
* @param value the value of the property
* @return this bench query instance
*/
public BenchQuery withProperty(String name, String value) {
if (session.getConnector() != null) {
throw new RuntimeException("Cannot set properties after first query is executed");
}
if (value != null && !value.isBlank()) {
props.setProperty(name, value);
}
Expand All @@ -76,11 +83,12 @@ public BenchQuery withProperty(String name, String value) {
public void execute() {
var timer = Timer.start();
executeBarrageQuery(logic);
tickingFetchers.entrySet().forEach(e -> bench.addFuture(session.fetchTickingData(e.getKey(), e.getValue())));
var conn = session.getConnector();
tickingFetchers.entrySet().forEach(e -> bench.addFuture(conn.fetchTickingData(e.getKey(), e.getValue())));

snapshotFetchers.entrySet().forEach(e -> {
try {
Future<Metrics> f = session.fetchSnapshotData(e.getKey(), e.getValue());
Future<Metrics> f = conn.fetchSnapshotData(e.getKey(), e.getValue());
Metrics metrics = f.get();
metrics.set("duration.secs", timer.duration().toMillis() / 1000.0);
bench.addFuture(f);
Expand All @@ -94,33 +102,33 @@ public void execute() {
* Unsubscribe any fetchers, free used variables, and close the session
*/
public void close() {
if (session == null)
var conn = session.getConnector();
if (conn == null)
return;

if (!session.getUsedVariableNames().isEmpty()) {
String logic = String.join("=None; ", session.getUsedVariableNames()) + "=None\n";
if (!conn.getUsedVariableNames().isEmpty()) {
String logic = String.join("=None; ", conn.getUsedVariableNames()) + "=None\n";
executeBarrageQuery(logic);
}

session.close();
session = null;
}

// Add function defs in separate query so if there are errors in the "logic" part, the line numbers match up
private void executeBarrageQuery(String logic) {
if (session == null) {
var conn = session.getConnector();
if (conn == null) {
var connectorClass = bench.property("connector.class", "io.deephaven.benchmark.connect.BarrageConnector");
var localProps = Bench.profile.getProperties();
localProps.putAll(props);
session = ConnectorFactory.create(connectorClass, localProps);
conn = ConnectorFactory.create(connectorClass, localProps);
session.setConnector(conn);
}
String snippetsLogic = Bench.profile.replaceProperties(Snippets.getFunctions(logic));
if (!snippetsLogic.isBlank()) {
queryLog.logQuery(snippetsLogic);
session.executeQuery(snippetsLogic);
conn.executeQuery(snippetsLogic);
}
String userLogic = Bench.profile.replaceProperties(logic);
session.executeQuery(userLogic);
conn.executeQuery(userLogic);
queryLog.logQuery(userLogic);
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/deephaven/benchmark/api/BenchTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public boolean generateParquet() {
String q = replaceTableAndGeneratorFields(useExistingParquetQuery);

AtomicBoolean usedExistingParquet = new AtomicBoolean(false);
bench.query(q).fetchAfter("result", table -> {
bench.query(q).fetchAfter("used_existing_parquet_" + tableName, table -> {
usedExistingParquet.set(table.getValue(0, "UsedExistingParquet").toString().equalsIgnoreCase("true"));
}).execute();

Expand Down Expand Up @@ -346,7 +346,7 @@ with open(path) as f:
os.link(str(matching_gen_parquet) + '.gen.parquet', table_parquet)
usedExisting = True

result = new_table([string_col("UsedExistingParquet", [str(usedExisting)])])
used_existing_parquet_${table.name} = new_table([string_col("UsedExistingParquet", [str(usedExisting)])])
""";

static final String kafkaToParquetQuery = """
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/deephaven/benchmark/api/Session.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.api;

import io.deephaven.benchmark.connect.Connector;

/**
* Contains session information that is shared between queries but is managed by <code>BenchQuery</code>. Each
* <code>Bench</code> instance allows only one reusable Session. If multiple queries are executed from the same
* <code>Bench</code> instance, the queries will be executed against the same session.
*/
class Session {
private Connector connector = null;

void setConnector(Connector connector) {
this.connector = connector;
}

Connector getConnector() {
return connector;
}

void close() {
if (connector != null) {
connector.close();
connector = null;
}
}

}
43 changes: 31 additions & 12 deletions src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.arrow.memory.RootAllocator;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.metric.MetricsFuture;
import io.deephaven.benchmark.util.Log;
import io.deephaven.client.impl.*;
import io.deephaven.client.impl.script.Changes;
import io.deephaven.engine.context.ExecutionContext;
Expand Down Expand Up @@ -98,7 +99,7 @@ public Set<String> getUsedVariableNames() {
}

/**
* Fetch the rows of a table create or modified by this session's queries
* Fetch the rows of a table created or modified by this session's queries
*
* @param table the name of the table to fetch data from
* @param tableHandler a consumer used to process the result table
Expand All @@ -108,6 +109,10 @@ public Future<Metrics> fetchSnapshotData(String table, Consumer<ResultTable> tab
checkClosed();
Metrics metrics = new Metrics("test-runner", "session." + table);
MetricsFuture future = new MetricsFuture(metrics);

if (snapshots.containsKey(table))
throw new RuntimeException("Cannot subscribe twice to the same table: " + table);

snapshots.computeIfAbsent(table, s -> {
try {
BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder().build();
Expand Down Expand Up @@ -141,6 +146,10 @@ public Future<Metrics> fetchTickingData(String table, Function<ResultTable, Bool
checkClosed();
Metrics metrics = new Metrics("test-runner", "session." + table);
MetricsFuture future = new MetricsFuture(metrics);

if (subscriptions.containsKey(table))
throw new RuntimeException("Cannot subscribe twice to the same table: " + table);

subscriptions.computeIfAbsent(table, s -> {
try {
BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder().build();
Expand Down Expand Up @@ -169,17 +178,17 @@ public Future<Metrics> fetchTickingData(String table, Function<ResultTable, Bool
* the session on the server.
*/
public void close() {
try {
if (isClosed.get())
return;
isClosed.set(true);
subscriptions.values().forEach(s -> {
s.handle.close();
});
subscriptions.clear();
variableNames.clear();
} catch (Exception ex) {
}
if (isClosed.get())
return;
isClosed.set(true);
subscriptions.keySet().forEach(t -> {
closeSubscription(t);
});
snapshots.keySet().forEach(t -> {
closeSubscription(t);
});
variableNames.clear();

try {
console.close();
} catch (Exception ex) {
Expand Down Expand Up @@ -207,6 +216,16 @@ private void checkClosed() {
throw new RuntimeException("Session is closed");
}

private void closeSubscription(String tableName) {
try {
var subscription = subscriptions.remove(tableName);
if (subscription != null)
subscription.handle.close();
} catch (Exception ex) {
Log.info("Failed to close handle for subscription: %s", tableName);
}
}

private FieldInfo findTable(String table) {
Optional<FieldInfo> found =
changes.changes().created().stream().filter(f -> f.name().equals(table)).findFirst();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.api;

import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -56,7 +56,7 @@ void nomalize() {

static class LocalPlatform extends BenchPlatform {
LocalPlatform(Path dir, String fileName) {
super(dir, fileName, getProfileProps());
super(null, dir, fileName, getProfileProps());
}

static Properties getProfileProps() {
Expand Down
Loading