Skip to content

Commit 25629cc

Browse files
committed
Added jfr events
1 parent f35ab4f commit 25629cc

File tree

4 files changed

+94
-37
lines changed

4 files changed

+94
-37
lines changed

src/it/java/io/deephaven/benchmark/tests/train/TrainTestRunner.java

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -49,46 +49,90 @@ public void test(String name, long maxExpectedRowCount, String operation, String
4949
Recording = jpy.get_type("jdk.jfr.Recording")
5050
rec = Recording()
5151
rec.setName("benchmark")
52+
53+
enabled_events=['jdk.GarbageCollection', 'jdk.GCPhasePause', 'jdk.GCPhaseConcurrent', 'jdk.GCCPUTime']
54+
for n in enabled_events:
55+
try:
56+
rec.enable(n)
57+
except Exception as e:
58+
print(f"Event Not Enabled: {e}")
59+
60+
disabled_events=['jdk.ExecutionSample', 'jdk.JavaMonitorEnter', 'jdk.JavaMonitorWait', 'jdk.ThreadSleep',
61+
'jdk.SocketRead', 'jdk.SocketWrite']
62+
for n in disabled_events:
63+
try:
64+
rec.disable(_ename)
65+
except Exception:
66+
print(f"Event Not Disabled: {e}")
67+
5268
rec.start()
5369
""";
5470

5571
static final String stopJfrQuery = """
5672
Paths = jpy.get_type("java.nio.file.Paths")
5773
RecordingFile = jpy.get_type("jdk.jfr.consumer.RecordingFile")
74+
5875
rec.dump(Paths.get("/data/benchmark.jfr"))
5976
rec.stop()
6077
rec.close()
78+
6179
events = RecordingFile.readAllEvents(Paths.get("/data/benchmark.jfr"))
6280
63-
# Log each event's fields to the console for inspection
64-
print("=== JFR event dump begin ===")
65-
for i in range(events.size()):
66-
e = events.get(i)
67-
etype = e.getEventType()
68-
print(f"Event {i}: type={etype.getName()}")
69-
fields = e.getFields()
70-
for idx in range(fields.size()):
71-
fd = fields.get(idx)
72-
fname = fd.getName()
73-
fval = e.getValue(fname)
74-
print(f" {fname} = {fval}")
75-
print("--")
76-
print("=== JFR event dump end ===")
77-
7881
jfr_rows = []
82+
83+
def getEventValue(ev, field):
84+
try:
85+
return ev.getValue(field)
86+
except Exception:
87+
return None
88+
89+
def getNanoValue(ev, duration_field):
90+
val = ev.getValue(duration_field)
91+
if val is None or str(val) == "null": return 0
92+
if isinstance(val, int): return val
93+
if hasattr(val, "size") and hasattr(val, "get"):
94+
total = 0
95+
for i in range(val.size()):
96+
d = val.get(i)
97+
if d is not None and str(d) != "null": total += d.toNanos()
98+
return total
99+
if hasattr(val, "toNanos"): return val.toNanos()
100+
raise TypeError(f"Unsupported JFR value type: {type(val)}")
101+
102+
79103
for i in range(events.size()):
80104
e = events.get(i)
81-
start = e.getStartTime().getEpochSecond() * 1000000000 + e.getStartTime().getNano()
82-
dur = e.getDuration().getSeconds() * 1000000000 + e.getDuration().getNano()
83-
jfr_rows.append([str(e.getEventType().getName()), start, dur, str(e)])
84-
jfr = new_table([
85-
string_col("origin", ["jfr" for r in jfr_rows]),
86-
string_col("type", [r[0] for r in jfr_rows]),
87-
long_col("start_ns", [r[1] for r in jfr_rows]),
88-
long_col("duration_ns", [r[2] for r in jfr_rows]),
89-
string_col("detail", [r[3] for r in jfr_rows]),
90-
])
91-
standard_events = merge([standard_events, jfr])
105+
etype = e.getEventType().getName()
106+
start = e.getStartTime().getEpochSecond() * 1000000000 + e.getStartTime().getNano();
107+
108+
if etype == 'jdk.GarbageCollection':
109+
duration = getNanoValue(e, 'duration')
110+
name = getEventValue(e, 'name')
111+
value = getNanoValue(e, 'sumOfPauses')
112+
elif etype == 'jdk.GCPhasePause' or etype == 'jdk.GCPhaseConcurrent':
113+
duration = getNanoValue(e, 'duration')
114+
name = getEventValue(e, 'name')
115+
value = duration
116+
elif etype == 'jdk.GCCPUTime':
117+
duration = getNanoValue(e, 'realTime')
118+
name = "cpuTime"
119+
value = getNanoValue(e, 'systemTime') + getNanoValue(e, 'userTime')
120+
else:
121+
continue
122+
123+
jfr_rows.append([etype, start, duration, name, value])
124+
125+
# Only create a table if we saw any GC events
126+
if len(jfr_rows) > 0:
127+
jfr_gc = new_table([
128+
string_col("origin", ["deephaven-engine" for r in jfr_rows]),
129+
string_col("type", [r[0] for r in jfr_rows]),
130+
long_col("start_ns", [r[1] for r in jfr_rows]),
131+
long_col("duration_ns", [r[2] for r in jfr_rows]),
132+
string_col("name", [r[3] for r in jfr_rows]),
133+
double_col("value", [r[4] for r in jfr_rows]),
134+
])
135+
standard_events = merge([standard_events, jfr_gc])
92136
""";
93137

94138
static final String ugpQuery = """

src/main/java/io/deephaven/benchmark/api/BenchEvents.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
* Represents the events gathered during usage of the Bench API. These can include events gather by the API or the user.
1313
*/
1414
final public class BenchEvents {
15-
static final String header = "benchmark_name,origin,type,start,duration,detail";
15+
static final String header = "benchmark_name,origin,type,start,duration,name,value";
1616
final List<Event> events = new ArrayList<>();
1717
final Path file;
1818
private String name = null;
@@ -38,8 +38,9 @@ public BenchEvents add(ResultTable table) {
3838
var type = table.getValue(r, "type").toString();
3939
var startNanos = table.getNumber(r, "start_ns").longValue();
4040
var durationNanos = table.getNumber(r, "duration_ns").longValue();
41-
var details = table.getValue(r, "detail").toString();
42-
var event = new Event(origin, type, startNanos, durationNanos, details);
41+
var name = String.valueOf(table.getValue(r, "name"));
42+
var value = table.getNumber(r, "value").doubleValue();
43+
var event = new Event(origin, type, startNanos, durationNanos, name, value);
4344
events.add(event);
4445
}
4546
return this;
@@ -75,10 +76,10 @@ static void writeLine(String line, Path file) {
7576
}
7677
}
7778

78-
record Event(String origin, String type, long startNanos, long durationNanos, String detail) {
79+
record Event(String origin, String type, long startNanos, long durationNanos, String name, double value) {
7980
String toCsv() {
80-
return origin + "," + type + "," + startNanos + "," + durationNanos + "," + detail;
81+
return origin + "," + type + "," + startNanos + "," + durationNanos + "," + name + "," + value;
8182
}
8283
}
8384

84-
}
85+
}

src/main/java/io/deephaven/benchmark/api/Snippets.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ def bench_api_metrics_add(category, name, value, note=''):
175175
* ex. bench_api_metrics_table = bench_api_metrics_collect()
176176
*/
177177
static String bench_api_metrics_collect = """
178-
from deephaven import input_table, empty_table, dtypes as dht
178+
from deephaven import input_table, empty_table, new_table, dtypes as dht
179+
from deephaven.column import string_col, long_col, double_col
179180
def bench_api_metrics_collect():
180181
s = dht.string
181182
t = input_table({'timestamp':s,'origin':s,'category':s,'name':s,'value':s,'note':s})
@@ -185,8 +186,15 @@ def bench_api_metrics_collect():
185186
t.add(m1)
186187
return t
187188
188-
standard_events = new_table([ string_col("origin",[]), string_col("type",[]), long_col("start_ns",[]),
189-
long_col("duration_ns",[]), string_col("detail",[])])
189+
# Standard events table used by JFR workflows
190+
standard_events = new_table([
191+
string_col("origin", []),
192+
string_col("type", []),
193+
long_col("start_ns", []),
194+
long_col("duration_ns", []),
195+
string_col("name", []),
196+
double_col("value", []),
197+
])
190198
""";
191199

192200
/**
@@ -261,4 +269,4 @@ static String getFunc(String functionName, String functionDef, String query, Str
261269
return functionDef + System.lineSeparator();
262270
}
263271

264-
}
272+
}

src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ class BarrageConnector implements Connector {
3535
static {
3636
System.setProperty("thread.initialization", ""); // Remove server side initializers (e.g. DebuggingInitializer)
3737
}
38-
static final int maxFetchCount = 1000;
38+
static final int maxFetchCount = 100000;
39+
static final int inboundMessageMB = 64;
3940
final private BarrageSession session;
4041
final private ConsoleSession console;
4142
final private ManagedChannel channel;
@@ -243,6 +244,9 @@ private ManagedChannel getManagedChannel(String host, int port) {
243244
final ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port);
244245
channelBuilder.usePlaintext();
245246
// channelBuilder.useTransportSecurity(); If eventually security is needed
247+
// Increase the maximum inbound message size so large Barrage snapshots (e.g. standard_events)
248+
// do not trip the default 4 MiB gRPC limit while prototyping benchmarks.
249+
channelBuilder.maxInboundMessageSize(inboundMessageMB * 1024 * 1024); // 32 MiB
246250

247251
return channelBuilder.build();
248252
}

0 commit comments

Comments
 (0)