Skip to content

Commit 4aee61b

Browse files
authored
Merge pull request #846 from rabbitmq/no-locator-reconnect-if-closed
Stop reconnecting locator is environment is closed
2 parents 081a6cb + 5849049 commit 4aee61b

File tree

4 files changed

+46
-23
lines changed

4 files changed

+46
-23
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import java.util.concurrent.atomic.AtomicBoolean;
8888
import java.util.concurrent.atomic.AtomicReference;
8989
import java.util.function.BiFunction;
90+
import java.util.function.BooleanSupplier;
9091
import java.util.function.Consumer;
9192
import java.util.function.Function;
9293
import java.util.function.LongConsumer;
@@ -103,6 +104,7 @@ class StreamEnvironment implements Environment {
103104
private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class);
104105

105106
private final EventLoopGroup eventLoopGroup;
107+
private final boolean privateEventLoopGroup;
106108
private final ScheduledExecutorService scheduledExecutorService;
107109
private final ScheduledExecutorService locatorReconnectionScheduledExecutorService;
108110
private final boolean privateScheduleExecutorService;
@@ -272,17 +274,16 @@ class StreamEnvironment implements Environment {
272274

273275
if (clientParametersPrototype.eventLoopGroup == null) {
274276
this.eventLoopGroup = Utils.eventLoopGroup();
277+
this.privateEventLoopGroup = true;
275278
shutdownService.wrap(() -> closeEventLoopGroup(this.eventLoopGroup));
276-
this.clientParametersPrototype =
277-
clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
278279
} else {
279-
this.eventLoopGroup = null;
280-
this.clientParametersPrototype =
281-
clientParametersPrototype
282-
.duplicate()
283-
.eventLoopGroup(clientParametersPrototype.eventLoopGroup);
280+
this.eventLoopGroup = clientParametersPrototype.eventLoopGroup;
281+
this.privateEventLoopGroup = false;
284282
}
285283

284+
this.clientParametersPrototype =
285+
clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
286+
286287
this.producersCoordinator =
287288
new ProducersCoordinator(
288289
this,
@@ -356,13 +357,19 @@ class StreamEnvironment implements Environment {
356357
clientFactory,
357358
this.locatorReconnectionScheduledExecutorService,
358359
this.recoveryBackOffDelayPolicy,
359-
l.label());
360+
l.label(),
361+
this::shouldTryLocatorConnection);
360362
}
361363
});
362364
}
363365
};
364366
if (lazyInit) {
365-
this.locatorInitializationSequence = locatorInitSequence;
367+
this.locatorInitializationSequence =
368+
() -> {
369+
if (this.shouldTryLocatorConnection()) {
370+
locatorInitSequence.run();
371+
}
372+
};
366373
} else {
367374
locatorInitSequence.run();
368375
locatorsInitialized.set(true);
@@ -408,7 +415,8 @@ private ShutdownListener shutdownListener(
408415
clientFactory,
409416
this.locatorReconnectionScheduledExecutorService,
410417
delayPolicy,
411-
label);
418+
label,
419+
this::shouldTryLocatorConnection);
412420
} else {
413421
LOGGER.debug("Locator connection '{}' closing normally", label);
414422
}
@@ -425,7 +433,8 @@ private static void scheduleLocatorConnection(
425433
Function<Client.ClientParameters, Client> clientFactory,
426434
ScheduledExecutorService scheduler,
427435
BackOffDelayPolicy delayPolicy,
428-
String locatorLabel) {
436+
String locatorLabel,
437+
BooleanSupplier shouldRetry) {
429438
LOGGER.debug(
430439
"Scheduling locator '{}' connection with delay policy {}", locatorLabel, delayPolicy);
431440
try {
@@ -452,6 +461,7 @@ private static void scheduleLocatorConnection(
452461
.description("Locator '%s' connection", locatorLabel)
453462
.scheduler(scheduler)
454463
.delayPolicy(delayPolicy)
464+
.retry(ignored -> shouldRetry.getAsBoolean())
455465
.build()
456466
.thenAccept(locator::client)
457467
.exceptionally(
@@ -777,14 +787,19 @@ public void close() {
777787
if (this.locatorReconnectionScheduledExecutorService != null) {
778788
this.locatorReconnectionScheduledExecutorService.shutdownNow();
779789
}
780-
closeEventLoopGroup(this.eventLoopGroup);
790+
if (this.privateEventLoopGroup) {
791+
closeEventLoopGroup(this.eventLoopGroup);
792+
}
781793
}
782794
}
783795

796+
private boolean shouldTryLocatorConnection() {
797+
return !this.closed.get() && !this.eventLoopGroup.isShuttingDown();
798+
}
799+
784800
private static void closeEventLoopGroup(EventLoopGroup eventLoopGroup) {
785801
try {
786-
if (eventLoopGroup != null
787-
&& (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown())) {
802+
if (!eventLoopGroup.isShuttingDown()) {
788803
LOGGER.debug("Closing Netty event loop group");
789804
eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
790805
}

src/test/java/com/rabbitmq/stream/impl/DedupIdempotencyTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.function.Supplier;
4444
import java.util.stream.Collectors;
4545
import java.util.stream.IntStream;
46+
import org.junit.jupiter.api.AfterEach;
4647
import org.junit.jupiter.api.BeforeEach;
4748
import org.junit.jupiter.api.Test;
4849

@@ -57,7 +58,12 @@ public class DedupIdempotencyTest {
5758
void init() {
5859
EnvironmentBuilder environmentBuilder =
5960
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
60-
environment = environmentBuilder.build();
61+
this.environment = environmentBuilder.build();
62+
}
63+
64+
@AfterEach
65+
void tearDown() {
66+
this.environment.close();
6167
}
6268

6369
@Test

src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,8 @@
5353
import org.junit.jupiter.params.ParameterizedTest;
5454
import org.junit.jupiter.params.provider.MethodSource;
5555

56-
@ExtendWith({
57-
TestUtils.StreamTestInfrastructureExtension.class,
58-
BrokerVersionAtLeast311Condition.class
59-
})
56+
@ExtendWith({BrokerVersionAtLeast311Condition.class})
57+
@StreamTestInfrastructure
6058
@SingleActiveConsumer
6159
public class SacSuperStreamConsumerTest {
6260
EventLoopGroup eventLoopGroup;

src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@
3939
import com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback;
4040
import io.netty.buffer.ByteBuf;
4141
import io.netty.buffer.ByteBufAllocator;
42+
import io.netty.buffer.UnpooledByteBufAllocator;
4243
import io.netty.channel.Channel;
4344
import io.netty.channel.ChannelFuture;
4445
import java.time.Duration;
46+
import java.util.Queue;
4547
import java.util.Set;
4648
import java.util.concurrent.ConcurrentHashMap;
49+
import java.util.concurrent.ConcurrentLinkedQueue;
4750
import java.util.concurrent.CountDownLatch;
4851
import java.util.concurrent.Executors;
4952
import java.util.concurrent.ScheduledExecutorService;
@@ -70,7 +73,7 @@ public class StreamProducerUnitTest {
7073
@Mock Channel channel;
7174
@Mock ChannelFuture channelFuture;
7275

73-
Set<ByteBuf> buffers = ConcurrentHashMap.newKeySet();
76+
Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
7477

7578
ScheduledExecutorService executorService;
7679
Clock clock = new Clock();
@@ -82,15 +85,16 @@ public class StreamProducerUnitTest {
8285
void init() {
8386
mocks = MockitoAnnotations.openMocks(this);
8487
executorService = Executors.newScheduledThreadPool(2);
85-
when(channel.alloc()).thenReturn(Utils.byteBufAllocator());
88+
ByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
89+
when(channel.alloc()).thenReturn(allocator);
8690
when(channel.writeAndFlush(Mockito.any())).thenReturn(channelFuture);
8791
when(client.allocateNoCheck(any(ByteBufAllocator.class), anyInt()))
8892
.thenAnswer(
8993
(Answer<ByteBuf>)
9094
invocation -> {
91-
ByteBufAllocator allocator = invocation.getArgument(0);
95+
ByteBufAllocator alloc = invocation.getArgument(0);
9296
int capacity = invocation.getArgument(1);
93-
ByteBuf buffer = allocator.buffer(capacity);
97+
ByteBuf buffer = alloc.buffer(capacity);
9498
buffers.add(buffer);
9599
return buffer;
96100
});

0 commit comments

Comments
 (0)