Skip to content

Commit 4beb5fa

Browse files
committed
Stop reconnecting locator is environment is closed
1 parent 081a6cb commit 4beb5fa

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -356,13 +356,19 @@ class StreamEnvironment implements Environment {
356356
clientFactory,
357357
this.locatorReconnectionScheduledExecutorService,
358358
this.recoveryBackOffDelayPolicy,
359-
l.label());
359+
l.label(),
360+
this.closed);
360361
}
361362
});
362363
}
363364
};
364365
if (lazyInit) {
365-
this.locatorInitializationSequence = locatorInitSequence;
366+
this.locatorInitializationSequence =
367+
() -> {
368+
if (!this.closed.get()) {
369+
locatorInitSequence.run();
370+
}
371+
};
366372
} else {
367373
locatorInitSequence.run();
368374
locatorsInitialized.set(true);
@@ -391,7 +397,7 @@ private ShutdownListener shutdownListener(
391397
shutdownContext -> {
392398
String label = locator.label();
393399
LOGGER.debug("Locator {} disconnected", label);
394-
if (shutdownContext.isShutdownUnexpected()) {
400+
if (shutdownContext.isShutdownUnexpected() && !this.closed.get()) {
395401
locator.client(null);
396402
BackOffDelayPolicy delayPolicy = recoveryBackOffDelayPolicy;
397403
LOGGER.debug(
@@ -408,7 +414,8 @@ private ShutdownListener shutdownListener(
408414
clientFactory,
409415
this.locatorReconnectionScheduledExecutorService,
410416
delayPolicy,
411-
label);
417+
label,
418+
this.closed);
412419
} else {
413420
LOGGER.debug("Locator connection '{}' closing normally", label);
414421
}
@@ -425,7 +432,8 @@ private static void scheduleLocatorConnection(
425432
Function<Client.ClientParameters, Client> clientFactory,
426433
ScheduledExecutorService scheduler,
427434
BackOffDelayPolicy delayPolicy,
428-
String locatorLabel) {
435+
String locatorLabel,
436+
AtomicBoolean closed) {
429437
LOGGER.debug(
430438
"Scheduling locator '{}' connection with delay policy {}", locatorLabel, delayPolicy);
431439
try {
@@ -452,6 +460,7 @@ private static void scheduleLocatorConnection(
452460
.description("Locator '%s' connection", locatorLabel)
453461
.scheduler(scheduler)
454462
.delayPolicy(delayPolicy)
463+
.retry(ignored -> !closed.get())
455464
.build()
456465
.thenAccept(locator::client)
457466
.exceptionally(

src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,8 @@
5353
import org.junit.jupiter.params.ParameterizedTest;
5454
import org.junit.jupiter.params.provider.MethodSource;
5555

56-
@ExtendWith({
57-
TestUtils.StreamTestInfrastructureExtension.class,
58-
BrokerVersionAtLeast311Condition.class
59-
})
56+
@ExtendWith({BrokerVersionAtLeast311Condition.class})
57+
@StreamTestInfrastructure
6058
@SingleActiveConsumer
6159
public class SacSuperStreamConsumerTest {
6260
EventLoopGroup eventLoopGroup;

0 commit comments

Comments
 (0)