|
17 | 17 | package org.springframework.http.server.reactive;
|
18 | 18 |
|
19 | 19 | import java.nio.charset.StandardCharsets;
|
| 20 | +import java.time.Duration; |
20 | 21 | import java.util.ArrayList;
|
21 | 22 | import java.util.Arrays;
|
22 | 23 | import java.util.List;
|
|
33 | 34 | import reactor.core.publisher.Flux;
|
34 | 35 | import reactor.core.publisher.Mono;
|
35 | 36 | import reactor.core.publisher.Signal;
|
| 37 | +import reactor.test.StepVerifier; |
36 | 38 |
|
37 | 39 | import org.springframework.core.io.buffer.DataBuffer;
|
38 | 40 | import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
|
@@ -156,7 +158,12 @@ public void cancelWhileItemCached() {
|
156 | 158 | }
|
157 | 159 |
|
158 | 160 | @Test // gh-22720
|
159 |
| - public void errorWhileItemCached() { |
| 161 | + public void errorFromWriteSourceWhileItemCached() { |
| 162 | + |
| 163 | + // 1. First item received |
| 164 | + // 2. writeFunction applied and writeCompletionBarrier subscribed to it |
| 165 | + // 3. Write Publisher fails right after that and before request(n) from server |
| 166 | + |
160 | 167 | NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
|
161 | 168 | LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
|
162 | 169 | ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber();
|
@@ -186,6 +193,30 @@ public void errorWhileItemCached() {
|
186 | 193 | bufferFactory.checkForLeaks();
|
187 | 194 | }
|
188 | 195 |
|
| 196 | + @Test // gh-22720 |
| 197 | + public void errorFromWriteFunctionWhileItemCached() { |
| 198 | + |
| 199 | + // 1. First item received |
| 200 | + // 2. writeFunction applied and writeCompletionBarrier subscribed to it |
| 201 | + // 3. writeFunction fails, e.g. to flush status and headers, before request(n) from server |
| 202 | + |
| 203 | + NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); |
| 204 | + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); |
| 205 | + |
| 206 | + ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>( |
| 207 | + Flux.create(sink -> { |
| 208 | + DataBuffer dataBuffer = bufferFactory.allocateBuffer(); |
| 209 | + dataBuffer.write("foo", StandardCharsets.UTF_8); |
| 210 | + sink.next(dataBuffer); |
| 211 | + }), |
| 212 | + publisher -> { |
| 213 | + publisher.subscribe(new ZeroDemandSubscriber()); |
| 214 | + return Mono.error(new IllegalStateException("err")); |
| 215 | + }); |
| 216 | + |
| 217 | + StepVerifier.create(operator).expectErrorMessage("err").verify(Duration.ofSeconds(5)); |
| 218 | + bufferFactory.checkForLeaks(); |
| 219 | + } |
189 | 220 |
|
190 | 221 | private <T> Mono<Void> sendOperator(Publisher<String> source){
|
191 | 222 | return new ChannelSendOperator<>(source, writer::send);
|
|
0 commit comments