Skip to content

Commit fa9dacc

Browse files
authored
Fixes ByteBuf Leak during Edge Cases of writeBodyTo (Azure#30670)
Fixes ByteBuf Leak during Edge Cases of writeBodyTo
1 parent a41bebe commit fa9dacc

File tree

3 files changed

+127
-9
lines changed

3 files changed

+127
-9
lines changed

sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/NettyAsyncHttpResponse.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ public NettyAsyncHttpResponse(HttpClientResponse reactorNettyResponse, Connectio
4040

4141
@Override
4242
public Flux<ByteBuffer> getBody() {
43-
return bodyIntern().doFinally(ignored -> close())
44-
.map(byteBuf -> this.disableBufferCopy ? byteBuf.nioBuffer() : deepCopyBuffer(byteBuf));
43+
return bodyIntern()
44+
.map(byteBuf -> this.disableBufferCopy ? byteBuf.nioBuffer() : deepCopyBuffer(byteBuf))
45+
.doFinally(ignored -> close());
4546
}
4647

4748
@Override
@@ -61,7 +62,7 @@ public Mono<String> getBodyAsString(Charset charset) {
6162

6263
@Override
6364
public Mono<InputStream> getBodyAsInputStream() {
64-
return bodyIntern().aggregate().asInputStream();
65+
return bodyIntern().aggregate().asInputStream().doFinally(ignored -> close());
6566
}
6667

6768
@Override
@@ -70,13 +71,28 @@ public Mono<Void> writeBodyToAsync(AsynchronousByteChannel channel) {
7071
.flatMapSequential(nettyBuffer ->
7172
FluxUtil.writeToAsynchronousByteChannel(Flux.just(nettyBuffer.nioBuffer()), channel)
7273
.doFinally(ignored -> nettyBuffer.release()), 1, 1)
74+
.doFinally(ignored -> close())
7375
.then();
7476
}
7577

7678
@Override
7779
public void writeBodyTo(WritableByteChannel channel) {
78-
bodyIntern().retain()
79-
.publishOn(Schedulers.boundedElastic())
80+
// Since this uses a synchronous write this doesn't need to retain the ByteBuf as the async version does.
81+
// In fact, if retain is used here and there is a cancellation or exception during writing this will likely
82+
// leak ByteBufs as it doesn't have the asynchronous stream handling to properly close them when errors happen
83+
// during writing.
84+
//
85+
// This also must use subscribeOn rather than publishOn as subscribeOn will have the entire asynchronous chain
86+
// run on the same subscriber where publishOn only affects reactive operations after the publishOn. While this
87+
// doesn't seem like it would make much of a difference as this entire call will be blocked it fixes a race
88+
// condition. reactor-netty's inbound receive uses a ByteBuf pool to handle the network response and the
89+
// returned ByteBufFlux is configured to release the ByteBuf back to the pool when the onNext operation
90+
// completes. Unfortunately, when publishOn is used each ByteBuf must be scheduled in the publisher thread using
91+
// a future and the publishOn onNext handler will complete once that operation is scheduled, not when it's
92+
// complete. This introduces a previously seen, but in a different flavor, race condition where the write
93+
// operation gets scheduled on one thread and the ByteBuf release happens on another, leaving the write
94+
// operation racing to complete before the release happens. With all that said, leave this as subscribeOn.
95+
bodyIntern().subscribeOn(Schedulers.boundedElastic())
8096
.map(nettyBuffer -> {
8197
try {
8298
ByteBuffer nioBuffer = nettyBuffer.nioBuffer();
@@ -86,10 +102,10 @@ public void writeBodyTo(WritableByteChannel channel) {
86102
return nettyBuffer;
87103
} catch (IOException e) {
88104
throw Exceptions.propagate(e);
89-
} finally {
90-
nettyBuffer.release();
91105
}
92-
}).then().block();
106+
})
107+
.doFinally(ignored -> close())
108+
.then().block();
93109
}
94110

95111
@Override

sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,17 @@
2424
import reactor.core.scheduler.Schedulers;
2525
import reactor.test.StepVerifier;
2626

27+
import java.io.IOException;
28+
import java.nio.ByteBuffer;
29+
import java.nio.channels.AsynchronousByteChannel;
30+
import java.nio.channels.CompletionHandler;
31+
import java.nio.channels.WritableByteChannel;
2732
import java.security.SecureRandom;
2833
import java.time.Duration;
2934
import java.util.Collection;
35+
import java.util.concurrent.CompletableFuture;
3036
import java.util.concurrent.ConcurrentLinkedDeque;
37+
import java.util.concurrent.Future;
3138
import java.util.concurrent.atomic.AtomicInteger;
3239
import java.util.function.Function;
3340

@@ -106,6 +113,99 @@ public void closeHttpResponseWithConsumingPartialBody() {
106113
runScenario(response -> response.getBody().next().flatMap(ignored -> Mono.fromRunnable(response::close)));
107114
}
108115

116+
@Test
117+
public void closeHttpResponseWithConsumingPartialWriteAsync() {
118+
runScenario(response -> response.writeBodyToAsync(new ThrowingAsynchronousByteChannel())
119+
.onErrorResume(throwable -> Mono.empty()));
120+
}
121+
122+
private static final class ThrowingAsynchronousByteChannel implements AsynchronousByteChannel {
123+
private boolean open = true;
124+
int writeCount = 0;
125+
126+
@Override
127+
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
128+
}
129+
130+
@Override
131+
public Future<Integer> read(ByteBuffer dst) {
132+
return null;
133+
}
134+
135+
@Override
136+
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
137+
if (writeCount++ < 3) {
138+
int remaining = src.remaining();
139+
src.position(src.position() + remaining);
140+
handler.completed(remaining, attachment);
141+
} else {
142+
handler.failed(new IOException(), attachment);
143+
}
144+
}
145+
146+
@Override
147+
public Future<Integer> write(ByteBuffer src) {
148+
if (writeCount++ < 3) {
149+
int remaining = src.remaining();
150+
src.position(src.position() + remaining);
151+
152+
return CompletableFuture.completedFuture(remaining);
153+
} else {
154+
CompletableFuture<Integer> failed = new CompletableFuture<>();
155+
failed.completeExceptionally(new IOException());
156+
return failed;
157+
}
158+
}
159+
160+
@Override
161+
public boolean isOpen() {
162+
return open;
163+
}
164+
165+
@Override
166+
public void close() {
167+
open = false;
168+
}
169+
}
170+
171+
@Test
172+
public void closeHttpResponseWithConsumingPartialWrite() {
173+
runScenario(response -> {
174+
try {
175+
response.writeBodyTo(new ThrowingWritableByteChannel());
176+
} catch (Exception ignored) {
177+
}
178+
179+
return Mono.empty();
180+
});
181+
}
182+
183+
private static final class ThrowingWritableByteChannel implements WritableByteChannel {
184+
private boolean open = true;
185+
int writeCount = 0;
186+
187+
@Override
188+
public int write(ByteBuffer src) throws IOException {
189+
if (writeCount++ < 3) {
190+
int remaining = src.remaining();
191+
src.position(src.position() + remaining);
192+
return remaining;
193+
} else {
194+
throw new IOException();
195+
}
196+
}
197+
198+
@Override
199+
public boolean isOpen() {
200+
return open;
201+
}
202+
203+
@Override
204+
public void close() throws IOException {
205+
open = false;
206+
}
207+
}
208+
109209
@Test
110210
public void closeHttpResponseWithConsumingFullBody() {
111211
runScenario(response -> response.getBodyAsByteArray().flatMap(ignored -> Mono.fromRunnable(response::close)));

sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ public Mono<Void> writeBodyToAsync(AsynchronousByteChannel channel) {
163163
public void writeBodyTo(WritableByteChannel channel) throws IOException {
164164
Flux<ByteBuffer> body = getBody();
165165
if (body != null) {
166-
FluxUtil.writeToWritableByteChannel(body, channel).block();
166+
FluxUtil.writeToWritableByteChannel(body, channel)
167+
.doFinally(ignored -> close())
168+
.block();
167169
}
168170
}
169171

0 commit comments

Comments
 (0)