Skip to content

Commit 78d1591

Browse files
committed
Merge branch '5.2.x'
2 parents b1da893 + f35903f commit 78d1591

File tree

6 files changed

+77
-36
lines changed

6 files changed

+77
-36
lines changed

spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import org.springframework.core.ResolvableType;
2525
import org.springframework.core.io.buffer.DataBuffer;
2626
import org.springframework.core.io.buffer.DataBufferFactory;
27+
import org.springframework.core.io.buffer.DataBufferUtils;
2728
import org.springframework.core.io.buffer.PooledDataBuffer;
2829
import org.springframework.lang.Nullable;
2930
import org.springframework.util.MimeType;
@@ -51,7 +52,7 @@ public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBuf
5152
return Flux.from(inputStream)
5253
.take(1)
5354
.concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints))
54-
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
55+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
5556
}
5657

5758
/**

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,9 @@
3939
import java.util.concurrent.atomic.AtomicReference;
4040
import java.util.function.Consumer;
4141

42+
import io.netty.util.IllegalReferenceCountException;
43+
import org.apache.commons.logging.Log;
44+
import org.apache.commons.logging.LogFactory;
4245
import org.reactivestreams.Publisher;
4346
import org.reactivestreams.Subscription;
4447
import reactor.core.publisher.BaseSubscriber;
@@ -60,6 +63,8 @@
6063
*/
6164
public abstract class DataBufferUtils {
6265

66+
private final static Log logger = LogFactory.getLog(DataBufferUtils.class);
67+
6368
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
6469

6570

@@ -494,7 +499,15 @@ public static boolean release(@Nullable DataBuffer dataBuffer) {
494499
if (dataBuffer instanceof PooledDataBuffer) {
495500
PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
496501
if (pooledDataBuffer.isAllocated()) {
497-
return pooledDataBuffer.release();
502+
try {
503+
return pooledDataBuffer.release();
504+
}
505+
catch (IllegalReferenceCountException ex) {
506+
if (logger.isDebugEnabled()) {
507+
logger.debug("RefCount already at 0", ex);
508+
}
509+
return false;
510+
}
498511
}
499512
}
500513
return false;
@@ -523,7 +536,6 @@ public static Consumer<DataBuffer> releaseConsumer() {
523536
* @return a buffer that is composed from the {@code dataBuffers} argument
524537
* @since 5.0.3
525538
*/
526-
@SuppressWarnings("unchecked")
527539
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
528540
return join(dataBuffers, -1);
529541
}

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,15 +17,13 @@
1717
package org.springframework.http.client.reactive;
1818

1919
import java.net.URI;
20+
import java.util.concurrent.atomic.AtomicReference;
2021
import java.util.function.Function;
2122

22-
import io.netty.buffer.ByteBufAllocator;
2323
import reactor.core.publisher.Mono;
24-
import reactor.netty.NettyInbound;
2524
import reactor.netty.NettyOutbound;
2625
import reactor.netty.http.client.HttpClient;
2726
import reactor.netty.http.client.HttpClientRequest;
28-
import reactor.netty.http.client.HttpClientResponse;
2927
import reactor.netty.resources.ConnectionProvider;
3028
import reactor.netty.resources.LoopResources;
3129

@@ -104,12 +102,23 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
104102
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
105103
}
106104

105+
AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>();
106+
107107
return this.httpClient
108108
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()))
109109
.uri(uri.toString())
110110
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
111-
.responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
112-
.next();
111+
.responseConnection((response, connection) -> {
112+
responseRef.set(new ReactorClientHttpResponse(response, connection));
113+
return Mono.just((ClientHttpResponse) responseRef.get());
114+
})
115+
.next()
116+
.doOnCancel(() -> {
117+
ReactorClientHttpResponse response = responseRef.get();
118+
if (response != null && response.bodyNotSubscribed()) {
119+
response.getConnection().dispose();
120+
}
121+
});
113122
}
114123

115124
private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request,
@@ -118,10 +127,4 @@ private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpCl
118127
return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
119128
}
120129

121-
private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound,
122-
ByteBufAllocator allocator) {
123-
124-
return new ReactorClientHttpResponse(response, nettyInbound, allocator);
125-
}
126-
127130
}

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
package org.springframework.http.client.reactive;
1818

1919
import java.util.Collection;
20-
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.function.BiFunction;
2122

22-
import io.netty.buffer.ByteBufAllocator;
2323
import reactor.core.publisher.Flux;
24+
import reactor.netty.Connection;
2425
import reactor.netty.NettyInbound;
2526
import reactor.netty.http.client.HttpClientResponse;
2627

@@ -48,16 +49,24 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
4849

4950
private final NettyDataBufferFactory bufferFactory;
5051

52+
private final Connection connection;
53+
5154
private final HttpHeaders headers;
5255

53-
private final AtomicBoolean rejectSubscribers = new AtomicBoolean();
56+
// 0 - not subscribed, 1 - subscribed, 2 - cancelled
57+
private final AtomicInteger state = new AtomicInteger(0);
5458

5559

56-
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
60+
/**
61+
* Constructor that matches the inputs from
62+
* {@link reactor.netty.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}.
63+
* @since 5.3
64+
*/
65+
public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) {
5766
this.response = response;
58-
this.inbound = inbound;
59-
this.bufferFactory = new NettyDataBufferFactory(alloc);
60-
67+
this.inbound = connection.inbound();
68+
this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc());
69+
this.connection = connection;
6170
MultiValueMap<String, String> adapter = new NettyHeadersAdapter(response.responseHeaders());
6271
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
6372
}
@@ -67,17 +76,17 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou
6776
public Flux<DataBuffer> getBody() {
6877
return this.inbound.receive()
6978
.doOnSubscribe(s -> {
70-
if (this.rejectSubscribers.get()) {
71-
throw new IllegalStateException("The client response body can only be consumed once.");
79+
if (!this.state.compareAndSet(0, 1)) {
80+
// https://github.com/reactor/reactor-netty/issues/503
81+
// FluxReceive rejects multiple subscribers, but not after a cancel().
82+
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
83+
// So we need to reject once in cancelled state.
84+
if (this.state.get() == 2) {
85+
throw new IllegalStateException("The client response body can only be consumed once.");
86+
}
7287
}
7388
})
74-
.doOnCancel(() ->
75-
// https://github.com/reactor/reactor-netty/issues/503
76-
// FluxReceive rejects multiple subscribers, but not after a cancel().
77-
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
78-
// So we need to intercept and reject them in that case.
79-
this.rejectSubscribers.set(true)
80-
)
89+
.doOnCancel(() -> this.state.compareAndSet(1, 2))
8190
.map(byteBuf -> {
8291
byteBuf.retain();
8392
return this.bufferFactory.wrap(byteBuf);
@@ -114,6 +123,20 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
114123
return CollectionUtils.unmodifiableMultiValueMap(result);
115124
}
116125

126+
/**
127+
* For use by {@link ReactorClientHttpConnector}.
128+
*/
129+
boolean bodyNotSubscribed() {
130+
return this.state.get() == 0;
131+
}
132+
133+
/**
134+
* For use by {@link ReactorClientHttpConnector}.
135+
*/
136+
Connection getConnection() {
137+
return this.connection;
138+
}
139+
117140
@Override
118141
public String toString() {
119142
return "ReactorClientHttpResponse{" +

spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.core.codec.Encoder;
3030
import org.springframework.core.codec.Hints;
3131
import org.springframework.core.io.buffer.DataBuffer;
32+
import org.springframework.core.io.buffer.DataBufferUtils;
3233
import org.springframework.core.io.buffer.PooledDataBuffer;
3334
import org.springframework.http.HttpLogging;
3435
import org.springframework.http.MediaType;
@@ -126,13 +127,13 @@ public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType eleme
126127
.flatMap(buffer -> {
127128
message.getHeaders().setContentLength(buffer.readableByteCount());
128129
return message.writeWith(Mono.just(buffer)
129-
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
130+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
130131
});
131132
}
132133

133134
if (isStreamingMediaType(contentType)) {
134135
return message.writeAndFlushWith(body.map(buffer ->
135-
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
136+
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)));
136137
}
137138

138139
return message.writeWith(body);

spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.springframework.core.io.Resource;
3939
import org.springframework.core.io.buffer.DataBuffer;
4040
import org.springframework.core.io.buffer.DataBufferFactory;
41+
import org.springframework.core.io.buffer.DataBufferUtils;
4142
import org.springframework.core.io.buffer.PooledDataBuffer;
4243
import org.springframework.core.log.LogFormatUtils;
4344
import org.springframework.http.HttpEntity;
@@ -206,7 +207,7 @@ private Mono<Void> writeMultipart(MultiValueMap<String, ?> map,
206207
Flux<DataBuffer> body = Flux.fromIterable(map.entrySet())
207208
.concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue(), bufferFactory))
208209
.concatWith(generateLastLine(boundary, bufferFactory))
209-
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
210+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
210211

211212
return outputMessage.writeWith(body);
212213
}

0 commit comments

Comments
 (0)