Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ static class AggregateSubscriber extends BaseSubscriber<String> {
*/
private ResponseInfo responseInfo;

volatile boolean hasRequestedDemand = false;

/**
* Creates a new JsonLineSubscriber that will emit parsed JSON-RPC messages.
* @param sink the {@link FluxSink} to emit parsed {@link ResponseEvent} objects
Expand All @@ -236,7 +238,13 @@ public AggregateSubscriber(ResponseInfo responseInfo, FluxSink<ResponseEvent> si

@Override
protected void hookOnSubscribe(Subscription subscription) {
sink.onRequest(subscription::request);

sink.onRequest(n -> {
if (!hasRequestedDemand) {
subscription.request(Long.MAX_VALUE);
}
hasRequestedDemand = true;
});

// Register disposal callback to cancel subscription when Flux is disposed
sink.onDispose(subscription::cancel);
Expand All @@ -249,8 +257,11 @@ protected void hookOnNext(String line) {

@Override
protected void hookOnComplete() {
String data = this.eventBuilder.toString();
this.sink.next(new AggregateResponseEvent(responseInfo, data));

if (hasRequestedDemand) {
String data = this.eventBuilder.toString();
this.sink.next(new AggregateResponseEvent(responseInfo, data));
}

this.sink.complete();
}
Expand All @@ -271,6 +282,8 @@ static class BodilessResponseLineSubscriber extends BaseSubscriber<String> {

private final ResponseInfo responseInfo;

volatile boolean hasRequestedDemand = false;

public BodilessResponseLineSubscriber(ResponseInfo responseInfo, FluxSink<ResponseEvent> sink) {
this.sink = sink;
this.responseInfo = responseInfo;
Expand All @@ -280,7 +293,10 @@ public BodilessResponseLineSubscriber(ResponseInfo responseInfo, FluxSink<Respon
protected void hookOnSubscribe(Subscription subscription) {

sink.onRequest(n -> {
subscription.request(n);
if (!hasRequestedDemand) {
subscription.request(Long.MAX_VALUE);
}
hasRequestedDemand = true;
});

// Register disposal callback to cancel subscription when Flux is disposed
Expand All @@ -291,11 +307,13 @@ protected void hookOnSubscribe(Subscription subscription) {

@Override
protected void hookOnComplete() {
// emit dummy event to be able to inspect the response info
// this is a shortcut allowing for a more streamlined processing using
// operator composition instead of having to deal with the CompletableFuture
// along the Subscriber for inspecting the result
this.sink.next(new DummyEvent(responseInfo));
if (hasRequestedDemand) {
// emit dummy event to be able to inspect the response info
// this is a shortcut allowing for a more streamlined processing using
// operator composition instead of having to deal with the
// CompletableFuture along the Subscriber for inspecting the result
this.sink.next(new DummyEvent(responseInfo));
}
this.sink.complete();
}

Expand Down