Skip to content

Commit 6f86a8b

Browse files
authored
Merge pull request #177 from rabbitmq/create-super-stream-with-stream-protocol
Create super stream with stream protocol
2 parents 31011b3 + e3927d0 commit 6f86a8b

File tree

5 files changed

+22
-170
lines changed

5 files changed

+22
-170
lines changed

pom.xml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646

4747
<properties>
4848
<stream-client.version>0.20.0</stream-client.version>
49-
<amqp-client.version>5.22.0</amqp-client.version>
5049
<slf4j.version>2.0.16</slf4j.version>
5150
<logback.version>1.5.12</logback.version>
5251
<netty.version>4.1.115.Final</netty.version>
@@ -99,12 +98,6 @@
9998
<version>${stream-client.version}</version>
10099
</dependency>
101100

102-
<dependency>
103-
<groupId>com.rabbitmq</groupId>
104-
<artifactId>amqp-client</artifactId>
105-
<version>${amqp-client.version}</version>
106-
</dependency>
107-
108101
<dependency>
109102
<groupId>org.slf4j</groupId>
110103
<artifactId>slf4j-api</artifactId>

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 21 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import static java.time.Duration.ofMillis;
2323

2424
import com.google.common.util.concurrent.RateLimiter;
25-
import com.rabbitmq.client.Channel;
26-
import com.rabbitmq.client.Connection;
2725
import com.rabbitmq.stream.*;
2826
import com.rabbitmq.stream.EnvironmentBuilder.TlsConfiguration;
2927
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
@@ -77,7 +75,6 @@
7775
import java.util.concurrent.TimeUnit;
7876
import java.util.concurrent.atomic.AtomicInteger;
7977
import java.util.concurrent.atomic.AtomicLong;
80-
import java.util.concurrent.atomic.AtomicReference;
8178
import java.util.function.BiFunction;
8279
import java.util.function.Function;
8380
import java.util.function.Supplier;
@@ -369,13 +366,13 @@ public class StreamPerfTest implements Callable<Integer> {
369366

370367
@CommandLine.Option(
371368
names = {"--super-streams", "-sst"},
372-
description = "use super streams",
369+
description = "use super streams (RabbitMQ 3.13+)",
373370
defaultValue = "false")
374371
private boolean superStreams;
375372

376373
@CommandLine.Option(
377374
names = {"--super-stream-partitions", "-ssp"},
378-
description = "number of partitions for the super streams",
375+
description = "number of partitions for the super streams (RabbitMQ 3.13+)",
379376
defaultValue = "3",
380377
converter = Utils.PositiveIntegerTypeConverter.class)
381378
private int superStreamsPartitions;
@@ -909,65 +906,33 @@ public Integer call() throws Exception {
909906

910907
streams = Utils.streams(this.streamCount, this.streams);
911908

912-
AtomicReference<Channel> amqpChannel = new AtomicReference<>();
913-
Connection amqpConnection;
914-
if (this.superStreams) {
915-
amqpConnection = Utils.amqpConnection(this.amqpUri, uris, tls, this.sniServerNames);
916-
if (this.deleteStreams) {
917-
// we keep it open for deletion, so adding a close step
918-
shutdownService.wrap(
919-
closeStep("Closing AMQP connection for super streams", () -> amqpConnection.close()));
920-
}
921-
amqpChannel.set(amqpConnection.createChannel());
922-
} else {
923-
amqpConnection = null;
924-
}
925-
926909
for (String stream : streams) {
927-
if (this.superStreams) {
928-
List<String> partitions =
929-
Utils.superStreamPartitions(stream, this.superStreamsPartitions);
930-
for (String partition : partitions) {
931-
createStream(environment, partition);
932-
}
933-
934-
Utils.declareSuperStreamExchangeAndBindings(amqpChannel.get(), stream, partitions);
935-
936-
} else {
937-
createStream(environment, stream);
938-
}
910+
createStream(environment, stream);
939911
}
940912

941913
if (this.deleteStreams) {
942914
shutdownService.wrap(
943915
closeStep(
944916
"Deleting stream(s)",
945917
() -> {
918+
java.util.function.Consumer<String> delete =
919+
s -> {
920+
if (this.superStreams) {
921+
environment.deleteSuperStream(s);
922+
} else {
923+
environment.deleteStream(s);
924+
}
925+
};
946926
for (String stream : streams) {
947-
if (this.superStreams) {
948-
List<String> partitions =
949-
Utils.superStreamPartitions(stream, this.superStreamsPartitions);
950-
for (String partition : partitions) {
951-
environment.deleteStream(partition);
952-
}
953-
Utils.deleteSuperStreamExchange(amqpChannel.get(), stream);
954-
955-
} else {
956-
LOGGER.debug("Deleting {}", stream);
957-
try {
958-
environment.deleteStream(stream);
959-
LOGGER.debug("Deleted {}", stream);
960-
} catch (Exception e) {
961-
LOGGER.warn("Could not delete stream {}: {}", stream, e.getMessage());
962-
}
927+
LOGGER.debug("Deleting {}", stream);
928+
try {
929+
delete.accept(stream);
930+
LOGGER.debug("Deleted {}", stream);
931+
} catch (Exception e) {
932+
LOGGER.warn("Could not delete stream {}: {}", stream, e.getMessage());
963933
}
964934
}
965935
}));
966-
} else {
967-
if (this.superStreams) {
968-
// we don't want to delete the super streams at the end, so we close the AMQP connection
969-
amqpConnection.close();
970-
}
971936
}
972937

973938
List<Producer> producers = Collections.synchronizedList(new ArrayList<>(this.producers));
@@ -1308,6 +1273,10 @@ private void createStream(Environment environment, String stream) {
13081273
streamCreator.initialMemberCount(this.initialMemberCount);
13091274
}
13101275

1276+
if (this.superStreams) {
1277+
streamCreator.superStream().partitions(this.superStreamsPartitions);
1278+
}
1279+
13111280
try {
13121281
streamCreator.create();
13131282
} catch (StreamException e) {

src/main/java/com/rabbitmq/stream/perf/Utils.java

Lines changed: 0 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@
1717
import static java.time.Duration.ofSeconds;
1818

1919
import com.codahale.metrics.MetricRegistry;
20-
import com.rabbitmq.client.BuiltinExchangeType;
21-
import com.rabbitmq.client.Channel;
22-
import com.rabbitmq.client.Connection;
23-
import com.rabbitmq.client.ConnectionFactory;
24-
import com.rabbitmq.client.SocketConfigurator;
25-
import com.rabbitmq.client.SocketConfigurators;
2620
import com.rabbitmq.stream.BackOffDelayPolicy;
2721
import com.rabbitmq.stream.ByteCapacity;
2822
import com.rabbitmq.stream.OffsetSpecification;
@@ -42,8 +36,6 @@
4236
import java.lang.reflect.Field;
4337
import java.lang.reflect.InvocationTargetException;
4438
import java.lang.reflect.Method;
45-
import java.net.URI;
46-
import java.security.SecureRandom;
4739
import java.security.cert.X509Certificate;
4840
import java.text.CharacterIterator;
4941
import java.text.StringCharacterIterator;
@@ -69,13 +61,8 @@
6961
import java.util.function.*;
7062
import java.util.stream.Collectors;
7163
import java.util.stream.IntStream;
72-
import javax.net.ssl.KeyManager;
7364
import javax.net.ssl.SNIHostName;
7465
import javax.net.ssl.SNIServerName;
75-
import javax.net.ssl.SSLContext;
76-
import javax.net.ssl.SSLParameters;
77-
import javax.net.ssl.SSLSocket;
78-
import javax.net.ssl.TrustManager;
7966
import javax.net.ssl.X509TrustManager;
8067
import org.slf4j.Logger;
8168
import org.slf4j.LoggerFactory;
@@ -306,86 +293,6 @@ static CommandSpec buildCommandSpec(Object... commands) {
306293
return spec;
307294
}
308295

309-
static void declareSuperStreamExchangeAndBindings(
310-
Channel channel, String superStream, List<String> streams) throws Exception {
311-
channel.exchangeDeclare(
312-
superStream,
313-
BuiltinExchangeType.DIRECT,
314-
true,
315-
false,
316-
Collections.singletonMap("x-super-stream", true));
317-
318-
for (int i = 0; i < streams.size(); i++) {
319-
channel.queueBind(
320-
streams.get(i),
321-
superStream,
322-
String.valueOf(i),
323-
Collections.singletonMap("x-stream-partition-order", i));
324-
}
325-
}
326-
327-
static void deleteSuperStreamExchange(Channel channel, String superStream) throws Exception {
328-
channel.exchangeDelete(superStream);
329-
}
330-
331-
static List<String> superStreamPartitions(String superStream, int partitionCount) {
332-
int digits = String.valueOf(partitionCount - 1).length();
333-
String format = superStream + "-%0" + digits + "d";
334-
return IntStream.range(0, partitionCount)
335-
.mapToObj(i -> String.format(format, i))
336-
.collect(Collectors.toList());
337-
}
338-
339-
static Connection amqpConnection(
340-
String amqpUri, List<String> streamUris, boolean isTls, List<SNIServerName> sniServerNames)
341-
throws Exception {
342-
ConnectionFactory connectionFactory = new ConnectionFactory();
343-
if (amqpUri == null || amqpUri.trim().isEmpty()) {
344-
String streamUriString = streamUris.get(0);
345-
if (isTls) {
346-
streamUriString = streamUriString.replaceFirst("rabbitmq-stream\\+tls", "amqps");
347-
} else {
348-
streamUriString = streamUriString.replaceFirst("rabbitmq-stream", "amqp");
349-
}
350-
URI streamUri = new URI(streamUriString);
351-
int streamPort = streamUri.getPort();
352-
if (streamPort != -1) {
353-
int defaultAmqpPort =
354-
isTls
355-
? ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT
356-
: ConnectionFactory.DEFAULT_AMQP_PORT;
357-
streamUriString = streamUriString.replaceFirst(":" + streamPort, ":" + defaultAmqpPort);
358-
}
359-
connectionFactory.setUri(streamUriString);
360-
} else {
361-
connectionFactory.setUri(amqpUri);
362-
}
363-
if (isTls) {
364-
SSLContext sslContext = SSLContext.getInstance("TLS");
365-
sslContext.init(
366-
new KeyManager[] {},
367-
new TrustManager[] {TRUST_EVERYTHING_TRUST_MANAGER},
368-
new SecureRandom());
369-
connectionFactory.useSslProtocol(sslContext);
370-
if (!sniServerNames.isEmpty()) {
371-
SocketConfigurator socketConfigurator =
372-
socket -> {
373-
if (socket instanceof SSLSocket) {
374-
SSLSocket sslSocket = (SSLSocket) socket;
375-
SSLParameters sslParameters = sslSocket.getSSLParameters();
376-
sslParameters.setServerNames(sniServerNames);
377-
sslSocket.setSSLParameters(sslParameters);
378-
} else {
379-
LOGGER.warn("SNI parameter set on a non-TLS connection");
380-
}
381-
};
382-
connectionFactory.setSocketConfigurator(
383-
SocketConfigurators.defaultConfigurator().andThen(socketConfigurator));
384-
}
385-
}
386-
return connectionFactory.newConnection("stream-perf-test-amqp-connection");
387-
}
388-
389296
static String commandLineMetrics(String[] args) {
390297
Map<String, Boolean> filteredOptions = new HashMap<>();
391298
filteredOptions.put("--uris", true);

src/test/java/com/rabbitmq/stream/perf/Host.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,7 @@ static Process rabbitmqctl(String command) throws IOException {
9494
}
9595

9696
static String rabbitmqctlCommand() {
97-
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
98-
if (rabbitmqCtl == null) {
99-
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
100-
}
97+
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin", DOCKER_PREFIX);
10198
if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) {
10299
String containerId = rabbitmqCtl.split(":")[1];
103100
return "docker exec " + containerId + " rabbitmqctl";

src/test/java/com/rabbitmq/stream/perf/UtilsTest.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package com.rabbitmq.stream.perf;
1616

1717
import static com.rabbitmq.stream.perf.Utils.commandLineMetrics;
18-
import static com.rabbitmq.stream.perf.Utils.superStreamPartitions;
1918
import static java.lang.String.format;
2019
import static java.util.stream.Collectors.toList;
2120
import static org.assertj.core.api.Assertions.assertThat;
@@ -327,19 +326,6 @@ void optionToEnvironmentVariable(String option, String envVariable) {
327326
assertThat(Utils.OPTION_TO_ENVIRONMENT_VARIABLE.apply(option)).isEqualTo(envVariable);
328327
}
329328

330-
@Test
331-
void superStreamPartitionsTest() {
332-
assertThat(superStreamPartitions("stream", 3))
333-
.hasSize(3)
334-
.containsExactly("stream-0", "stream-1", "stream-2");
335-
assertThat(superStreamPartitions("stream", 20))
336-
.hasSize(20)
337-
.contains("stream-00", "stream-01", "stream-10", "stream-19");
338-
assertThat(superStreamPartitions("stream-1", 20))
339-
.hasSize(20)
340-
.contains("stream-1-00", "stream-1-01", "stream-1-10", "stream-1-19");
341-
}
342-
343329
@Test
344330
void commandLineMetricsTest() {
345331
assertThat(

0 commit comments

Comments
 (0)