Skip to content

Commit 6bae759

Browse files
committed
Use Jackson SequenceWriter for streaming
Before this commit, the AbstractJackson2Encoder instantiated a ObjectWriter per value. This is not an issue for single values or non-streaming scenarios (which effectively are the same, because in the latter values are collected into a list until offered to Jackson). However, this does create a problem for SMILE, because it allows for shared references that do not match up when writing each value with a new ObjectWriter, resulting in errors parsing the result. This commit uses Jackson's SequenceWriter for streaming scenarios, allowing Jackson to reuse the same context for writing multiple values, fixing the issue described above. Closes gh-24198
1 parent e1e8c16 commit 6bae759

File tree

2 files changed

+160
-101
lines changed

2 files changed

+160
-101
lines changed

spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java

Lines changed: 116 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717
package org.springframework.http.codec.json;
1818

1919
import java.io.IOException;
20-
import java.io.OutputStream;
2120
import java.lang.annotation.Annotation;
2221
import java.nio.charset.Charset;
2322
import java.util.ArrayList;
@@ -29,9 +28,11 @@
2928
import com.fasterxml.jackson.core.JsonEncoding;
3029
import com.fasterxml.jackson.core.JsonGenerator;
3130
import com.fasterxml.jackson.core.JsonProcessingException;
31+
import com.fasterxml.jackson.core.util.ByteArrayBuilder;
3232
import com.fasterxml.jackson.databind.JavaType;
3333
import com.fasterxml.jackson.databind.ObjectMapper;
3434
import com.fasterxml.jackson.databind.ObjectWriter;
35+
import com.fasterxml.jackson.databind.SequenceWriter;
3536
import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
3637
import org.reactivestreams.Publisher;
3738
import reactor.core.publisher.Flux;
@@ -44,7 +45,6 @@
4445
import org.springframework.core.codec.Hints;
4546
import org.springframework.core.io.buffer.DataBuffer;
4647
import org.springframework.core.io.buffer.DataBufferFactory;
47-
import org.springframework.core.io.buffer.DataBufferUtils;
4848
import org.springframework.core.log.LogFormatUtils;
4949
import org.springframework.http.MediaType;
5050
import org.springframework.http.codec.HttpMessageEncoder;
@@ -115,65 +115,53 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe
115115
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
116116
Assert.notNull(elementType, "'elementType' must not be null");
117117

118-
JsonEncoding encoding = getJsonEncoding(mimeType);
119-
120118
if (inputStream instanceof Mono) {
121-
return Mono.from(inputStream).map(value ->
122-
encodeValue(value, mimeType, bufferFactory, elementType, hints, encoding)).flux();
119+
return Mono.from(inputStream)
120+
.map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
121+
.flux();
123122
}
124123
else {
125-
return this.streamingMediaTypes.stream()
126-
.filter(mediaType -> mediaType.isCompatibleWith(mimeType))
127-
.findFirst()
128-
.map(mediaType -> {
129-
byte[] separator = STREAM_SEPARATORS.getOrDefault(mediaType, NEWLINE_SEPARATOR);
130-
return Flux.from(inputStream).map(value -> {
131-
DataBuffer buffer = encodeValue(
132-
value, mimeType, bufferFactory, elementType, hints, encoding);
133-
if (separator != null) {
134-
buffer.write(separator);
135-
}
136-
return buffer;
137-
});
138-
})
139-
.orElseGet(() -> {
140-
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
141-
return Flux.from(inputStream).collectList().map(list ->
142-
encodeValue(list, mimeType, bufferFactory, listType, hints, encoding)).flux();
143-
});
144-
}
145-
}
146-
147-
private DataBuffer encodeValue(Object value, @Nullable MimeType mimeType, DataBufferFactory bufferFactory,
148-
ResolvableType elementType, @Nullable Map<String, Object> hints, JsonEncoding encoding) {
124+
byte[] separator = streamSeparator(mimeType);
125+
if (separator != null) { // streaming
126+
try {
127+
ObjectWriter writer = createObjectWriter(elementType, mimeType, hints);
128+
ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler());
129+
JsonEncoding encoding = getJsonEncoding(mimeType);
130+
JsonGenerator generator = getObjectMapper().getFactory().createGenerator(byteBuilder, encoding);
131+
SequenceWriter sequenceWriter = writer.writeValues(generator);
132+
133+
return Flux.from(inputStream)
134+
.map(value -> encodeStreamingValue(value, bufferFactory, hints, sequenceWriter, byteBuilder,
135+
separator));
136+
}
137+
catch (IOException ex) {
138+
return Flux.error(ex);
139+
}
140+
}
141+
else { // non-streaming
142+
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
143+
return Flux.from(inputStream)
144+
.collectList()
145+
.map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints))
146+
.flux();
147+
}
149148

150-
if (!Hints.isLoggingSuppressed(hints)) {
151-
LogFormatUtils.traceDebug(logger, traceOn -> {
152-
String formatted = LogFormatUtils.formatValue(value, !traceOn);
153-
return Hints.getLogPrefix(hints) + "Encoding [" + formatted + "]";
154-
});
155149
}
150+
}
156151

157-
JavaType javaType = getJavaType(elementType.getType(), null);
158-
Class<?> jsonView = (hints != null ? (Class<?>) hints.get(Jackson2CodecSupport.JSON_VIEW_HINT) : null);
159-
ObjectWriter writer = (jsonView != null ?
160-
getObjectMapper().writerWithView(jsonView) : getObjectMapper().writer());
152+
public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory,
153+
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
161154

162-
if (javaType.isContainerType()) {
163-
writer = writer.forType(javaType);
164-
}
165-
166-
writer = customizeWriter(writer, mimeType, elementType, hints);
155+
ObjectWriter writer = createObjectWriter(valueType, mimeType, hints);
156+
ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler());
157+
JsonEncoding encoding = getJsonEncoding(mimeType);
167158

168-
DataBuffer buffer = bufferFactory.allocateBuffer();
169-
boolean release = true;
170-
OutputStream outputStream = buffer.asOutputStream();
159+
logValue(hints, value);
171160

172161
try {
173-
JsonGenerator generator = getObjectMapper().getFactory().createGenerator(outputStream, encoding);
162+
JsonGenerator generator = getObjectMapper().getFactory().createGenerator(byteBuilder, encoding);
174163
writer.writeValue(generator, value);
175164
generator.flush();
176-
release = false;
177165
}
178166
catch (InvalidDefinitionException ex) {
179167
throw new CodecException("Type definition error: " + ex.getType(), ex);
@@ -182,24 +170,97 @@ private DataBuffer encodeValue(Object value, @Nullable MimeType mimeType, DataBu
182170
throw new EncodingException("JSON encoding error: " + ex.getOriginalMessage(), ex);
183171
}
184172
catch (IOException ex) {
185-
throw new IllegalStateException("Unexpected I/O error while writing to data buffer",
173+
throw new IllegalStateException("Unexpected I/O error while writing to byte array builder",
186174
ex);
187175
}
188-
finally {
189-
if (release) {
190-
DataBufferUtils.release(buffer);
191-
}
176+
177+
byte[] bytes = byteBuilder.toByteArray();
178+
DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length);
179+
buffer.write(bytes);
180+
181+
return buffer;
182+
}
183+
184+
private DataBuffer encodeStreamingValue(Object value, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints,
185+
SequenceWriter sequenceWriter, ByteArrayBuilder byteArrayBuilder, byte[] separator) {
186+
187+
logValue(hints, value);
188+
189+
try {
190+
sequenceWriter.write(value);
191+
sequenceWriter.flush();
192192
}
193+
catch (InvalidDefinitionException ex) {
194+
throw new CodecException("Type definition error: " + ex.getType(), ex);
195+
}
196+
catch (JsonProcessingException ex) {
197+
throw new EncodingException("JSON encoding error: " + ex.getOriginalMessage(), ex);
198+
}
199+
catch (IOException ex) {
200+
throw new IllegalStateException("Unexpected I/O error while writing to byte array builder",
201+
ex);
202+
}
203+
204+
byte[] bytes = byteArrayBuilder.toByteArray();
205+
byteArrayBuilder.reset();
206+
207+
int offset;
208+
int length;
209+
if (bytes.length > 0 && bytes[0] == ' ') {
210+
// SequenceWriter writes an unnecessary space in between values
211+
offset = 1;
212+
length = bytes.length - 1;
213+
}
214+
else {
215+
offset = 0;
216+
length = bytes.length;
217+
}
218+
DataBuffer buffer = bufferFactory.allocateBuffer(length + separator.length);
219+
buffer.write(bytes, offset, length);
220+
buffer.write(separator);
193221

194222
return buffer;
195223
}
196224

225+
private void logValue(@Nullable Map<String, Object> hints, Object value) {
226+
if (!Hints.isLoggingSuppressed(hints)) {
227+
LogFormatUtils.traceDebug(logger, traceOn -> {
228+
String formatted = LogFormatUtils.formatValue(value, !traceOn);
229+
return Hints.getLogPrefix(hints) + "Encoding [" + formatted + "]";
230+
});
231+
}
232+
}
233+
234+
private ObjectWriter createObjectWriter(ResolvableType valueType, @Nullable MimeType mimeType,
235+
@Nullable Map<String, Object> hints) {
236+
JavaType javaType = getJavaType(valueType.getType(), null);
237+
Class<?> jsonView = (hints != null ? (Class<?>) hints.get(Jackson2CodecSupport.JSON_VIEW_HINT) : null);
238+
ObjectWriter writer = (jsonView != null ?
239+
getObjectMapper().writerWithView(jsonView) : getObjectMapper().writer());
240+
241+
if (javaType.isContainerType()) {
242+
writer = writer.forType(javaType);
243+
}
244+
245+
return customizeWriter(writer, mimeType, valueType, hints);
246+
}
247+
197248
protected ObjectWriter customizeWriter(ObjectWriter writer, @Nullable MimeType mimeType,
198249
ResolvableType elementType, @Nullable Map<String, Object> hints) {
199250

200251
return writer;
201252
}
202253

254+
@Nullable
255+
private byte[] streamSeparator(@Nullable MimeType mimeType) {
256+
for (MediaType streamingMediaType : this.streamingMediaTypes) {
257+
if (streamingMediaType.isCompatibleWith(mimeType)) {
258+
return STREAM_SEPARATORS.getOrDefault(streamingMediaType, NEWLINE_SEPARATOR);
259+
}
260+
}
261+
return null;
262+
}
263+
203264
/**
204265
* Determine the JSON encoding to use for the given mime type.
205266
* @param mimeType the mime type as requested by the caller

spring-web/src/test/java/org/springframework/http/codec/json/Jackson2SmileEncoderTests.java

Lines changed: 44 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,23 +20,26 @@
2020
import java.io.UncheckedIOException;
2121
import java.util.Arrays;
2222
import java.util.List;
23-
import java.util.function.Consumer;
2423

24+
import com.fasterxml.jackson.databind.MappingIterator;
2525
import com.fasterxml.jackson.databind.ObjectMapper;
2626
import org.junit.Test;
2727
import reactor.core.publisher.Flux;
2828
import reactor.core.publisher.Mono;
29+
import reactor.test.StepVerifier;
2930

3031
import org.springframework.core.ResolvableType;
3132
import org.springframework.core.codec.AbstractEncoderTestCase;
3233
import org.springframework.core.io.buffer.DataBuffer;
33-
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
34+
import org.springframework.core.io.buffer.DataBufferUtils;
3435
import org.springframework.http.codec.Pojo;
3536
import org.springframework.http.codec.ServerSentEvent;
3637
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
3738
import org.springframework.util.MimeType;
3839

39-
import static org.junit.Assert.*;
40+
import static org.junit.Assert.assertEquals;
41+
import static org.junit.Assert.assertFalse;
42+
import static org.junit.Assert.assertTrue;
4043
import static org.springframework.core.io.buffer.DataBufferUtils.release;
4144
import static org.springframework.http.MediaType.APPLICATION_XML;
4245

@@ -59,21 +62,6 @@ public Jackson2SmileEncoderTests() {
5962

6063
}
6164

62-
public Consumer<DataBuffer> pojoConsumer(Pojo expected) {
63-
return dataBuffer -> {
64-
try {
65-
Pojo actual = this.mapper.reader().forType(Pojo.class)
66-
.readValue(DataBufferTestUtils.dumpBytes(dataBuffer));
67-
assertEquals(expected, actual);
68-
release(dataBuffer);
69-
}
70-
catch (IOException ex) {
71-
throw new UncheckedIOException(ex);
72-
}
73-
};
74-
}
75-
76-
7765
@Override
7866
@Test
7967
public void canEncode() {
@@ -106,7 +94,19 @@ public void encode() {
10694
Flux<Pojo> input = Flux.fromIterable(list);
10795

10896
testEncode(input, Pojo.class, step -> step
109-
.consumeNextWith(expect(list, List.class)));
97+
.consumeNextWith(dataBuffer -> {
98+
try {
99+
Object actual = this.mapper.reader().forType(List.class)
100+
.readValue(dataBuffer.asInputStream());
101+
assertEquals(list, actual);
102+
}
103+
catch (IOException e) {
104+
throw new UncheckedIOException(e);
105+
}
106+
finally {
107+
release(dataBuffer);
108+
}
109+
}));
110110
}
111111

112112
@Test
@@ -127,32 +127,30 @@ public void encodeAsStream() throws Exception {
127127
Flux<Pojo> input = Flux.just(pojo1, pojo2, pojo3);
128128
ResolvableType type = ResolvableType.forClass(Pojo.class);
129129

130-
testEncodeAll(input, type, step -> step
131-
.consumeNextWith(expect(pojo1, Pojo.class))
132-
.consumeNextWith(expect(pojo2, Pojo.class))
133-
.consumeNextWith(expect(pojo3, Pojo.class))
134-
.verifyComplete(),
135-
STREAM_SMILE_MIME_TYPE, null);
130+
Flux<DataBuffer> result = this.encoder
131+
.encode(input, bufferFactory, type, STREAM_SMILE_MIME_TYPE, null);
132+
133+
Mono<MappingIterator<Pojo>> joined = DataBufferUtils.join(result)
134+
.map(buffer -> {
135+
try {
136+
return this.mapper.reader().forType(Pojo.class).readValues(buffer.asInputStream(true));
137+
}
138+
catch (IOException ex) {
139+
throw new UncheckedIOException(ex);
140+
}
141+
});
142+
143+
StepVerifier.create(joined)
144+
.assertNext(iter -> {
145+
assertTrue(iter.hasNext());
146+
assertEquals(pojo1, iter.next());
147+
assertTrue(iter.hasNext());
148+
assertEquals(pojo2, iter.next());
149+
assertTrue(iter.hasNext());
150+
assertEquals(pojo3, iter.next());
151+
assertFalse(iter.hasNext());
152+
})
153+
.verifyComplete();
136154
}
137155

138-
139-
private <T> Consumer<DataBuffer> expect(T expected, Class<T> expectedType) {
140-
return dataBuffer -> {
141-
try {
142-
Object actual = this.mapper.reader().forType(expectedType)
143-
.readValue(dataBuffer.asInputStream());
144-
assertEquals(expected, actual);
145-
}
146-
catch (IOException e) {
147-
throw new UncheckedIOException(e);
148-
}
149-
finally {
150-
release(dataBuffer);
151-
}
152-
};
153-
154-
}
155-
156-
157-
158156
}

0 commit comments

Comments
 (0)