|
36 | 36 | import com.fasterxml.jackson.databind.SequenceWriter; |
37 | 37 | import com.fasterxml.jackson.databind.exc.InvalidDefinitionException; |
38 | 38 | import com.fasterxml.jackson.databind.ser.FilterProvider; |
39 | | -import org.apache.commons.logging.Log; |
40 | 39 | import org.reactivestreams.Publisher; |
41 | 40 | import reactor.core.publisher.Flux; |
42 | 41 | import reactor.core.publisher.Mono; |
@@ -182,20 +181,23 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe |
182 | 181 | // Do not prepend JSON array prefix until first signal is known, onNext vs onError |
183 | 182 | // Keeps response not committed for error handling |
184 | 183 |
|
185 | | - Flux<DataBuffer> flux1 = helper.getPrefix(bufferFactory, hints, logger) |
186 | | - .concatWith(Flux.just(EMPTY_BUFFER).repeat()); |
| 184 | + dataBufferFlux = Flux.from(inputStream) |
| 185 | + .map(value -> { |
| 186 | + byte[] prefix = helper.getPrefix(); |
| 187 | + byte[] delimiter = helper.getDelimiter(); |
187 | 188 |
|
188 | | - Flux<DataBuffer> flux2 = Flux.from(inputStream).map(value -> encodeStreamingValue( |
189 | | - value, bufferFactory, hints, sequenceWriter, byteBuilder, helper.getDelimiter(), EMPTY_BYTES)); |
| 189 | + DataBuffer dataBuffer = encodeStreamingValue( |
| 190 | + value, bufferFactory, hints, sequenceWriter, byteBuilder, delimiter, EMPTY_BYTES); |
190 | 191 |
|
191 | | - dataBufferFlux = Flux.zip(flux1, flux2, (buffer1, buffer2) -> |
192 | | - (buffer1 != EMPTY_BUFFER ? |
193 | | - bufferFactory.join(Arrays.asList(buffer1, buffer2)) : |
194 | | - buffer2)) |
195 | | - .concatWith(helper.getSuffix(bufferFactory, hints, logger)); |
| 192 | + return (prefix.length > 0 ? |
| 193 | + bufferFactory.join(Arrays.asList(bufferFactory.wrap(prefix), dataBuffer)) : |
| 194 | + dataBuffer); |
| 195 | + }) |
| 196 | + .concatWith(Mono.fromCallable(() -> bufferFactory.wrap(helper.getSuffix()))); |
196 | 197 | } |
197 | 198 |
|
198 | 199 | return dataBufferFlux |
| 200 | + .doOnNext(dataBuffer -> Hints.touchDataBuffer(dataBuffer, hints, logger)) |
199 | 201 | .doAfterTerminate(() -> { |
200 | 202 | try { |
201 | 203 | byteBuilder.release(); |
@@ -420,33 +422,22 @@ private static class JsonArrayJoinHelper { |
420 | 422 |
|
421 | 423 | private static final byte[] CLOSE_BRACKET = {']'}; |
422 | 424 |
|
423 | | - |
424 | | - private boolean afterFirstItem = false; |
| 425 | + private boolean firstItemEmitted; |
425 | 426 |
|
426 | 427 | public byte[] getDelimiter() { |
427 | | - if (this.afterFirstItem) { |
| 428 | + if (this.firstItemEmitted) { |
428 | 429 | return COMMA_SEPARATOR; |
429 | 430 | } |
430 | | - this.afterFirstItem = true; |
| 431 | + this.firstItemEmitted = true; |
431 | 432 | return EMPTY_BYTES; |
432 | 433 | } |
433 | 434 |
|
434 | | - public Mono<DataBuffer> getPrefix(DataBufferFactory factory, @Nullable Map<String, Object> hints, Log logger) { |
435 | | - return wrapBytes(OPEN_BRACKET, factory, hints, logger); |
436 | | - } |
437 | | - |
438 | | - public Mono<DataBuffer> getSuffix(DataBufferFactory factory, @Nullable Map<String, Object> hints, Log logger) { |
439 | | - return wrapBytes(CLOSE_BRACKET, factory, hints, logger); |
| 435 | + public byte[] getPrefix() { |
| 436 | + return (this.firstItemEmitted ? EMPTY_BYTES : OPEN_BRACKET); |
440 | 437 | } |
441 | 438 |
|
442 | | - private Mono<DataBuffer> wrapBytes( |
443 | | - byte[] bytes, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints, Log logger) { |
444 | | - |
445 | | - return Mono.fromCallable(() -> { |
446 | | - DataBuffer buffer = bufferFactory.wrap(bytes); |
447 | | - Hints.touchDataBuffer(buffer, hints, logger); |
448 | | - return buffer; |
449 | | - }); |
| 439 | + public byte[] getSuffix() { |
| 440 | + return CLOSE_BRACKET; |
450 | 441 | } |
451 | 442 | } |
452 | 443 |
|
|
0 commit comments