Skip to content

Commit 5d1ecf2

Browse files
committed
Add locator count setting
1 parent c4d0c48 commit 5d1ecf2

File tree

5 files changed

+35
-10
lines changed

5 files changed

+35
-10
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.rabbitmq.stream.impl.Utils.*;
1919
import static java.lang.String.format;
2020
import static java.util.concurrent.TimeUnit.SECONDS;
21+
import static java.util.stream.Collectors.toList;
2122

2223
import com.rabbitmq.stream.*;
2324
import com.rabbitmq.stream.MessageHandler.Context;
@@ -53,6 +54,7 @@
5354
import java.util.function.LongSupplier;
5455
import java.util.function.Supplier;
5556
import java.util.stream.Collectors;
57+
import java.util.stream.IntStream;
5658
import javax.net.ssl.SSLException;
5759
import org.slf4j.Logger;
5860
import org.slf4j.LoggerFactory;
@@ -81,7 +83,7 @@ class StreamEnvironment implements Environment {
8183
private final ByteBufAllocator byteBufAllocator;
8284
private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false);
8385
private final Runnable locatorInitializationSequence;
84-
private final List<Locator> locators = new CopyOnWriteArrayList<>();
86+
private final List<Locator> locators;
8587
private final ExecutorServiceFactory executorServiceFactory;
8688
private final ObservationCollector<?> observationCollector;
8789

@@ -105,7 +107,8 @@ class StreamEnvironment implements Environment {
105107
boolean forceReplicaForConsumers,
106108
boolean forceLeaderForProducers,
107109
Duration producerNodeRetryDelay,
108-
Duration consumerNodeRetryDelay) {
110+
Duration consumerNodeRetryDelay,
111+
int expectedLocatorCount) {
109112
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
110113
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
111114
this.byteBufAllocator = byteBufAllocator;
@@ -147,7 +150,7 @@ class StreamEnvironment implements Environment {
147150
new Address(
148151
uriItem.getHost() == null ? "localhost" : uriItem.getHost(),
149152
uriItem.getPort() == -1 ? defaultPort : uriItem.getPort()))
150-
.collect(Collectors.toList());
153+
.collect(toList());
151154
}
152155

153156
AddressResolver addressResolverToUse = addressResolver;
@@ -179,7 +182,19 @@ class StreamEnvironment implements Environment {
179182

180183
this.addressResolver = addressResolverToUse;
181184

182-
this.addresses.forEach(address -> this.locators.add(new Locator(address)));
185+
int locatorCount = Math.max(this.addresses.size(), expectedLocatorCount);
186+
LOGGER.debug("Using {} locator connection(s)", locatorCount);
187+
188+
List<Locator> lctrs =
189+
IntStream.range(0, locatorCount)
190+
.mapToObj(
191+
i -> {
192+
Address addr = this.addresses.get(i % this.addresses.size());
193+
return new Locator(addr);
194+
})
195+
.collect(toList());
196+
this.locators = List.copyOf(lctrs);
197+
183198
this.executorServiceFactory =
184199
new DefaultExecutorServiceFactory(
185200
this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
7070
private ObservationCollector<?> observationCollector = ObservationCollector.NO_OP;
7171
private Duration producerNodeRetryDelay = Duration.ofMillis(500);
7272
private Duration consumerNodeRetryDelay = Duration.ofMillis(1000);
73+
private int locatorCount = 1;
7374

7475
public StreamEnvironmentBuilder() {}
7576

@@ -315,6 +316,11 @@ StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay)
315316
return this;
316317
}
317318

319+
StreamEnvironmentBuilder locatorCount(int locatorCount) {
320+
this.locatorCount = locatorCount;
321+
return this;
322+
}
323+
318324
@Override
319325
public Environment build() {
320326
if (this.compressionCodecFactory == null) {
@@ -349,7 +355,8 @@ public Environment build() {
349355
this.forceReplicaForConsumers,
350356
this.forceLeaderForProducers,
351357
this.producerNodeRetryDelay,
352-
this.consumerNodeRetryDelay);
358+
this.consumerNodeRetryDelay,
359+
this.locatorCount);
353360
}
354361

355362
static final class DefaultTlsConfiguration implements TlsConfiguration {

src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
115115
.addressResolver(addr -> LOAD_BALANCER_ADDRESS);
116116
Duration nodeRetryDelay = Duration.ofMillis(100);
117117
environmentBuilder.forceLeaderForProducers(forceLeader);
118+
((StreamEnvironmentBuilder) environmentBuilder).locatorCount(URIS.size());
118119
// to make the test faster
119120
((StreamEnvironmentBuilder) environmentBuilder).producerNodeRetryDelay(nodeRetryDelay);
120121
((StreamEnvironmentBuilder) environmentBuilder).consumerNodeRetryDelay(nodeRetryDelay);

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,8 +475,7 @@ void manualTrackingConsumerShouldRestartWhereItLeftOff() throws Exception {
475475

476476
@Test
477477
@DisabledIfRabbitMqCtlNotSet
478-
void consumerShouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived()
479-
throws Exception {
478+
void consumerShouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived() {
480479
int messageCountFirstWave = 10_000;
481480
Producer producer = environment.producerBuilder().stream(stream).build();
482481

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ Client.ClientParameters duplicate() {
101101
false,
102102
true,
103103
Duration.ofMillis(100),
104-
Duration.ofMillis(100));
104+
Duration.ofMillis(100),
105+
1);
105106
}
106107

107108
@AfterEach
@@ -169,7 +170,8 @@ void shouldTryUrisOnInitializationFailure() throws Exception {
169170
false,
170171
true,
171172
Duration.ofMillis(100),
172-
Duration.ofMillis(100));
173+
Duration.ofMillis(100),
174+
1);
173175
verify(cf, times(3)).apply(any(Client.ClientParameters.class));
174176
}
175177

@@ -200,7 +202,8 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled(
200202
false,
201203
true,
202204
Duration.ofMillis(100),
203-
Duration.ofMillis(100));
205+
Duration.ofMillis(100),
206+
1);
204207
verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class));
205208
}
206209

0 commit comments

Comments
 (0)