@@ -223,6 +223,8 @@ static class AggregateSubscriber extends BaseSubscriber<String> {
223
223
*/
224
224
private ResponseInfo responseInfo ;
225
225
226
+ volatile boolean hasRequestedDemand = false ;
227
+
226
228
/**
227
229
* Creates a new JsonLineSubscriber that will emit parsed JSON-RPC messages.
228
230
* @param sink the {@link FluxSink} to emit parsed {@link ResponseEvent} objects
@@ -236,7 +238,13 @@ public AggregateSubscriber(ResponseInfo responseInfo, FluxSink<ResponseEvent> si
236
238
237
239
@ Override
238
240
protected void hookOnSubscribe (Subscription subscription ) {
239
- sink .onRequest (subscription ::request );
241
+
242
+ sink .onRequest (n -> {
243
+ if (!hasRequestedDemand ) {
244
+ subscription .request (Long .MAX_VALUE );
245
+ }
246
+ hasRequestedDemand = true ;
247
+ });
240
248
241
249
// Register disposal callback to cancel subscription when Flux is disposed
242
250
sink .onDispose (subscription ::cancel );
@@ -249,8 +257,11 @@ protected void hookOnNext(String line) {
249
257
250
258
@ Override
251
259
protected void hookOnComplete () {
252
- String data = this .eventBuilder .toString ();
253
- this .sink .next (new AggregateResponseEvent (responseInfo , data ));
260
+
261
+ if (hasRequestedDemand ) {
262
+ String data = this .eventBuilder .toString ();
263
+ this .sink .next (new AggregateResponseEvent (responseInfo , data ));
264
+ }
254
265
255
266
this .sink .complete ();
256
267
}
@@ -271,6 +282,8 @@ static class BodilessResponseLineSubscriber extends BaseSubscriber<String> {
271
282
272
283
private final ResponseInfo responseInfo ;
273
284
285
+ volatile boolean hasRequestedDemand = false ;
286
+
274
287
public BodilessResponseLineSubscriber (ResponseInfo responseInfo , FluxSink <ResponseEvent > sink ) {
275
288
this .sink = sink ;
276
289
this .responseInfo = responseInfo ;
@@ -280,7 +293,10 @@ public BodilessResponseLineSubscriber(ResponseInfo responseInfo, FluxSink<Respon
280
293
protected void hookOnSubscribe (Subscription subscription ) {
281
294
282
295
sink .onRequest (n -> {
283
- subscription .request (n );
296
+ if (!hasRequestedDemand ) {
297
+ subscription .request (Long .MAX_VALUE );
298
+ }
299
+ hasRequestedDemand = true ;
284
300
});
285
301
286
302
// Register disposal callback to cancel subscription when Flux is disposed
@@ -291,11 +307,13 @@ protected void hookOnSubscribe(Subscription subscription) {
291
307
292
308
@ Override
293
309
protected void hookOnComplete () {
294
- // emit dummy event to be able to inspect the response info
295
- // this is a shortcut allowing for a more streamlined processing using
296
- // operator composition instead of having to deal with the CompletableFuture
297
- // along the Subscriber for inspecting the result
298
- this .sink .next (new DummyEvent (responseInfo ));
310
+ if (hasRequestedDemand ) {
311
+ // emit dummy event to be able to inspect the response info
312
+ // this is a shortcut allowing for a more streamlined processing using
313
+ // operator composition instead of having to deal with the
314
+ // CompletableFuture along the Subscriber for inspecting the result
315
+ this .sink .next (new DummyEvent (responseInfo ));
316
+ }
299
317
this .sink .complete ();
300
318
}
301
319
0 commit comments