Skip to content

Commit 5e3c5d4

Browse files
committed
Avoid collecting Flux elements in KotlinSerializationJsonEncoder
Closes gh-33428
1 parent 907859f commit 5e3c5d4

File tree

3 files changed

+101
-23
lines changed

3 files changed

+101
-23
lines changed

spring-web/src/main/java/org/springframework/http/codec/KotlinSerializationStringEncoder.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616

1717
package org.springframework.http.codec;
1818

19+
import java.util.ArrayList;
1920
import java.util.Collection;
2021
import java.util.HashSet;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.Set;
2425

25-
import kotlin.text.Charsets;
2626
import kotlinx.serialization.KSerializer;
2727
import kotlinx.serialization.StringFormat;
2828
import org.reactivestreams.Publisher;
@@ -52,6 +52,11 @@
5252
public abstract class KotlinSerializationStringEncoder<T extends StringFormat> extends KotlinSerializationSupport<T>
5353
implements Encoder<Object> {
5454

55+
private static final byte[] NEWLINE_SEPARATOR = {'\n'};
56+
57+
protected static final byte[] EMPTY_BYTES = new byte[0];
58+
59+
5560
// CharSequence encoding needed for now, see https://github.com/Kotlin/kotlinx.serialization/issues/204 for more details
5661
private final CharSequenceEncoder charSequenceEncoder = CharSequenceEncoder.allMimeTypes();
5762
private final Set<MimeType> streamingMediaTypes = new HashSet<>();
@@ -85,22 +90,40 @@ public List<MimeType> getEncodableMimeTypes(ResolvableType elementType) {
8590
return supportedMimeTypes();
8691
}
8792

88-
8993
@Override
9094
public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory,
91-
ResolvableType elementType,
92-
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
93-
if (inputStream instanceof Mono) {
94-
return Mono.from(inputStream)
95+
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
96+
97+
if (inputStream instanceof Mono<?> mono) {
98+
return mono
9599
.map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
96100
.flux();
97101
}
98-
99102
if (mimeType != null && this.streamingMediaTypes.contains(mimeType)) {
100103
return Flux.from(inputStream)
101-
.map(list -> encodeValue(list, bufferFactory, elementType, mimeType, hints)
102-
.write("\n", Charsets.UTF_8));
104+
.map(value -> encodeStreamingValue(value, bufferFactory, elementType, mimeType, hints, EMPTY_BYTES,
105+
NEWLINE_SEPARATOR));
103106
}
107+
return encodeNonStream(inputStream, bufferFactory, elementType, mimeType, hints);
108+
}
109+
110+
protected DataBuffer encodeStreamingValue(Object value, DataBufferFactory bufferFactory,
111+
ResolvableType valueType, @Nullable MimeType mimeType,
112+
@Nullable Map<String, Object> hints, byte[] prefix, byte[] suffix) {
113+
114+
List<DataBuffer> buffers = new ArrayList<>(3);
115+
if (prefix.length > 0) {
116+
buffers.add(bufferFactory.allocateBuffer(prefix.length).write(prefix));
117+
}
118+
buffers.add(encodeValue(value, bufferFactory, valueType, mimeType, hints));
119+
if (suffix.length > 0) {
120+
buffers.add(bufferFactory.allocateBuffer(suffix.length).write(suffix));
121+
}
122+
return bufferFactory.join(buffers);
123+
}
124+
125+
protected Flux<DataBuffer> encodeNonStream(Publisher<?> inputStream, DataBufferFactory bufferFactory,
126+
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
104127

105128
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
106129
return Flux.from(inputStream)
@@ -109,7 +132,6 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe
109132
.flux();
110133
}
111134

112-
113135
@Override
114136
public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory,
115137
ResolvableType valueType, @Nullable MimeType mimeType,

spring-web/src/main/java/org/springframework/http/codec/json/KotlinSerializationJsonEncoder.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,20 @@
1717
package org.springframework.http.codec.json;
1818

1919
import java.util.List;
20+
import java.util.Map;
2021

2122
import kotlinx.serialization.json.Json;
23+
import org.reactivestreams.Publisher;
24+
import reactor.core.publisher.Flux;
25+
import reactor.core.publisher.Mono;
2226

27+
import org.springframework.core.ResolvableType;
28+
import org.springframework.core.io.buffer.DataBuffer;
29+
import org.springframework.core.io.buffer.DataBufferFactory;
2330
import org.springframework.http.MediaType;
2431
import org.springframework.http.codec.KotlinSerializationStringEncoder;
32+
import org.springframework.lang.Nullable;
33+
import org.springframework.util.MimeType;
2534

2635
/**
2736
* Encode from an {@code Object} stream to a byte stream of JSON objects using
@@ -49,4 +58,38 @@ public KotlinSerializationJsonEncoder(Json json) {
4958
setStreamingMediaTypes(List.of(MediaType.APPLICATION_NDJSON));
5059
}
5160

61+
@Override
62+
public Flux<DataBuffer> encodeNonStream(Publisher<?> inputStream, DataBufferFactory bufferFactory,
63+
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
64+
65+
JsonArrayJoinHelper helper = new JsonArrayJoinHelper();
66+
return Flux.from(inputStream)
67+
.map(value -> encodeStreamingValue(value, bufferFactory, elementType, mimeType, hints,
68+
helper.getPrefix(), EMPTY_BYTES))
69+
.switchIfEmpty(Mono.fromCallable(() -> bufferFactory.wrap(helper.getPrefix())))
70+
.concatWith(Mono.fromCallable(() -> bufferFactory.wrap(helper.getSuffix())));
71+
}
72+
73+
74+
private static class JsonArrayJoinHelper {
75+
76+
private static final byte[] COMMA_SEPARATOR = {','};
77+
78+
private static final byte[] OPEN_BRACKET = {'['};
79+
80+
private static final byte[] CLOSE_BRACKET = {']'};
81+
82+
private boolean firstItemEmitted;
83+
84+
public byte[] getPrefix() {
85+
byte[] prefix = (this.firstItemEmitted ? COMMA_SEPARATOR : OPEN_BRACKET);
86+
this.firstItemEmitted = true;
87+
return prefix;
88+
}
89+
90+
public byte[] getSuffix() {
91+
return CLOSE_BRACKET;
92+
}
93+
}
94+
5295
}

spring-web/src/test/kotlin/org/springframework/http/codec/json/KotlinSerializationJsonEncoderTests.kt

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@ import org.junit.jupiter.api.Test
2222
import org.springframework.core.MethodParameter
2323
import org.springframework.core.Ordered
2424
import org.springframework.core.ResolvableType
25-
import org.springframework.core.io.buffer.DataBuffer
26-
import org.springframework.core.io.buffer.DataBufferUtils
2725
import org.springframework.core.testfixture.codec.AbstractEncoderTests
2826
import org.springframework.http.MediaType
2927
import org.springframework.http.codec.ServerSentEvent
3028
import reactor.core.publisher.Flux
3129
import reactor.core.publisher.Mono
32-
import reactor.test.StepVerifier.FirstStep
30+
import reactor.test.StepVerifier
3331
import java.math.BigDecimal
3432
import java.nio.charset.StandardCharsets
3533
import kotlin.reflect.jvm.javaMethod
@@ -72,15 +70,32 @@ class KotlinSerializationJsonEncoderTests : AbstractEncoderTests<KotlinSerializa
7270
Pojo("foofoofoo", "barbarbar")
7371
)
7472
testEncode(input, Pojo::class.java) {
75-
it.consumeNextWith(expectString("[" +
76-
"{\"foo\":\"foo\",\"bar\":\"bar\"}," +
77-
"{\"foo\":\"foofoo\",\"bar\":\"barbar\"}," +
78-
"{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}]")
79-
.andThen { dataBuffer -> DataBufferUtils.release(dataBuffer) })
73+
it.consumeNextWith(expectString("[{\"foo\":\"foo\",\"bar\":\"bar\"}"))
74+
.consumeNextWith(expectString(",{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"))
75+
.consumeNextWith(expectString(",{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"))
76+
.consumeNextWith(expectString("]"))
8077
.verifyComplete()
8178
}
8279
}
8380

81+
@Test
82+
fun encodeEmpty() {
83+
testEncode(Flux.empty(), Pojo::class.java) {
84+
it
85+
.consumeNextWith(expectString("["))
86+
.consumeNextWith(expectString("]"))
87+
.verifyComplete()
88+
}
89+
}
90+
91+
@Test
92+
fun encodeWithErrorAsFirstSignal() {
93+
val message = "I'm a teapot"
94+
val input = Flux.error<Any>(IllegalStateException(message))
95+
val output = encoder.encode(input, this.bufferFactory, ResolvableType.forClass(Pojo::class.java), null, null)
96+
StepVerifier.create(output).expectErrorMessage(message).verify()
97+
}
98+
8499
@Test
85100
fun encodeStream() {
86101
val input = Flux.just(
@@ -105,9 +120,8 @@ class KotlinSerializationJsonEncoderTests : AbstractEncoderTests<KotlinSerializa
105120
fun encodeMono() {
106121
val input = Mono.just(Pojo("foo", "bar"))
107122
testEncode(input, Pojo::class.java) {
108-
it.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}")
109-
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
110-
.verifyComplete()
123+
it.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}"))
124+
.verifyComplete()
111125
}
112126
}
113127

@@ -116,8 +130,7 @@ class KotlinSerializationJsonEncoderTests : AbstractEncoderTests<KotlinSerializa
116130
val input = Mono.just(mapOf("value" to null))
117131
val methodParameter = MethodParameter.forExecutable(::handleMapWithNullable::javaMethod.get()!!, -1)
118132
testEncode(input, ResolvableType.forMethodParameter(methodParameter), null, null) {
119-
it.consumeNextWith(expectString("{\"value\":null}")
120-
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
133+
it.consumeNextWith(expectString("{\"value\":null}"))
121134
.verifyComplete()
122135
}
123136
}

0 commit comments

Comments
 (0)