Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
Expand All @@ -103,6 +104,7 @@ class StreamEnvironment implements Environment {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class);

private final EventLoopGroup eventLoopGroup;
private final boolean privateEventLoopGroup;
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService locatorReconnectionScheduledExecutorService;
private final boolean privateScheduleExecutorService;
Expand Down Expand Up @@ -272,17 +274,16 @@ class StreamEnvironment implements Environment {

if (clientParametersPrototype.eventLoopGroup == null) {
this.eventLoopGroup = Utils.eventLoopGroup();
this.privateEventLoopGroup = true;
shutdownService.wrap(() -> closeEventLoopGroup(this.eventLoopGroup));
this.clientParametersPrototype =
clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
} else {
this.eventLoopGroup = null;
this.clientParametersPrototype =
clientParametersPrototype
.duplicate()
.eventLoopGroup(clientParametersPrototype.eventLoopGroup);
this.eventLoopGroup = clientParametersPrototype.eventLoopGroup;
this.privateEventLoopGroup = false;
}

this.clientParametersPrototype =
clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);

this.producersCoordinator =
new ProducersCoordinator(
this,
Expand Down Expand Up @@ -356,13 +357,19 @@ class StreamEnvironment implements Environment {
clientFactory,
this.locatorReconnectionScheduledExecutorService,
this.recoveryBackOffDelayPolicy,
l.label());
l.label(),
this::shouldTryLocatorConnection);
}
});
}
};
if (lazyInit) {
this.locatorInitializationSequence = locatorInitSequence;
this.locatorInitializationSequence =
() -> {
if (this.shouldTryLocatorConnection()) {
locatorInitSequence.run();
}
};
} else {
locatorInitSequence.run();
locatorsInitialized.set(true);
Expand Down Expand Up @@ -408,7 +415,8 @@ private ShutdownListener shutdownListener(
clientFactory,
this.locatorReconnectionScheduledExecutorService,
delayPolicy,
label);
label,
this::shouldTryLocatorConnection);
} else {
LOGGER.debug("Locator connection '{}' closing normally", label);
}
Expand All @@ -425,7 +433,8 @@ private static void scheduleLocatorConnection(
Function<Client.ClientParameters, Client> clientFactory,
ScheduledExecutorService scheduler,
BackOffDelayPolicy delayPolicy,
String locatorLabel) {
String locatorLabel,
BooleanSupplier shouldRetry) {
LOGGER.debug(
"Scheduling locator '{}' connection with delay policy {}", locatorLabel, delayPolicy);
try {
Expand All @@ -452,6 +461,7 @@ private static void scheduleLocatorConnection(
.description("Locator '%s' connection", locatorLabel)
.scheduler(scheduler)
.delayPolicy(delayPolicy)
.retry(ignored -> shouldRetry.getAsBoolean())
.build()
.thenAccept(locator::client)
.exceptionally(
Expand Down Expand Up @@ -777,14 +787,19 @@ public void close() {
if (this.locatorReconnectionScheduledExecutorService != null) {
this.locatorReconnectionScheduledExecutorService.shutdownNow();
}
closeEventLoopGroup(this.eventLoopGroup);
if (this.privateEventLoopGroup) {
closeEventLoopGroup(this.eventLoopGroup);
}
}
}

private boolean shouldTryLocatorConnection() {
return !this.closed.get() && !this.eventLoopGroup.isShuttingDown();
}

private static void closeEventLoopGroup(EventLoopGroup eventLoopGroup) {
try {
if (eventLoopGroup != null
&& (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown())) {
if (!eventLoopGroup.isShuttingDown()) {
LOGGER.debug("Closing Netty event loop group");
eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -57,7 +58,12 @@ public class DedupIdempotencyTest {
void init() {
EnvironmentBuilder environmentBuilder =
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
environment = environmentBuilder.build();
this.environment = environmentBuilder.build();
}

@AfterEach
void tearDown() {
this.environment.close();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@ExtendWith({
TestUtils.StreamTestInfrastructureExtension.class,
BrokerVersionAtLeast311Condition.class
})
@ExtendWith({BrokerVersionAtLeast311Condition.class})
@StreamTestInfrastructure
@SingleActiveConsumer
public class SacSuperStreamConsumerTest {
EventLoopGroup eventLoopGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
import com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.time.Duration;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -70,7 +73,7 @@ public class StreamProducerUnitTest {
@Mock Channel channel;
@Mock ChannelFuture channelFuture;

Set<ByteBuf> buffers = ConcurrentHashMap.newKeySet();
Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();

ScheduledExecutorService executorService;
Clock clock = new Clock();
Expand All @@ -82,15 +85,16 @@ public class StreamProducerUnitTest {
void init() {
mocks = MockitoAnnotations.openMocks(this);
executorService = Executors.newScheduledThreadPool(2);
when(channel.alloc()).thenReturn(Utils.byteBufAllocator());
ByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
when(channel.alloc()).thenReturn(allocator);
when(channel.writeAndFlush(Mockito.any())).thenReturn(channelFuture);
when(client.allocateNoCheck(any(ByteBufAllocator.class), anyInt()))
.thenAnswer(
(Answer<ByteBuf>)
invocation -> {
ByteBufAllocator allocator = invocation.getArgument(0);
ByteBufAllocator alloc = invocation.getArgument(0);
int capacity = invocation.getArgument(1);
ByteBuf buffer = allocator.buffer(capacity);
ByteBuf buffer = alloc.buffer(capacity);
buffers.add(buffer);
return buffer;
});
Expand Down
Loading