Skip to content

Commit 5b437d7

Browse files
committed
8365066: RecordingStream and RemoteRecordingStream do not terminate when the associated Recording is stopped or closed externally
1 parent 5febc4e commit 5b437d7

File tree

7 files changed

+618
-64
lines changed

7 files changed

+618
-64
lines changed

src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java

Lines changed: 126 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,15 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Objects;
36+
import java.util.concurrent.locks.ReentrantLock;
3637
import java.util.function.Consumer;
3738

3839
import jdk.jfr.Configuration;
3940
import jdk.jfr.Event;
4041
import jdk.jfr.EventSettings;
4142
import jdk.jfr.EventType;
43+
import jdk.jfr.FlightRecorder;
44+
import jdk.jfr.FlightRecorderListener;
4245
import jdk.jfr.Recording;
4346
import jdk.jfr.RecordingState;
4447
import jdk.jfr.internal.PlatformRecording;
@@ -82,6 +85,11 @@ public void accept(Long endNanos) {
8285
private final EventDirectoryStream directoryStream;
8386
private long maxSize;
8487
private Duration maxAge;
88+
private final ReentrantLock lock = new ReentrantLock();
89+
private volatile boolean closed;
90+
private volatile boolean stopped;
91+
private LocalStoppedListener localStoppedListener;
92+
private LocalClosedListener localClosedListener;
8593

8694
/**
8795
* Creates an event stream for the current JVM (Java Virtual Machine).
@@ -325,9 +333,41 @@ public void onError(Consumer<Throwable> action) {
325333

326334
@Override
327335
public void close() {
328-
directoryStream.setChunkCompleteHandler(null);
329-
recording.close();
330-
directoryStream.close();
336+
try {
337+
lock.lock();
338+
if (closed) {
339+
return;
340+
}
341+
if (localStoppedListener != null) {
342+
FlightRecorder.removeListener(localStoppedListener);
343+
}
344+
if (localClosedListener != null) {
345+
FlightRecorder.removeListener(localClosedListener);
346+
}
347+
recording.close();
348+
closeInternal();
349+
} finally {
350+
lock.unlock();
351+
}
352+
}
353+
354+
private void closeInternal() {
355+
final boolean isHeldByCurrentThread = lock.isHeldByCurrentThread();
356+
try {
357+
if (!isHeldByCurrentThread) {
358+
lock.lock();
359+
}
360+
if (closed) {
361+
return;
362+
}
363+
directoryStream.setChunkCompleteHandler(null);
364+
directoryStream.close();
365+
closed = true;
366+
} finally {
367+
if (!isHeldByCurrentThread) {
368+
lock.unlock();
369+
}
370+
}
331371
}
332372

333373
@Override
@@ -339,6 +379,10 @@ public boolean remove(Object action) {
339379
public void start() {
340380
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
341381
long startNanos = pr.start();
382+
this.localStoppedListener = new LocalStoppedListener(pr.getId(), this);
383+
this.localClosedListener = new LocalClosedListener(pr.getId(), this);
384+
FlightRecorder.addListener(localStoppedListener);
385+
FlightRecorder.addListener(localClosedListener);
342386
updateOnCompleteHandler();
343387
directoryStream.start(startNanos);
344388
}
@@ -362,6 +406,10 @@ public void start() {
362406
public void startAsync() {
363407
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
364408
long startNanos = pr.start();
409+
this.localStoppedListener = new LocalStoppedListener(pr.getId(), this);
410+
this.localClosedListener = new LocalClosedListener(pr.getId(), this);
411+
FlightRecorder.addListener(localStoppedListener);
412+
FlightRecorder.addListener(localClosedListener);
365413
updateOnCompleteHandler();
366414
directoryStream.startAsync(startNanos);
367415
}
@@ -389,20 +437,47 @@ public void startAsync() {
389437
* @since 20
390438
*/
391439
public boolean stop() {
392-
boolean stopped = false;
393440
try {
394-
try (StreamBarrier sb = directoryStream.activateStreamBarrier()) {
395-
stopped = recording.stop();
396-
directoryStream.setCloseOnComplete(false);
397-
sb.setStreamEnd(recording.getStopTime().toEpochMilli());
441+
lock.lock();
442+
if (stopped) {
443+
return true;
398444
}
445+
if (localStoppedListener != null) {
446+
FlightRecorder.removeListener(localStoppedListener);
447+
}
448+
recording.stop();
449+
stopInternal(recording.getStopTime().toEpochMilli());
399450
directoryStream.awaitTermination();
400-
} catch (InterruptedException | IOException e) {
451+
} catch (InterruptedException e) {
401452
// OK, return
453+
} finally {
454+
lock.unlock();
402455
}
403456
return stopped;
404457
}
405458

459+
private void stopInternal(long stopTime) {
460+
final boolean isHeldByCurrentThread = lock.isHeldByCurrentThread();
461+
try {
462+
if (!isHeldByCurrentThread) {
463+
lock.lock();
464+
}
465+
if (stopped) {
466+
return;
467+
}
468+
try (StreamBarrier sb = directoryStream.activateStreamBarrier()) {
469+
directoryStream.setCloseOnComplete(false);
470+
sb.setStreamEnd(stopTime);
471+
stopped = true;
472+
} catch (Exception e) {
473+
}
474+
} finally {
475+
if (!isHeldByCurrentThread) {
476+
lock.unlock();
477+
}
478+
}
479+
}
480+
406481
/**
407482
* Writes recording data to a file.
408483
* <p>
@@ -481,4 +556,46 @@ private void updateOnCompleteHandler() {
481556
directoryStream.setChunkCompleteHandler(new ChunkConsumer(recording));
482557
}
483558
}
559+
560+
static final class LocalStoppedListener implements FlightRecorderListener {
561+
562+
private final long recordingId;
563+
private final RecordingStream recordingStream;
564+
565+
public LocalStoppedListener(long recordingId, RecordingStream recordingStream) {
566+
this.recordingId = recordingId;
567+
this.recordingStream = recordingStream;
568+
}
569+
570+
@Override
571+
public void recordingStateChanged(Recording recording) {
572+
if (this.recordingId != recording.getId()) {
573+
return;
574+
}
575+
if (recording.getState() == RecordingState.STOPPED) {
576+
recordingStream.stopInternal(recording.getStopTime().toEpochMilli());
577+
}
578+
}
579+
}
580+
581+
static final class LocalClosedListener implements FlightRecorderListener {
582+
583+
private final long recordingId;
584+
private final RecordingStream recordingStream;
585+
586+
public LocalClosedListener(long recordingId, RecordingStream recordingStream) {
587+
this.recordingId = recordingId;
588+
this.recordingStream = recordingStream;
589+
}
590+
591+
@Override
592+
public void recordingStateChanged(Recording recording) {
593+
if (this.recordingId != recording.getId()) {
594+
return;
595+
}
596+
if (recording.getState() == RecordingState.CLOSED) {
597+
recordingStream.closeInternal();
598+
}
599+
}
600+
}
484601
}

src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.util.function.Consumer;
3636

3737
import jdk.jfr.Configuration;
38-
import jdk.jfr.RecordingState;
3938
import jdk.jfr.consumer.RecordedEvent;
4039
import jdk.jfr.internal.JVM;
4140
import jdk.jfr.internal.LogLevel;
@@ -181,13 +180,6 @@ protected void processRecursionSafe() throws IOException {
181180
return;
182181
}
183182

184-
if (isRecordingStream()) {
185-
if (recording.getState() == RecordingState.STOPPED && !barrier.used()) {
186-
logStreamEnd("recording stopped externally.");
187-
return;
188-
}
189-
}
190-
191183
if (repositoryFiles.hasFixedPath() && currentParser.isFinalChunk()) {
192184
logStreamEnd("JVM process exited/crashed, or repository migrated to an unknown location.");
193185
return;

0 commit comments

Comments
 (0)