Skip to content

Commit 27b52e6

Browse files
committed
8365066: RecordingStream and RemoteRecordingStream do not terminate when the associated Recording is stopped or closed externally
1 parent 241808e commit 27b52e6

File tree

15 files changed

+555
-51
lines changed

15 files changed

+555
-51
lines changed

src/jdk.jfr/share/classes/jdk/jfr/consumer/EventStream.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import jdk.jfr.internal.consumer.EventDirectoryStream;
3737
import jdk.jfr.internal.consumer.EventFileStream;
38+
import jdk.jfr.internal.consumer.FileEventSource;
3839

3940
/**
4041
* Represents a stream of events.
@@ -112,7 +113,7 @@ public interface EventStream extends AutoCloseable {
112113
public static EventStream openRepository() throws IOException {
113114
return new EventDirectoryStream(
114115
null,
115-
null,
116+
new FileEventSource(),
116117
Collections.emptyList(),
117118
false
118119
);
@@ -137,7 +138,7 @@ public static EventStream openRepository(Path directory) throws IOException {
137138
Objects.requireNonNull(directory, "directory");
138139
return new EventDirectoryStream(
139140
directory,
140-
null,
141+
new FileEventSource(),
141142
Collections.emptyList(),
142143
true
143144
);

src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import jdk.jfr.RecordingState;
4444
import jdk.jfr.internal.PlatformRecording;
4545
import jdk.jfr.internal.PrivateAccess;
46+
import jdk.jfr.internal.management.EventSource;
47+
import jdk.jfr.internal.consumer.LocalRecordingEventSource;
4648
import jdk.jfr.internal.util.Utils;
4749
import jdk.jfr.internal.consumer.EventDirectoryStream;
4850
import jdk.jfr.internal.management.StreamBarrier;
@@ -82,6 +84,7 @@ public void accept(Long endNanos) {
8284
private final EventDirectoryStream directoryStream;
8385
private long maxSize;
8486
private Duration maxAge;
87+
private final EventSource eventSource;
8588

8689
/**
8790
* Creates an event stream for the current JVM (Java Virtual Machine).
@@ -100,9 +103,10 @@ private RecordingStream(Map<String, String> settings) {
100103
this.recording.setName("Recording Stream: " + creationTime);
101104
try {
102105
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
106+
this.eventSource = new LocalRecordingEventSource(pr);
103107
this.directoryStream = new EventDirectoryStream(
104108
null,
105-
pr,
109+
eventSource,
106110
configurations(),
107111
false
108112
);
@@ -392,9 +396,9 @@ public boolean stop() {
392396
boolean stopped = false;
393397
try {
394398
try (StreamBarrier sb = directoryStream.activateStreamBarrier()) {
395-
stopped = recording.stop();
399+
stopped = eventSource.stop();
396400
directoryStream.setCloseOnComplete(false);
397-
sb.setStreamEnd(recording.getStopTime().toEpochMilli());
401+
sb.setStreamEnd(eventSource.getStopTime());
398402
}
399403
directoryStream.awaitTermination();
400404
} catch (InterruptedException | IOException e) {

src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import jdk.jfr.internal.LogLevel;
4444
import jdk.jfr.internal.LogTag;
4545
import jdk.jfr.internal.Logger;
46+
import jdk.jfr.internal.management.EventSource;
4647

4748
/*
4849
* Purpose of this class is to simplify the implementation of
@@ -60,10 +61,12 @@ public abstract class AbstractEventStream implements EventStream {
6061
private volatile boolean waitForChunks = true;
6162
private Dispatcher dispatcher;
6263
private boolean daemon = false;
64+
protected final EventSource eventSource;
6365

6466

65-
AbstractEventStream(List<Configuration> configurations) throws IOException {
67+
AbstractEventStream(List<Configuration> configurations, EventSource eventSource) throws IOException {
6668
this.configurations = configurations;
69+
this.eventSource = eventSource;
6770
}
6871

6972
@Override
@@ -207,8 +210,6 @@ public final void awaitTermination(Duration timeout) throws InterruptedException
207210

208211
protected abstract void process() throws IOException;
209212

210-
protected abstract boolean isRecordingStream();
211-
212213
protected final void closeParser() {
213214
parserState.close();
214215
}
@@ -250,7 +251,7 @@ private void startInternal(long startNanos) {
250251
if (streamConfiguration.started) {
251252
throw new IllegalStateException("Event stream can only be started once");
252253
}
253-
if (isRecordingStream() && streamConfiguration.startTime == null) {
254+
if (eventSource.requiresStartTime() && streamConfiguration.startTime == null) {
254255
streamConfiguration.setStartNanos(startNanos);
255256
}
256257
streamConfiguration.setStarted(true);

src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import jdk.jfr.internal.LogLevel;
4242
import jdk.jfr.internal.LogTag;
4343
import jdk.jfr.internal.Logger;
44-
import jdk.jfr.internal.PlatformRecording;
44+
import jdk.jfr.internal.management.EventSource;
4545
import jdk.jfr.internal.util.Utils;
4646
import jdk.jfr.internal.management.StreamBarrier;
4747

@@ -55,7 +55,6 @@ public final class EventDirectoryStream extends AbstractEventStream {
5555
private static final Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator();
5656

5757
private final RepositoryFiles repositoryFiles;
58-
private final PlatformRecording recording;
5958
private final StreamBarrier barrier = new StreamBarrier();
6059
private final AtomicLong streamId = new AtomicLong();
6160
private ChunkParser currentParser;
@@ -66,11 +65,10 @@ public final class EventDirectoryStream extends AbstractEventStream {
6665

6766
public EventDirectoryStream(
6867
Path p,
69-
PlatformRecording recording,
68+
EventSource eventSource,
7069
List<Configuration> configurations,
7170
boolean allowSubDirectories) throws IOException {
72-
super(configurations);
73-
this.recording = recording;
71+
super(configurations, eventSource);
7472
this.repositoryFiles = new RepositoryFiles(p, allowSubDirectories);
7573
this.streamId.incrementAndGet();
7674
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Stream " + streamId + " started.");
@@ -131,7 +129,7 @@ protected void processRecursionSafe() throws IOException {
131129
Dispatcher lastDisp = null;
132130
Dispatcher disp = dispatcher();
133131
Path path;
134-
boolean validStartTime = isRecordingStream() || disp.startTime != null;
132+
boolean validStartTime = eventSource.requiresStartTime() || disp.startTime != null;
135133
if (validStartTime) {
136134
path = repositoryFiles.firstPath(disp.startNanos, true);
137135
} else {
@@ -169,6 +167,19 @@ protected void processRecursionSafe() throws IOException {
169167
"ns (epoch), parser at " + lastFlush + "ns (epoch).");
170168
return;
171169
}
170+
171+
if(!barrier.used()) {
172+
RecordingState state = eventSource.getState();
173+
if (state == RecordingState.CLOSED) {
174+
return;
175+
} else if (state == RecordingState.STOPPED) {
176+
long stopTime = eventSource.getStopTime();
177+
if (lastFlush > stopTime) {
178+
logStreamEnd("stopped at " + stopTime + "ns (epoch), parser at " + lastFlush + "ns (epoch).");
179+
return;
180+
}
181+
}
182+
}
172183
}
173184
long endNanos = currentParser.getStartNanos() + currentParser.getChunkDuration();
174185
long endMillis = Instant.ofEpochSecond(0, endNanos).toEpochMilli();
@@ -181,9 +192,13 @@ protected void processRecursionSafe() throws IOException {
181192
return;
182193
}
183194

184-
if (isRecordingStream()) {
185-
if (recording.getState() == RecordingState.STOPPED && !barrier.used()) {
186-
logStreamEnd("recording stopped externally.");
195+
if(!barrier.used()) {
196+
RecordingState state = eventSource.getState();
197+
if (state == RecordingState.CLOSED){
198+
logStreamEnd("Event source is closed externally");
199+
return;
200+
}else if(state == RecordingState.STOPPED) {
201+
logStreamEnd("Event source is stopped externally");
187202
return;
188203
}
189204
}
@@ -226,10 +241,6 @@ private void logStreamEnd(String text) {
226241
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, msg);
227242
}
228243

229-
protected boolean isRecordingStream() {
230-
return recording != null;
231-
}
232-
233244
private void processOrdered(Dispatcher c) throws IOException {
234245
if (sortedCache == null) {
235246
sortedCache = new RecordedEvent[100_000];

src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventFileStream.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public final class EventFileStream extends AbstractEventStream {
4545
private RecordedEvent[] cacheSorted;
4646

4747
public EventFileStream(Path file) throws IOException {
48-
super(Collections.emptyList());
48+
super(Collections.emptyList(), new FileEventSource());
4949
this.input = new RecordingInput(file.toFile());
5050
this.input.setStreamed();
5151
}
@@ -71,11 +71,6 @@ public void close() {
7171
}
7272
}
7373

74-
@Override
75-
protected boolean isRecordingStream() {
76-
return false;
77-
}
78-
7974
@Override
8075
protected void process() throws IOException {
8176
Dispatcher disp = dispatcher();
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
26+
package jdk.jfr.internal.consumer;
27+
28+
import jdk.jfr.internal.management.EventSource;
29+
30+
public class FileEventSource implements EventSource {
31+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
26+
package jdk.jfr.internal.consumer;
27+
28+
import jdk.jfr.RecordingState;
29+
30+
import jdk.jfr.internal.PlatformRecording;
31+
import jdk.jfr.internal.management.EventSource;
32+
33+
public class LocalRecordingEventSource implements EventSource {
34+
35+
private final PlatformRecording recording;
36+
37+
public LocalRecordingEventSource(PlatformRecording recording) {
38+
this.recording = recording;
39+
}
40+
41+
@Override
42+
public boolean requiresStartTime() {
43+
return true;
44+
}
45+
46+
@Override
47+
public long getStopTime() {
48+
return recording.getStopTime().toEpochMilli();
49+
}
50+
51+
@Override
52+
public boolean stop() {
53+
return recording.stop("stopped by RecordingStream");
54+
}
55+
56+
@Override
57+
public RecordingState getState() {
58+
return recording.getState();
59+
}
60+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
26+
package jdk.jfr.internal.management;
27+
28+
import jdk.jfr.RecordingState;
29+
30+
public interface EventSource {
31+
32+
default long getStopTime() {
33+
return Long.MAX_VALUE;
34+
}
35+
36+
default boolean stop() {
37+
return true;
38+
}
39+
40+
default boolean requiresStartTime(){
41+
return false;
42+
}
43+
44+
default RecordingState getState() {
45+
return RecordingState.RUNNING;
46+
}
47+
}

src/jdk.jfr/share/classes/jdk/jfr/internal/management/ManagementSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,10 @@ public static Configuration newConfiguration(String name, String label, String d
163163
// with configuration objects
164164
public static EventStream newEventDirectoryStream(
165165
Path directory,
166-
List<Configuration> confs) throws IOException {
166+
List<Configuration> confs, EventSource eventSource) throws IOException {
167167
return new EventDirectoryStream(
168168
directory,
169-
null,
169+
eventSource,
170170
confs,
171171
false
172172
);

0 commit comments

Comments
 (0)