Skip to content

Commit 8e7ab3c

Browse files
author
Oleh Dokuka
committed
Merge #2765 into 1.1.6
Signed-off-by: OlegDokuka <[email protected]>
2 parents 4f642ab + 909c6cd commit 8e7ab3c

File tree

2 files changed

+96
-2
lines changed

2 files changed

+96
-2
lines changed

reactor-netty-core/src/main/java/reactor/netty/channel/MonoSendMany.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -351,6 +351,17 @@ public void run() {
351351
else {
352352
actual.onError(t);
353353
}
354+
355+
if (sourceMode == ASYNC) {
356+
// notify that queue draining is done and no more interactions are expected for here
357+
//
358+
// This is needed due to ASYNC fusion contract.
359+
// This call notifies upstream that the interaction with the queue is done on the
360+
// downstream side.
361+
// Some upstreams implementations may have `onClose()` mechanism to send notification
362+
// about its termination and finalization of the offered events consumption
363+
queue.clear();
364+
}
354365
return;
355366
}
356367

reactor-netty-core/src/test/java/reactor/netty/channel/MonoSendManyTest.java

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2019-2021 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Queue;
2424
import java.util.concurrent.ArrayBlockingQueue;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2526
import java.util.function.Function;
2627
import java.util.stream.Collectors;
2728

@@ -32,14 +33,18 @@
3233
import io.netty.handler.timeout.WriteTimeoutHandler;
3334
import io.netty.util.ReferenceCountUtil;
3435
import io.netty.util.ReferenceCounted;
36+
import org.assertj.core.api.Assertions;
3537
import org.junit.jupiter.params.ParameterizedTest;
3638
import org.junit.jupiter.params.provider.ValueSource;
3739
import org.reactivestreams.Subscription;
40+
import reactor.core.CoreSubscriber;
3841
import reactor.core.Exceptions;
42+
import reactor.core.Fuseable;
3943
import reactor.core.publisher.BaseSubscriber;
4044
import reactor.core.publisher.Flux;
4145
import reactor.core.publisher.Hooks;
4246
import reactor.core.publisher.Mono;
47+
import reactor.core.publisher.Operators;
4348
import reactor.core.publisher.Sinks;
4449
import reactor.test.StepVerifier;
4550
import reactor.test.publisher.TestPublisher;
@@ -259,6 +264,84 @@ void shouldNotLeakIfFusedOnRacingCancelAndOnNext(boolean flushOnEach) {
259264
}
260265
}
261266

267+
268+
@ParameterizedTest
269+
@ValueSource(booleans = {true, false})
270+
void shouldCallQueueClearToNotifyTermination(boolean flushOnEach) {
271+
//use an extra handler
272+
EmbeddedChannel channel = new EmbeddedChannel(true, true, new ChannelHandlerAdapter() {});
273+
AtomicBoolean cleared = new AtomicBoolean();
274+
275+
Sinks.Many<ByteBuf> source = Sinks.many().unicast().onBackpressureBuffer();
276+
MonoSendMany<ByteBuf, ByteBuf> m =
277+
MonoSendMany.byteBufSource(source.asFlux().transform(Operators.<ByteBuf, ByteBuf>lift((__,
278+
downstream) -> new CoreSubscriber<ByteBuf>() {
279+
@Override
280+
public void onSubscribe(Subscription s) {
281+
downstream.onSubscribe(new Fuseable.QueueSubscription<ByteBuf>() {
282+
@Override
283+
public void request(long n) {
284+
s.request(n);
285+
}
286+
287+
@Override
288+
public void cancel() {
289+
s.cancel();
290+
}
291+
292+
@Override
293+
public int size() {
294+
return ((Fuseable.QueueSubscription<ByteBuf>) s).size();
295+
}
296+
297+
@Override
298+
public boolean isEmpty() {
299+
return ((Fuseable.QueueSubscription<ByteBuf>) s).isEmpty();
300+
}
301+
302+
@Override
303+
public void clear() {
304+
cleared.set(true);
305+
((Fuseable.QueueSubscription<ByteBuf>) s).clear();
306+
}
307+
308+
@Override
309+
public ByteBuf poll() {
310+
return ((Fuseable.QueueSubscription<ByteBuf>) s).poll();
311+
}
312+
313+
@Override
314+
public int requestFusion(int requestedMode) {
315+
return ((Fuseable.QueueSubscription<ByteBuf>) s).requestFusion(requestedMode);
316+
}
317+
});
318+
}
319+
320+
@Override
321+
public void onNext(ByteBuf buf) {
322+
downstream.onNext(buf);
323+
}
324+
325+
@Override
326+
public void onError(Throwable t) {
327+
downstream.onError(t);
328+
}
329+
330+
@Override
331+
public void onComplete() {
332+
downstream.onComplete();
333+
}
334+
})), channel, b -> flushOnEach);
335+
m.subscribe();
336+
Queue<Object> messages = channel.outboundMessages();
337+
338+
source.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
339+
340+
channel.flush();
341+
messages.forEach(ReferenceCountUtil::release);
342+
Assertions.assertThat(cleared).isTrue();
343+
}
344+
262345
static void wait(WeakReference<Subscription> ref) {
263346
int duration = 5_000;
264347
int spins = duration / 100;

0 commit comments

Comments
 (0)