Skip to content

Commit c45b106

Browse files
committed
Fix race issue and improve readAsynchronousFileChannel
Before this commit, ReadCompletionHandler delayed closing the channel to allow an ongoing read to complete/fail so we can get a hold of the associated DataBuffer and release it. This can be problematic since the read take time to complete but even more importantly there was a race condition where we didn't check if we've been disposed concurrently while releasing the read flag. This commit removes the delay and closes the channel immediately from cancel() and that should in turn fail any ongoing read operation, according to AsynchronousChannel, and the DataBuffer is released. Further improvements include: - combining the "reading" and "disposed" AtomicBoolean's into a single "state" AtomicReference. - an optimistic check to remain in READING mode and avoid state switches when there is demand to continue reading. Closes gh-25831
1 parent 431ec90 commit c45b106

File tree

1 file changed

+59
-46
lines changed

1 file changed

+59
-46
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 59 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -683,9 +683,7 @@ private static class ReadCompletionHandler implements CompletionHandler<Integer,
683683

684684
private final AtomicLong position;
685685

686-
private final AtomicBoolean reading = new AtomicBoolean();
687-
688-
private final AtomicBoolean disposed = new AtomicBoolean();
686+
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
689687

690688
public ReadCompletionHandler(AsynchronousFileChannel channel,
691689
FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
@@ -697,66 +695,81 @@ public ReadCompletionHandler(AsynchronousFileChannel channel,
697695
this.bufferSize = bufferSize;
698696
}
699697

700-
public void read() {
701-
if (this.sink.requestedFromDownstream() > 0 &&
702-
isNotDisposed() &&
703-
this.reading.compareAndSet(false, true)) {
704-
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
705-
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize);
706-
this.channel.read(byteBuffer, this.position.get(), dataBuffer, this);
698+
/**
699+
* Invoked when Reactive Streams consumer signals demand.
700+
*/
701+
public void request(long n) {
702+
tryRead();
703+
}
704+
705+
/**
706+
* Invoked when Reactive Streams consumer cancels.
707+
*/
708+
public void cancel() {
709+
this.state.getAndSet(State.DISPOSED);
710+
711+
// According java.nio.channels.AsynchronousChannel "if an I/O operation is outstanding
712+
// on the channel and the channel's close method is invoked, then the I/O operation
713+
// fails with the exception AsynchronousCloseException". That should invoke the failed
714+
// callback below which and the current DataBuffer should be released.
715+
716+
closeChannel(this.channel);
717+
}
718+
719+
private void tryRead() {
720+
if (this.sink.requestedFromDownstream() > 0 && this.state.compareAndSet(State.IDLE, State.READING)) {
721+
read();
707722
}
708723
}
709724

725+
private void read() {
726+
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
727+
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize);
728+
this.channel.read(byteBuffer, this.position.get(), dataBuffer, this);
729+
}
730+
710731
@Override
711732
public void completed(Integer read, DataBuffer dataBuffer) {
712-
if (isNotDisposed()) {
713-
if (read != -1) {
714-
this.position.addAndGet(read);
715-
dataBuffer.writePosition(read);
716-
this.sink.next(dataBuffer);
717-
this.reading.set(false);
718-
read();
719-
}
720-
else {
721-
release(dataBuffer);
722-
closeChannel(this.channel);
723-
if (this.disposed.compareAndSet(false, true)) {
724-
this.sink.complete();
725-
}
726-
this.reading.set(false);
727-
}
733+
if (this.state.get().equals(State.DISPOSED)) {
734+
release(dataBuffer);
735+
closeChannel(this.channel);
736+
return;
728737
}
729-
else {
738+
739+
if (read == -1) {
730740
release(dataBuffer);
731741
closeChannel(this.channel);
732-
this.reading.set(false);
742+
this.state.set(State.DISPOSED);
743+
this.sink.complete();
744+
return;
745+
}
746+
747+
this.position.addAndGet(read);
748+
dataBuffer.writePosition(read);
749+
this.sink.next(dataBuffer);
750+
751+
// Stay in READING mode if there is demand
752+
if (this.sink.requestedFromDownstream() > 0) {
753+
read();
754+
return;
755+
}
756+
757+
// Release READING mode and then try again in case of concurrent "request"
758+
if (this.state.compareAndSet(State.READING, State.IDLE)) {
759+
tryRead();
733760
}
734761
}
735762

736763
@Override
737764
public void failed(Throwable exc, DataBuffer dataBuffer) {
738765
release(dataBuffer);
739766
closeChannel(this.channel);
740-
if (this.disposed.compareAndSet(false, true)) {
741-
this.sink.error(exc);
742-
}
743-
this.reading.set(false);
744-
}
745-
746-
public void request(long n) {
747-
read();
748-
}
749-
750-
public void cancel() {
751-
if (this.disposed.compareAndSet(false, true)) {
752-
if (!this.reading.get()) {
753-
closeChannel(this.channel);
754-
}
755-
}
767+
this.state.set(State.DISPOSED);
768+
this.sink.error(exc);
756769
}
757770

758-
private boolean isNotDisposed() {
759-
return !this.disposed.get();
771+
private enum State {
772+
IDLE, READING, DISPOSED
760773
}
761774
}
762775

0 commit comments

Comments
 (0)