Skip to content

Commit fd6547b

Browse files
authored
Replace doFinally with Mono/Flux.using in Core libraries (Azure#36997)
Replace doFinally with Mono/Flux.using in Core libraries
1 parent 0e0e6f4 commit fd6547b

File tree

17 files changed

+127
-103
lines changed

17 files changed

+127
-103
lines changed

eng/versioning/external_dependencies.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ redis.clients:jedis;4.3.1
114114
io.lettuce:lettuce-core;6.2.0.RELEASE
115115
org.redisson:redisson;3.17.0
116116
net.bytebuddy:byte-buddy;1.12.23
117+
testdep_net.bytebuddy:byte-buddy;1.14.8
118+
testdep_net.bytebuddy:byte-buddy-agent;1.14.8
117119

118120
## Spring boot dependency versions
119121
org.springframework.boot:spring-boot-dependencies;2.7.14

sdk/core/azure-core-amqp/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,20 @@
114114
<version>4.11.0</version> <!-- {x-version-update;org.mockito:mockito-core;external_dependency} -->
115115
<scope>test</scope>
116116
</dependency>
117+
<!-- bytebuddy dependencies are required for mockito 4.11.0 to work with Java 21. Mockito 4.11.0 is the last release -->
118+
<!-- of Mockito supporting Java 8 as a baseline. -->
119+
<dependency>
120+
<groupId>net.bytebuddy</groupId>
121+
<artifactId>byte-buddy</artifactId>
122+
<version>1.14.8</version> <!-- {x-version-update;testdep_net.bytebuddy:byte-buddy;external_dependency} -->
123+
<scope>test</scope>
124+
</dependency>
125+
<dependency>
126+
<groupId>net.bytebuddy</groupId>
127+
<artifactId>byte-buddy-agent</artifactId>
128+
<version>1.14.8</version> <!-- {x-version-update;testdep_net.bytebuddy:byte-buddy-agent;external_dependency} -->
129+
<scope>test</scope>
130+
</dependency>
117131
<dependency>
118132
<groupId>com.azure</groupId>
119133
<artifactId>azure-core-test</artifactId>

sdk/core/azure-core-http-jdk-httpclient/src/main/java/com/azure/core/http/jdk/httpclient/JdkHttpResponseAsync.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ final class JdkHttpResponseAsync extends JdkHttpResponseBase {
2626

2727
@Override
2828
public Flux<ByteBuffer> getBody() {
29-
return this.contentFlux.doFinally(signalType -> disposed = true);
29+
return Flux.using(() -> this, ignored -> contentFlux, ignored -> disposed = true);
3030
}
3131

3232
@Override

sdk/core/azure-core-http-jdk-httpclient/src/main/java/com/azure/core/http/jdk/httpclient/JdkHttpResponseSync.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public Flux<ByteBuffer> getBody() {
4747
if (bodyBytes != null) {
4848
return Mono.fromSupplier(() -> ByteBuffer.wrap(bodyBytes)).flux();
4949
} else {
50-
return FluxUtil.toFluxByteBuffer(bodyStream).doFinally(ignored -> close());
50+
return Flux.using(() -> this, ignored -> FluxUtil.toFluxByteBuffer(bodyStream), JdkHttpResponseSync::close);
5151
}
5252
}
5353

sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/NettyAsyncHttpClient.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.azure.core.http.netty.implementation.AzureNettyHttpClientContext;
1212
import com.azure.core.http.netty.implementation.NettyAsyncHttpBufferedResponse;
1313
import com.azure.core.http.netty.implementation.NettyAsyncHttpResponse;
14+
import com.azure.core.http.netty.implementation.Utility;
1415
import com.azure.core.implementation.util.BinaryDataContent;
1516
import com.azure.core.implementation.util.BinaryDataHelper;
1617
import com.azure.core.implementation.util.ByteArrayContent;
@@ -54,8 +55,6 @@
5455
import java.util.Objects;
5556
import java.util.function.BiFunction;
5657

57-
import static com.azure.core.http.netty.implementation.Utility.closeConnection;
58-
5958
/**
6059
* This class provides a Netty-based implementation for the {@link HttpClient} interface. Creating an instance of this
6160
* class can be achieved by using the {@link NettyAsyncHttpClientBuilder} class, which offers Netty-specific API for
@@ -304,15 +303,14 @@ private static BiFunction<HttpClientResponse, Connection, Mono<Tuple2<HttpRespon
304303
// For now, eagerlyReadResponse and ignoreResponseBody works the same.
305304
// if (ignoreResponseBody) {
306305
// AtomicBoolean firstNext = new AtomicBoolean(true);
307-
// return reactorNettyConnection.inbound().receive()
306+
// return Mono.using(() -> reactorNettyConnection, connection -> connection.inbound().receive()
308307
// .doOnNext(ignored -> {
309308
// if (!firstNext.compareAndSet(true, false)) {
310309
// LOGGER.log(LogLevel.WARNING, () -> "Received HTTP response body when one wasn't expected. "
311310
// + "Response body will be ignored as directed.");
312311
// }
313312
// })
314-
// .ignoreElements()
315-
// .doFinally(ignored -> closeConnection(reactorNettyConnection))
313+
// .ignoreElements(), Utility::closeConnection)
316314
// .then(Mono.fromSupplier(() -> new NettyAsyncHttpBufferedResponse(reactorNettyResponse, restRequest,
317315
// EMPTY_BYTES, headersEagerlyConverted)));
318316
// }
@@ -323,11 +321,11 @@ private static BiFunction<HttpClientResponse, Connection, Mono<Tuple2<HttpRespon
323321
*/
324322
if (eagerlyReadResponse || ignoreResponseBody) {
325323
// Set up the body flux and dispose the connection once it has been received.
326-
return reactorNettyConnection.inbound().receive().aggregate().asByteArray()
327-
.doFinally(ignored -> closeConnection(reactorNettyConnection))
324+
return Mono.using(() -> reactorNettyConnection, connection -> connection.inbound().receive()
325+
.aggregate().asByteArray()
328326
.switchIfEmpty(Mono.just(EMPTY_BYTES))
329327
.map(bytes -> Tuples.of(new NettyAsyncHttpBufferedResponse(reactorNettyResponse, restRequest, bytes,
330-
headersEagerlyConverted), reactorNettyResponse.responseHeaders()));
328+
headersEagerlyConverted), reactorNettyResponse.responseHeaders())), Utility::closeConnection);
331329
} else {
332330
return Mono.just(Tuples.of(new NettyAsyncHttpResponse(reactorNettyResponse, reactorNettyConnection,
333331
restRequest, disableBufferCopy, headersEagerlyConverted), reactorNettyResponse.responseHeaders()));

sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/NettyAsyncHttpResponse.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,15 @@ public NettyAsyncHttpResponse(HttpClientResponse reactorNettyResponse, Connectio
3838

3939
@Override
4040
public Flux<ByteBuffer> getBody() {
41-
return bodyIntern()
42-
.map(byteBuf -> this.disableBufferCopy ? byteBuf.nioBuffer() : deepCopyBuffer(byteBuf))
43-
.doFinally(ignored -> close());
41+
return Flux.using(() -> this, response -> response.bodyIntern()
42+
.map(byteBuf -> this.disableBufferCopy ? byteBuf.nioBuffer() : deepCopyBuffer(byteBuf)),
43+
NettyAsyncHttpResponse::close);
4444
}
4545

4646
@Override
4747
public Mono<byte[]> getBodyAsByteArray() {
48-
return bodyIntern().aggregate().asByteArray().doFinally(ignored -> close());
48+
return Mono.using(() -> this, response -> response.bodyIntern().aggregate().asByteArray(),
49+
NettyAsyncHttpResponse::close);
4950
}
5051

5152
@Override
@@ -56,12 +57,14 @@ public Mono<String> getBodyAsString() {
5657

5758
@Override
5859
public Mono<String> getBodyAsString(Charset charset) {
59-
return bodyIntern().aggregate().asString(charset).doFinally(ignored -> close());
60+
return Mono.using(() -> this, response -> response.bodyIntern().aggregate().asString(charset),
61+
NettyAsyncHttpResponse::close);
6062
}
6163

6264
@Override
6365
public Mono<InputStream> getBodyAsInputStream() {
64-
return bodyIntern().aggregate().asInputStream().doFinally(ignored -> close());
66+
return Mono.using(() -> this, response -> response.bodyIntern().aggregate().asInputStream(),
67+
NettyAsyncHttpResponse::close);
6568
}
6669

6770
@Override
@@ -89,10 +92,9 @@ public void writeBodyTo(WritableByteChannel channel) {
8992
// complete. This introduces a previously seen, but in a different flavor, race condition where the write
9093
// operation gets scheduled on one thread and the ByteBuf release happens on another, leaving the write
9194
// operation racing to complete before the release happens. With all that said, leave this as subscribeOn.
92-
Mono.<Void>create(sink -> bodyIntern().subscribe(
95+
Mono.using(() -> this, response -> Mono.<Void>create(sink -> response.bodyIntern().subscribe(
9396
new ByteBufWriteSubscriber(channel::write, sink, getContentLength())))
94-
.subscribeOn(Schedulers.boundedElastic())
95-
.doFinally(ignored -> close())
97+
.subscribeOn(Schedulers.boundedElastic()), NettyAsyncHttpResponse::close)
9698
.block();
9799
}
98100

sdk/core/azure-core-http-vertx/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,20 @@
141141
<version>4.11.0</version><!-- {x-version-update;org.mockito:mockito-inline;external_dependency} -->
142142
<scope>test</scope>
143143
</dependency>
144+
<!-- bytebuddy dependencies are required for mockito 4.11.0 to work with Java 21. Mockito 4.11.0 is the last release -->
145+
<!-- of Mockito supporting Java 8 as a baseline. -->
146+
<dependency>
147+
<groupId>net.bytebuddy</groupId>
148+
<artifactId>byte-buddy</artifactId>
149+
<version>1.14.8</version> <!-- {x-version-update;testdep_net.bytebuddy:byte-buddy;external_dependency} -->
150+
<scope>test</scope>
151+
</dependency>
152+
<dependency>
153+
<groupId>net.bytebuddy</groupId>
154+
<artifactId>byte-buddy-agent</artifactId>
155+
<version>1.14.8</version> <!-- {x-version-update;testdep_net.bytebuddy:byte-buddy-agent;external_dependency} -->
156+
<scope>test</scope>
157+
</dependency>
144158
</dependencies>
145159

146160
<build>

sdk/core/azure-core-test/src/main/java/com/azure/core/test/http/HttpClientTests.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2366,16 +2366,14 @@ public void simpleDownloadTest(Context context) {
23662366
@MethodSource("downloadTestArgumentProvider")
23672367
public void simpleDownloadTestAsync(Context context) {
23682368
StepVerifier.create(createService(DownloadService.class).getBytesAsync(getRequestUri(), context)
2369-
.flatMap(response -> response.getValue().map(ByteBuffer::remaining)
2370-
.reduce(0, Integer::sum)
2371-
.doFinally(ignore -> response.close())))
2369+
.flatMap(response -> Mono.using(() -> response, r -> r.getValue().map(ByteBuffer::remaining)
2370+
.reduce(0, Integer::sum), StreamResponse::close)))
23722371
.assertNext(count -> assertEquals(30720, count))
23732372
.verifyComplete();
23742373

23752374
StepVerifier.create(createService(DownloadService.class).getBytesAsync(getRequestUri(), context)
2376-
.flatMap(response -> Mono.zip(MessageDigestUtils.md5(response.getValue()),
2377-
Mono.just(response.getHeaders().getValue(HttpHeaderName.ETAG)))
2378-
.doFinally(ignore -> response.close())))
2375+
.flatMap(response -> Mono.using(() -> response, r -> Mono.zip(MessageDigestUtils.md5(r.getValue()),
2376+
Mono.just(r.getHeaders().getValue(HttpHeaderName.ETAG))), StreamResponse::close)))
23792377
.assertNext(hashTuple -> assertEquals(hashTuple.getT2(), hashTuple.getT1()))
23802378
.verifyComplete();
23812379
}
@@ -2433,12 +2431,13 @@ public void streamResponseCanTransferBodyAsync(Context context) throws IOExcepti
24332431
() -> IOUtils.toAsynchronousByteChannel(AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE), 0),
24342432
streamResponse::writeValueToAsync,
24352433
channel -> {
2434+
streamResponse.close();
24362435
try {
24372436
channel.close();
24382437
} catch (IOException e) {
24392438
throw Exceptions.propagate(e);
24402439
}
2441-
}).doFinally(ignored -> streamResponse.close())
2440+
})
24422441
.then(Mono.just(streamResponse.getHeaders().getValue(HttpHeaderName.ETAG)))))
24432442
.assertNext(hash -> {
24442443
try {

sdk/core/azure-core/src/main/java/com/azure/core/credential/SimpleTokenCache.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ public Mono<AccessToken> getToken() {
8585
// cache hasn't expired, ignore refresh error this time
8686
fallback = Mono.just(cache);
8787
}
88-
return tokenRefresh
89-
.materialize()
88+
89+
return Mono.using(() -> wip, ignored -> tokenRefresh.materialize()
9090
.flatMap(signal -> {
9191
AccessToken accessToken = signal.get();
9292
Throwable error = signal.getThrowable();
@@ -107,8 +107,7 @@ public Mono<AccessToken> getToken() {
107107
return fallback;
108108
}
109109
})
110-
.doOnError(sinksOne::tryEmitError)
111-
.doFinally(ignored -> wip.set(null));
110+
.doOnError(sinksOne::tryEmitError), w -> w.set(null));
112111
} else if (cache != null && !cache.isExpired()) {
113112
// another thread might be refreshing the token proactively, but the current token is still valid
114113
return Mono.just(cache);

sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,7 @@ public Mono<Void> writeBodyToAsync(AsynchronousByteChannel channel) {
175175
public void writeBodyTo(WritableByteChannel channel) throws IOException {
176176
Flux<ByteBuffer> body = getBody();
177177
if (body != null) {
178-
FluxUtil.writeToWritableByteChannel(body, channel)
179-
.doFinally(ignored -> close())
178+
Mono.using(() -> this, ignored -> FluxUtil.writeToWritableByteChannel(body, channel), HttpResponse::close)
180179
.block();
181180
}
182181
}

0 commit comments

Comments
 (0)