Skip to content

Commit 9c533d8

Browse files
authored
Improved session handling and platform details. Upgraded to 0.38.0 (#398)
1 parent 1c32832 commit 9c533d8

File tree

8 files changed

+104
-48
lines changed

8 files changed

+104
-48
lines changed

pom.xml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
</plugin>
7777
<plugin>
7878
<artifactId>maven-compiler-plugin</artifactId>
79-
<version>3.13.0</version>
79+
<version>3.14.0</version>
8080
<configuration>
8181
<source>17</source>
8282
<target>17</target>
@@ -167,7 +167,7 @@
167167
'core.autocrlf' to true -->
168168
<groupId>com.diffplug.spotless</groupId>
169169
<artifactId>spotless-maven-plugin</artifactId>
170-
<version>2.44.2</version>
170+
<version>2.44.3</version>
171171
<configuration>
172172
<java>
173173
<includes>
@@ -247,7 +247,7 @@
247247
<dependency>
248248
<groupId>io.netty</groupId>
249249
<artifactId>netty-all</artifactId>
250-
<version>4.1.114.Final</version>
250+
<version>4.1.115.Final</version>
251251
</dependency>
252252
<!-- Added because of conflicts -->
253253
<dependency>
@@ -259,7 +259,7 @@
259259
<dependency>
260260
<groupId>com.google.protobuf</groupId>
261261
<artifactId>protobuf-java</artifactId>
262-
<version>3.25.4</version>
262+
<version>3.25.5</version>
263263
</dependency>
264264
<dependency>
265265
<groupId>io.confluent</groupId>
@@ -274,12 +274,12 @@
274274
<dependency>
275275
<groupId>io.deephaven</groupId>
276276
<artifactId>deephaven-java-client-barrage-dagger</artifactId>
277-
<version>0.37.6</version>
277+
<version>0.38.0</version>
278278
</dependency>
279279
<dependency>
280280
<groupId>io.deephaven</groupId>
281281
<artifactId>deephaven-log-to-slf4j</artifactId>
282-
<version>0.37.6</version>
282+
<version>0.38.0</version>
283283
</dependency>
284284
<dependency>
285285
<groupId>org.junit.platform</groupId>

src/main/java/io/deephaven/benchmark/api/Bench.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */
1+
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
22
package io.deephaven.benchmark.api;
33

44
import java.io.Closeable;
@@ -69,13 +69,14 @@ static public Bench create(Object testInst) {
6969
final BenchLog runLog;
7070
final List<Future<Metrics>> futures = new ArrayList<>();
7171
final List<Closeable> closeables = new ArrayList<>();
72+
final Session session = new Session();
7273
private boolean isClosed = false;
7374

7475
Bench(Class<?> testInst) {
7576
this.testInst = testInst;
7677
this.result = new BenchResult(outputDir);
7778
this.metrics = new BenchMetrics(outputDir);
78-
this.platform = new BenchPlatform(outputDir);
79+
this.platform = new BenchPlatform(this, outputDir);
7980
this.queryLog = new QueryLog(outputDir, testInst);
8081
this.runLog = new BenchLog(outputDir, testInst);
8182
}
@@ -245,6 +246,7 @@ public void close() {
245246
platform.commit();
246247
runLog.close();
247248
queryLog.close();
249+
session.close();
248250
}
249251

250252
Metrics awaitCompletion(Future<Metrics> future) {

src/main/java/io/deephaven/benchmark/api/BenchPlatform.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
1+
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
22
package io.deephaven.benchmark.api;
33

44
import java.io.BufferedWriter;
@@ -23,6 +23,7 @@
2323
public class BenchPlatform {
2424
static final Map<String, Property> properties = new LinkedHashMap<>();
2525
static boolean hasBeenCommitted = false;
26+
final Bench api;
2627
final Path platformFile;
2728
final Properties profileProps;
2829

@@ -32,8 +33,8 @@ public class BenchPlatform {
3233
*
3334
* @param parent the parent directory of the platform file
3435
*/
35-
BenchPlatform(Path parent) {
36-
this(parent, Bench.platformFileName, Bench.profile.getProperties());
36+
BenchPlatform(Bench api, Path parent) {
37+
this(api, parent, Bench.platformFileName, Bench.profile.getProperties());
3738
}
3839

3940
/**
@@ -42,7 +43,8 @@ public class BenchPlatform {
4243
* @param parent the parent directory of the platform file
4344
* @param platformFileName the name the file to store platform properties
4445
*/
45-
BenchPlatform(Path parent, String platformFileName, Properties profileProps) {
46+
BenchPlatform(Bench api, Path parent, String platformFileName, Properties profileProps) {
47+
this.api = api;
4648
this.platformFile = parent.resolve(platformFileName);
4749
this.profileProps = profileProps;
4850
}
@@ -87,14 +89,10 @@ void commit() {
8789
* @return a cached result table containing properties
8890
*/
8991
protected ResultTable fetchResult(String query) {
90-
Bench api = new Bench(Bench.class);
91-
api.setName("# Write Platform Details"); // # means skip adding to results file
92-
9392
var tbl = new AtomicReference<ResultTable>();
9493
api.query(query).fetchAfter("bench_api_platform", table -> {
9594
tbl.set(table);
9695
}).execute();
97-
api.close();
9896

9997
return tbl.get();
10098
}

src/main/java/io/deephaven/benchmark/api/BenchQuery.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import java.util.concurrent.Future;
99
import java.util.function.Consumer;
1010
import java.util.function.Function;
11-
import io.deephaven.benchmark.connect.Connector;
1211
import io.deephaven.benchmark.connect.ConnectorFactory;
1312
import io.deephaven.benchmark.connect.ResultTable;
1413
import io.deephaven.benchmark.metric.Metrics;
@@ -26,12 +25,13 @@ final public class BenchQuery implements Closeable {
2625
final Map<String, Consumer<ResultTable>> snapshotFetchers = new LinkedHashMap<>();
2726
final Map<String, Function<ResultTable, Boolean>> tickingFetchers = new LinkedHashMap<>();
2827
final Properties props = new Properties();
29-
private Connector session = null;
28+
final Session session;
3029

3130
BenchQuery(Bench bench, String logic, QueryLog queryLog) {
3231
this.bench = bench;
3332
this.logic = logic;
3433
this.queryLog = queryLog;
34+
this.session = bench.session;
3535
}
3636

3737
/**
@@ -40,7 +40,7 @@ final public class BenchQuery implements Closeable {
4040
*
4141
* @param table a table name present in the query logic
4242
* @param tableHandler a consumer that receives a non-live snapshot of the table
43-
* @return a query configuration instance
43+
* @return this bench query instance
4444
*/
4545
public BenchQuery fetchAfter(String table, Consumer<ResultTable> tableHandler) {
4646
snapshotFetchers.put(table, tableHandler);
@@ -53,7 +53,7 @@ public BenchQuery fetchAfter(String table, Consumer<ResultTable> tableHandler) {
5353
*
5454
* @param table a table name present in the query logic
5555
* @param tableHandler a function that receives non-live snapshot of the table
56-
* @return a query configuration instance
56+
* @return this bench query instance
5757
*/
5858
public BenchQuery fetchDuring(String table, Function<ResultTable, Boolean> tableHandler) {
5959
tickingFetchers.put(table, tableHandler);
@@ -62,8 +62,15 @@ public BenchQuery fetchDuring(String table, Function<ResultTable, Boolean> table
6262

6363
/**
6464
* Add properties to be passed to the <code>Connector</code> used in the query
65+
*
66+
* @param name the name of the property
67+
* @param value the value of the property
68+
* @return this bench query instance
6569
*/
6670
public BenchQuery withProperty(String name, String value) {
71+
if (session.getConnector() != null) {
72+
throw new RuntimeException("Cannot set properties after first query is executed");
73+
}
6774
if (value != null && !value.isBlank()) {
6875
props.setProperty(name, value);
6976
}
@@ -76,11 +83,12 @@ public BenchQuery withProperty(String name, String value) {
7683
public void execute() {
7784
var timer = Timer.start();
7885
executeBarrageQuery(logic);
79-
tickingFetchers.entrySet().forEach(e -> bench.addFuture(session.fetchTickingData(e.getKey(), e.getValue())));
86+
var conn = session.getConnector();
87+
tickingFetchers.entrySet().forEach(e -> bench.addFuture(conn.fetchTickingData(e.getKey(), e.getValue())));
8088

8189
snapshotFetchers.entrySet().forEach(e -> {
8290
try {
83-
Future<Metrics> f = session.fetchSnapshotData(e.getKey(), e.getValue());
91+
Future<Metrics> f = conn.fetchSnapshotData(e.getKey(), e.getValue());
8492
Metrics metrics = f.get();
8593
metrics.set("duration.secs", timer.duration().toMillis() / 1000.0);
8694
bench.addFuture(f);
@@ -94,33 +102,33 @@ public void execute() {
94102
* Unsubscribe any fetchers, free used variables, and close the session
95103
*/
96104
public void close() {
97-
if (session == null)
105+
var conn = session.getConnector();
106+
if (conn == null)
98107
return;
99108

100-
if (!session.getUsedVariableNames().isEmpty()) {
101-
String logic = String.join("=None; ", session.getUsedVariableNames()) + "=None\n";
109+
if (!conn.getUsedVariableNames().isEmpty()) {
110+
String logic = String.join("=None; ", conn.getUsedVariableNames()) + "=None\n";
102111
executeBarrageQuery(logic);
103112
}
104-
105-
session.close();
106-
session = null;
107113
}
108114

109115
// Add function defs in separate query so if there are errors in the "logic" part, the line numbers match up
110116
private void executeBarrageQuery(String logic) {
111-
if (session == null) {
117+
var conn = session.getConnector();
118+
if (conn == null) {
112119
var connectorClass = bench.property("connector.class", "io.deephaven.benchmark.connect.BarrageConnector");
113120
var localProps = Bench.profile.getProperties();
114121
localProps.putAll(props);
115-
session = ConnectorFactory.create(connectorClass, localProps);
122+
conn = ConnectorFactory.create(connectorClass, localProps);
123+
session.setConnector(conn);
116124
}
117125
String snippetsLogic = Bench.profile.replaceProperties(Snippets.getFunctions(logic));
118126
if (!snippetsLogic.isBlank()) {
119127
queryLog.logQuery(snippetsLogic);
120-
session.executeQuery(snippetsLogic);
128+
conn.executeQuery(snippetsLogic);
121129
}
122130
String userLogic = Bench.profile.replaceProperties(logic);
123-
session.executeQuery(userLogic);
131+
conn.executeQuery(userLogic);
124132
queryLog.logQuery(userLogic);
125133
}
126134

src/main/java/io/deephaven/benchmark/api/BenchTable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public boolean generateParquet() {
183183
String q = replaceTableAndGeneratorFields(useExistingParquetQuery);
184184

185185
AtomicBoolean usedExistingParquet = new AtomicBoolean(false);
186-
bench.query(q).fetchAfter("result", table -> {
186+
bench.query(q).fetchAfter("used_existing_parquet_" + tableName, table -> {
187187
usedExistingParquet.set(table.getValue(0, "UsedExistingParquet").toString().equalsIgnoreCase("true"));
188188
}).execute();
189189

@@ -346,7 +346,7 @@ with open(path) as f:
346346
os.link(str(matching_gen_parquet) + '.gen.parquet', table_parquet)
347347
usedExisting = True
348348
349-
result = new_table([string_col("UsedExistingParquet", [str(usedExisting)])])
349+
used_existing_parquet_${table.name} = new_table([string_col("UsedExistingParquet", [str(usedExisting)])])
350350
""";
351351

352352
static final String kafkaToParquetQuery = """
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
2+
package io.deephaven.benchmark.api;
3+
4+
import io.deephaven.benchmark.connect.Connector;
5+
6+
/**
7+
* Contains session information that is shared between queries but is managed by <code>BenchQuery</code>. Each
8+
* <code>Bench</code> instance allows only one reusable Session. If multiple queries are executed from the same
9+
* <code>Bench</code> instance, the queries will be executed against the same session.
10+
*/
11+
class Session {
12+
private Connector connector = null;
13+
14+
void setConnector(Connector connector) {
15+
this.connector = connector;
16+
}
17+
18+
Connector getConnector() {
19+
return connector;
20+
}
21+
22+
void close() {
23+
if (connector != null) {
24+
connector.close();
25+
connector = null;
26+
}
27+
}
28+
29+
}

src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.arrow.memory.RootAllocator;
1212
import io.deephaven.benchmark.metric.Metrics;
1313
import io.deephaven.benchmark.metric.MetricsFuture;
14+
import io.deephaven.benchmark.util.Log;
1415
import io.deephaven.client.impl.*;
1516
import io.deephaven.client.impl.script.Changes;
1617
import io.deephaven.engine.context.ExecutionContext;
@@ -98,7 +99,7 @@ public Set<String> getUsedVariableNames() {
9899
}
99100

100101
/**
101-
* Fetch the rows of a table create or modified by this session's queries
102+
* Fetch the rows of a table created or modified by this session's queries
102103
*
103104
* @param table the name of the table to fetch data from
104105
* @param tableHandler a consumer used to process the result table
@@ -108,6 +109,10 @@ public Future<Metrics> fetchSnapshotData(String table, Consumer<ResultTable> tab
108109
checkClosed();
109110
Metrics metrics = new Metrics("test-runner", "session." + table);
110111
MetricsFuture future = new MetricsFuture(metrics);
112+
113+
if (snapshots.containsKey(table))
114+
throw new RuntimeException("Cannot subscribe twice to the same table: " + table);
115+
111116
snapshots.computeIfAbsent(table, s -> {
112117
try {
113118
BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder().build();
@@ -141,6 +146,10 @@ public Future<Metrics> fetchTickingData(String table, Function<ResultTable, Bool
141146
checkClosed();
142147
Metrics metrics = new Metrics("test-runner", "session." + table);
143148
MetricsFuture future = new MetricsFuture(metrics);
149+
150+
if (subscriptions.containsKey(table))
151+
throw new RuntimeException("Cannot subscribe twice to the same table: " + table);
152+
144153
subscriptions.computeIfAbsent(table, s -> {
145154
try {
146155
BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder().build();
@@ -169,17 +178,17 @@ public Future<Metrics> fetchTickingData(String table, Function<ResultTable, Bool
169178
* the session on the server.
170179
*/
171180
public void close() {
172-
try {
173-
if (isClosed.get())
174-
return;
175-
isClosed.set(true);
176-
subscriptions.values().forEach(s -> {
177-
s.handle.close();
178-
});
179-
subscriptions.clear();
180-
variableNames.clear();
181-
} catch (Exception ex) {
182-
}
181+
if (isClosed.get())
182+
return;
183+
isClosed.set(true);
184+
subscriptions.keySet().forEach(t -> {
185+
closeSubscription(t);
186+
});
187+
snapshots.keySet().forEach(t -> {
188+
closeSubscription(t);
189+
});
190+
variableNames.clear();
191+
183192
try {
184193
console.close();
185194
} catch (Exception ex) {
@@ -207,6 +216,16 @@ private void checkClosed() {
207216
throw new RuntimeException("Session is closed");
208217
}
209218

219+
private void closeSubscription(String tableName) {
220+
try {
221+
var subscription = subscriptions.remove(tableName);
222+
if (subscription != null)
223+
subscription.handle.close();
224+
} catch (Exception ex) {
225+
Log.info("Failed to close handle for subscription: %s", tableName);
226+
}
227+
}
228+
210229
private FieldInfo findTable(String table) {
211230
Optional<FieldInfo> found =
212231
changes.changes().created().stream().filter(f -> f.name().equals(table)).findFirst();

src/test/java/io/deephaven/benchmark/api/BenchPlatformTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */
1+
/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */
22
package io.deephaven.benchmark.api;
33

44
import static org.junit.jupiter.api.Assertions.*;
@@ -56,7 +56,7 @@ void nomalize() {
5656

5757
static class LocalPlatform extends BenchPlatform {
5858
LocalPlatform(Path dir, String fileName) {
59-
super(dir, fileName, getProfileProps());
59+
super(null, dir, fileName, getProfileProps());
6060
}
6161

6262
static Properties getProfileProps() {

0 commit comments

Comments
 (0)