Skip to content

Commit 61d1315

Browse files
committed
Merge #2756 into 1.1.6
Signed-off-by: Oleh Dokuka <[email protected]>
2 parents 4f5dbad + d335c33 commit 61d1315

File tree

2 files changed

+46
-6
lines changed

2 files changed

+46
-6
lines changed

reactor-netty-core/src/main/java/reactor/netty/channel/FluxReceive.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.nio.channels.ClosedChannelException;
1919
import java.util.ArrayDeque;
2020
import java.util.Queue;
21-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2221
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2322
import java.util.function.IntConsumer;
2423

@@ -61,9 +60,7 @@ final class FluxReceive extends Flux<Object> implements Subscription, Disposable
6160

6261
volatile IntConsumer receiverCancel;
6362

64-
volatile int once;
65-
static final AtomicIntegerFieldUpdater<FluxReceive> ONCE =
66-
AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once");
63+
boolean subscribedOnce;
6764

6865
// Please note, in this specific case WIP is non-volatile since all operation that
6966
// involves work-in-progress pattern is within Netty Event-Loops which guarantees
@@ -153,11 +150,12 @@ public void subscribe(CoreSubscriber<? super Object> s) {
153150
}
154151

155152
final void startReceiver(CoreSubscriber<? super Object> s) {
156-
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
153+
if (!subscribedOnce) {
154+
subscribedOnce = true;
157155
if (log.isDebugEnabled()) {
158156
log.debug(format(channel, "{}: subscribing inbound receiver"), this);
159157
}
160-
if (inboundDone && getPending() == 0) {
158+
if ((inboundDone && getPending() == 0) || isCancelled()) {
161159
if (inboundError != null) {
162160
Operators.error(s, inboundError);
163161
return;
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package reactor.netty.channel;
17+
18+
import java.time.Duration;
19+
20+
import io.netty.channel.embedded.EmbeddedChannel;
21+
import org.junit.jupiter.api.Test;
22+
import reactor.netty.NettyInbound;
23+
import reactor.netty.NettyOutbound;
24+
import reactor.test.subscriber.TestSubscriber;
25+
import reactor.test.util.RaceTestUtils;
26+
27+
public class FluxReceiveTest {
28+
29+
@Test
30+
void disposeAndSubscribeRaceTest() {
31+
for (int i = 0; i < 100; i++) {
32+
ChannelOperations<NettyInbound, NettyOutbound> operations =
33+
new ChannelOperations<>(EmbeddedChannel::new, (connection, newState) -> {
34+
});
35+
FluxReceive receive = new FluxReceive(operations);
36+
TestSubscriber<Object> subscriber = TestSubscriber.create();
37+
RaceTestUtils.race(receive::dispose, () -> receive.subscribe(subscriber));
38+
39+
subscriber.block(Duration.ofSeconds(5));
40+
}
41+
}
42+
}

0 commit comments

Comments
 (0)