Skip to content

Commit f748b1e

Browse files
committed
Fix timing bug in DataBufferUtils::readAsynchronousFileChannel
This commit makes sure that reading is enabled after the current signal has been processed, not while is is being processed. The bug was only apparent while using the JettyClientHttpConnector, which requests new elements continuously, even after the end of the stream has been signalled.
1 parent 9729b46 commit f748b1e

File tree

2 files changed

+27
-17
lines changed

2 files changed

+27
-17
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ private static class ReadCompletionHandler implements CompletionHandler<Integer,
506506

507507
private final AtomicBoolean reading = new AtomicBoolean();
508508

509-
private final AtomicBoolean canceled = new AtomicBoolean();
509+
private final AtomicBoolean disposed = new AtomicBoolean();
510510

511511
public ReadCompletionHandler(AsynchronousFileChannel channel,
512512
FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
@@ -519,7 +519,9 @@ public ReadCompletionHandler(AsynchronousFileChannel channel,
519519
}
520520

521521
public void read() {
522-
if (this.sink.requestedFromDownstream() > 0 && this.reading.compareAndSet(false, true)) {
522+
if (this.sink.requestedFromDownstream() > 0 &&
523+
isNotDisposed() &&
524+
this.reading.compareAndSet(false, true)) {
523525
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
524526
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize);
525527
this.channel.read(byteBuffer, this.position.get(), dataBuffer, this);
@@ -528,50 +530,54 @@ public void read() {
528530

529531
@Override
530532
public void completed(Integer read, DataBuffer dataBuffer) {
531-
this.reading.set(false);
532-
if (!isCanceled()) {
533+
if (isNotDisposed()) {
533534
if (read != -1) {
534535
this.position.addAndGet(read);
535536
dataBuffer.writePosition(read);
536537
this.sink.next(dataBuffer);
538+
this.reading.set(false);
537539
read();
538540
}
539541
else {
540542
release(dataBuffer);
541543
closeChannel(this.channel);
542-
this.sink.complete();
544+
if (this.disposed.compareAndSet(false, true)) {
545+
this.sink.complete();
546+
}
547+
this.reading.set(false);
543548
}
544549
}
545550
else {
546551
release(dataBuffer);
547552
closeChannel(this.channel);
553+
this.reading.set(false);
548554
}
549555
}
550556

551557
@Override
552558
public void failed(Throwable exc, DataBuffer dataBuffer) {
553-
this.reading.set(false);
554559
release(dataBuffer);
555560
closeChannel(this.channel);
556-
if (!isCanceled()) {
561+
if (this.disposed.compareAndSet(false, true)) {
557562
this.sink.error(exc);
558563
}
564+
this.reading.set(false);
559565
}
560566

561567
public void request(long n) {
562568
read();
563569
}
564570

565571
public void cancel() {
566-
if (this.canceled.compareAndSet(false, true)) {
572+
if (this.disposed.compareAndSet(false, true)) {
567573
if (!this.reading.get()) {
568574
closeChannel(this.channel);
569575
}
570576
}
571577
}
572578

573-
private boolean isCanceled() {
574-
return this.canceled.get();
579+
private boolean isNotDisposed() {
580+
return !this.disposed.get();
575581
}
576582

577583
}

spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.io.ByteArrayOutputStream;
2020
import java.io.IOException;
21+
import java.io.UncheckedIOException;
2122
import java.net.URI;
23+
import java.nio.charset.StandardCharsets;
2224
import java.nio.file.Files;
2325
import java.time.Duration;
2426
import java.util.Arrays;
@@ -406,7 +408,6 @@ public void shouldSendLargeTextFile() throws IOException {
406408
prepareResponse(response -> {});
407409

408410
Resource resource = new ClassPathResource("largeTextFile.txt", getClass());
409-
byte[] expected = Files.readAllBytes(resource.getFile().toPath());
410411
Flux<DataBuffer> body = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 4096);
411412

412413
Mono<Void> result = this.webClient.post()
@@ -415,18 +416,21 @@ public void shouldSendLargeTextFile() throws IOException {
415416
.retrieve()
416417
.bodyToMono(Void.class);
417418

418-
StepVerifier.create(result).verifyComplete();
419+
StepVerifier.create(result)
420+
.expectComplete()
421+
.verify(Duration.ofSeconds(5));
419422

420423
expectRequest(request -> {
421-
ByteArrayOutputStream actual = new ByteArrayOutputStream();
424+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
422425
try {
423-
request.getBody().copyTo(actual);
426+
request.getBody().copyTo(bos);
427+
String actual = bos.toString("UTF-8");
428+
String expected = new String(Files.readAllBytes(resource.getFile().toPath()), StandardCharsets.UTF_8);
429+
assertEquals(expected, actual);
424430
}
425431
catch (IOException ex) {
426-
throw new IllegalStateException(ex);
432+
throw new UncheckedIOException(ex);
427433
}
428-
assertEquals(expected.length, actual.size());
429-
assertEquals(hash(expected), hash(actual.toByteArray()));
430434
});
431435
}
432436

0 commit comments

Comments
 (0)