Skip to content

Commit bd956ed

Browse files
committed
DataBuffer fixes in Protobuf codecs
Closes gh-22731
1 parent 2835424 commit bd956ed

File tree

4 files changed

+104
-45
lines changed

4 files changed

+104
-45
lines changed

spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,6 +77,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
7777

7878
private static final ConcurrentMap<Class<?>, Method> methodCache = new ConcurrentReferenceHashMap<>();
7979

80+
8081
private final ExtensionRegistry extensionRegistry;
8182

8283
private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE;
@@ -114,8 +115,12 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
114115
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
115116
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
116117

118+
MessageDecoderFunction decoderFunction =
119+
new MessageDecoderFunction(elementType, this.maxMessageSize);
120+
117121
return Flux.from(inputStream)
118-
.flatMapIterable(new MessageDecoderFunction(elementType, this.maxMessageSize));
122+
.flatMapIterable(decoderFunction)
123+
.doOnTerminate(decoderFunction::discard);
119124
}
120125

121126
@Override
@@ -212,12 +217,13 @@ public Iterable<? extends Message> apply(DataBuffer input) {
212217
this.messageBytesToRead -= chunkBytesToRead;
213218

214219
if (this.messageBytesToRead == 0) {
215-
Message.Builder builder = getMessageBuilder(this.elementType.toClass());
216-
ByteBuffer buffer = this.output.asByteBuffer();
217-
builder.mergeFrom(CodedInputStream.newInstance(buffer), extensionRegistry);
218-
messages.add(builder.build());
220+
CodedInputStream stream = CodedInputStream.newInstance(this.output.asByteBuffer());
219221
DataBufferUtils.release(this.output);
220222
this.output = null;
223+
Message message = getMessageBuilder(this.elementType.toClass())
224+
.mergeFrom(stream, extensionRegistry)
225+
.build();
226+
messages.add(message);
221227
}
222228
} while (remainingBytesToRead > 0);
223229
return messages;
@@ -286,6 +292,12 @@ private boolean readMessageSize(DataBuffer input) {
286292
this.offset = 0;
287293
throw new DecodingException("Cannot parse message size: malformed varint");
288294
}
295+
296+
public void discard() {
297+
if (this.output != null) {
298+
DataBufferUtils.release(this.output);
299+
}
300+
}
289301
}
290302

291303
}

spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717
package org.springframework.http.codec.protobuf;
1818

1919
import java.io.IOException;
20-
import java.io.OutputStream;
2120
import java.util.Collections;
2221
import java.util.List;
2322
import java.util.Map;
@@ -31,6 +30,7 @@
3130
import org.springframework.core.ResolvableType;
3231
import org.springframework.core.io.buffer.DataBuffer;
3332
import org.springframework.core.io.buffer.DataBufferFactory;
33+
import org.springframework.core.io.buffer.DataBufferUtils;
3434
import org.springframework.http.MediaType;
3535
import org.springframework.http.codec.HttpMessageEncoder;
3636
import org.springframework.lang.Nullable;
@@ -73,26 +73,29 @@ public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType
7373
public Flux<DataBuffer> encode(Publisher<? extends Message> inputStream, DataBufferFactory bufferFactory,
7474
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
7575

76-
return Flux
77-
.from(inputStream)
78-
.map(message -> encodeMessage(message, bufferFactory, !(inputStream instanceof Mono)));
79-
}
80-
81-
private DataBuffer encodeMessage(Message message, DataBufferFactory bufferFactory, boolean streaming) {
82-
DataBuffer buffer = bufferFactory.allocateBuffer();
83-
OutputStream outputStream = buffer.asOutputStream();
84-
try {
85-
if (streaming) {
86-
message.writeDelimitedTo(outputStream);
87-
}
88-
else {
89-
message.writeTo(outputStream);
90-
}
91-
return buffer;
92-
}
93-
catch (IOException ex) {
94-
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
95-
}
76+
return Flux.from(inputStream)
77+
.map(message -> {
78+
DataBuffer buffer = bufferFactory.allocateBuffer();
79+
boolean release = true;
80+
try {
81+
if (!(inputStream instanceof Mono)) {
82+
message.writeDelimitedTo(buffer.asOutputStream());
83+
}
84+
else {
85+
message.writeTo(buffer.asOutputStream());
86+
}
87+
release = false;
88+
return buffer;
89+
}
90+
catch (IOException ex) {
91+
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
92+
}
93+
finally {
94+
if (release) {
95+
DataBufferUtils.release(buffer);
96+
}
97+
}
98+
});
9699
}
97100

98101
@Override

spring-web/src/test/java/org/springframework/http/codec/CodecDataBufferLeakTests.java renamed to spring-web/src/test/java/org/springframework/http/codec/CancelWithoutDemandCodecTests.java

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.List;
2121
import java.util.function.Supplier;
2222

23+
import com.google.protobuf.Message;
24+
import org.junit.After;
2325
import org.junit.Test;
2426
import org.reactivestreams.Publisher;
2527
import org.reactivestreams.Subscription;
@@ -38,18 +40,28 @@
3840
import org.springframework.http.client.MultipartBodyBuilder;
3941
import org.springframework.http.codec.json.Jackson2JsonEncoder;
4042
import org.springframework.http.codec.multipart.MultipartHttpMessageWriter;
43+
import org.springframework.http.codec.protobuf.ProtobufDecoder;
44+
import org.springframework.http.codec.protobuf.ProtobufEncoder;
4145
import org.springframework.http.codec.xml.Jaxb2XmlEncoder;
46+
import org.springframework.protobuf.Msg;
47+
import org.springframework.protobuf.SecondMsg;
48+
import org.springframework.util.MimeType;
4249

4350
/**
4451
* Test scenarios for data buffer leaks.
4552
* @author Rossen Stoyanchev
46-
* @since 5.2
4753
*/
48-
public class CodecDataBufferLeakTests {
54+
public class CancelWithoutDemandCodecTests {
4955

5056
private final LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
5157

5258

59+
@After
60+
public void tearDown() throws Exception {
61+
this.bufferFactory.checkForLeaks();
62+
}
63+
64+
5365
@Test // gh-22107
5466
public void cancelWithEncoderHttpMessageWriterAndSingleValue() {
5567
CharSequenceEncoder encoder = CharSequenceEncoder.allMimeTypes();
@@ -58,8 +70,6 @@ public void cancelWithEncoderHttpMessageWriterAndSingleValue() {
5870

5971
writer.write(Mono.just("foo"), ResolvableType.forType(String.class), MediaType.TEXT_PLAIN,
6072
outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5));
61-
62-
this.bufferFactory.checkForLeaks();
6373
}
6474

6575
@Test // gh-22107
@@ -73,8 +83,6 @@ public void cancelWithJackson() {
7383
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
7484
flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just)..
7585
subscriber.cancel();
76-
77-
this.bufferFactory.checkForLeaks();
7886
}
7987

8088
@Test // gh-22107
@@ -88,8 +96,39 @@ public void cancelWithJaxb2() {
8896
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
8997
flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just)..
9098
subscriber.cancel();
99+
}
91100

92-
this.bufferFactory.checkForLeaks();
101+
@Test // gh-22543
102+
public void cancelWithProtobufEncoder() {
103+
ProtobufEncoder encoder = new ProtobufEncoder();
104+
Msg msg = Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(123).build()).build();
105+
106+
Flux<DataBuffer> flux = encoder.encode(Mono.just(msg),
107+
this.bufferFactory, ResolvableType.forClass(Msg.class),
108+
new MimeType("application", "x-protobuf"), Collections.emptyMap());
109+
110+
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
111+
flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just)..
112+
subscriber.cancel();
113+
}
114+
115+
@Test // gh-22731
116+
public void cancelWithProtobufDecoder() throws InterruptedException {
117+
ProtobufDecoder decoder = new ProtobufDecoder();
118+
119+
Mono<DataBuffer> input = Mono.fromCallable(() -> {
120+
Msg msg = Msg.newBuilder().setFoo("Foo").build();
121+
byte[] bytes = msg.toByteArray();
122+
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
123+
buffer.write(bytes);
124+
return buffer;
125+
});
126+
127+
Flux<Message> messages = decoder.decode(input, ResolvableType.forType(Msg.class),
128+
new MimeType("application", "x-protobuf"), Collections.emptyMap());
129+
ZeroDemandMessageSubscriber subscriber = new ZeroDemandMessageSubscriber();
130+
messages.subscribe(subscriber);
131+
subscriber.cancel();
93132
}
94133

95134
@Test // gh-22107
@@ -104,8 +143,6 @@ public void cancelWithMultipartContent() {
104143

105144
writer.write(Mono.just(builder.build()), null, MediaType.MULTIPART_FORM_DATA,
106145
outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5));
107-
108-
this.bufferFactory.checkForLeaks();
109146
}
110147

111148
@Test // gh-22107
@@ -116,8 +153,6 @@ public void cancelWithSse() {
116153

117154
writer.write(Mono.just(event), ResolvableType.forClass(ServerSentEvent.class), MediaType.TEXT_EVENT_STREAM,
118155
outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5));
119-
120-
this.bufferFactory.checkForLeaks();
121156
}
122157

123158

@@ -183,4 +218,13 @@ protected void hookOnSubscribe(Subscription subscription) {
183218
// Just subscribe without requesting
184219
}
185220
}
221+
222+
223+
private static class ZeroDemandMessageSubscriber extends BaseSubscriber<Message> {
224+
225+
@Override
226+
protected void hookOnSubscribe(Subscription subscription) {
227+
// Just subscribe without requesting
228+
}
229+
}
186230
}

spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufDecoderTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,10 +35,10 @@
3535
import org.springframework.protobuf.SecondMsg;
3636
import org.springframework.util.MimeType;
3737

38-
import static java.util.Collections.emptyMap;
38+
import static java.util.Collections.*;
3939
import static org.junit.Assert.*;
40-
import static org.springframework.core.ResolvableType.forClass;
41-
import static org.springframework.core.io.buffer.DataBufferUtils.release;
40+
import static org.springframework.core.ResolvableType.*;
41+
import static org.springframework.core.io.buffer.DataBufferUtils.*;
4242

4343
/**
4444
* Unit tests for {@link ProtobufDecoder}.
@@ -223,11 +223,11 @@ public void exceedMaxSize() {
223223
}
224224

225225
private Mono<DataBuffer> dataBuffer(Msg msg) {
226-
return Mono.defer(() -> {
226+
return Mono.fromCallable(() -> {
227227
byte[] bytes = msg.toByteArray();
228228
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
229229
buffer.write(bytes);
230-
return Mono.just(buffer);
230+
return buffer;
231231
});
232232
}
233233

0 commit comments

Comments
 (0)