Skip to content

Commit 495c608

Browse files
committed
Use dedicated channel to create resources
This avoids re-using the exact same channel between producers/consumers when using queue pattern option and exclusive resources. References #106
1 parent a1e669d commit 495c608

File tree

2 files changed

+16
-28
lines changed

2 files changed

+16
-28
lines changed

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ public Producer createProducer(Connection connection, Stats stats, MulticastSet.
354354
public Consumer createConsumer(Connection connection, Stats stats, MulticastSet.CompletionHandler completionHandler) throws IOException {
355355
TopologyHandlerResult topologyHandlerResult = this.topologyHandler.configureQueuesForClient(connection);
356356
connection = topologyHandlerResult.connection;
357-
Channel channel = topologyHandlerResult.channel;
357+
Channel channel = connection.createChannel();
358358
if (consumerTxSize > 0) channel.txSelect();
359359
if (consumerPrefetch > 0) channel.basicQos(consumerPrefetch);
360360
if (channelPrefetch > 0) channel.basicQos(channelPrefetch, true);
@@ -553,20 +553,13 @@ static class TopologyHandlerResult {
553553
*/
554554
final Connection connection;
555555

556-
/**
557-
* The channel used to create the configured resources.
558-
* Must be used by the agent that asked to declare resources.
559-
*/
560-
final Channel channel;
561-
562556
/**
563557
* The configured queues.
564558
*/
565559
final List<String> configuredQueues;
566560

567-
TopologyHandlerResult(Connection connection, Channel channel, List<String> configuredQueues) {
561+
TopologyHandlerResult(Connection connection, List<String> configuredQueues) {
568562
this.connection = connection;
569-
this.channel = channel;
570563
this.configuredQueues = configuredQueues;
571564
}
572565
}
@@ -601,7 +594,8 @@ protected Connection maybeUseCachedConnection(List<String> queues, Connection co
601594
return connectionToUse;
602595
}
603596

604-
protected List<String> configureQueues(Connection connection, Channel channel, List<String> queues, Runnable afterQueueConfigurationCallback) throws IOException {
597+
protected List<String> configureQueues(Connection connection, List<String> queues, Runnable afterQueueConfigurationCallback) throws IOException {
598+
Channel channel = connection.createChannel();
605599
if (!params.predeclared || !exchangeExists(connection, params.exchangeName)) {
606600
channel.exchangeDeclare(params.exchangeName, params.exchangeType);
607601
}
@@ -632,6 +626,7 @@ protected List<String> configureQueues(Connection connection, Channel channel, L
632626
}
633627
afterQueueConfigurationCallback.run();
634628
}
629+
channel.abort();
635630
return generatedQueueNames;
636631
}
637632

@@ -665,22 +660,20 @@ public String getRoutingKey() {
665660
public TopologyHandlerResult configureQueuesForClient(Connection connection) throws IOException {
666661
if (this.params.isExclusive()) {
667662
Connection connectionToUse = maybeUseCachedConnection(this.queueNames, connection);
668-
Channel channel = connectionToUse.createChannel();
669663
return new TopologyHandlerResult(
670-
connectionToUse, channel, configureQueues(connectionToUse, channel, this.queueNames, () -> {})
664+
connectionToUse, configureQueues(connectionToUse, this.queueNames, () -> {})
671665
);
672666
} else {
673-
Channel channel = connection.createChannel();
674667
return new TopologyHandlerResult(
675-
connection, channel, configureQueues(connection, channel, this.queueNames, () -> {})
668+
connection, configureQueues(connection, this.queueNames, () -> {})
676669
) ;
677670
}
678671
}
679672

680673
@Override
681674
public List<String> configureAllQueues(Connection connection) throws IOException {
682675
if (shouldConfigureQueues() && !this.params.isExclusive()) {
683-
return configureQueues(connection, connection.createChannel(), this.queueNames, () -> {});
676+
return configureQueues(connection, this.queueNames, () -> {});
684677
}
685678
return null;
686679
}
@@ -729,17 +722,13 @@ public String getRoutingKey() {
729722
public TopologyHandlerResult configureQueuesForClient(Connection connection) throws IOException {
730723
if (this.params.isExclusive()) {
731724
Connection connectionToUse = maybeUseCachedConnection(getQueueNamesForClient(), connection);
732-
Channel channel = connectionToUse.createChannel();
733725
return new TopologyHandlerResult(
734726
connectionToUse,
735-
channel,
736-
configureQueues(connectionToUse, channel, getQueueNamesForClient(), () -> {})
727+
configureQueues(connectionToUse, getQueueNamesForClient(), () -> {})
737728
);
738729
} else {
739-
Channel channel = connection.createChannel();
740730
return new TopologyHandlerResult(
741731
connection,
742-
channel,
743732
getQueueNamesForClient()
744733
);
745734
}
@@ -752,8 +741,7 @@ public List<String> configureAllQueues(Connection connection) throws IOException
752741
if (this.params.isExclusive()) {
753742
return null;
754743
} else {
755-
Channel channel = connection.createChannel();
756-
return configureQueues(connection, channel, getQueueNames(), () -> this.next());
744+
return configureQueues(connection, getQueueNames(), () -> this.next());
757745
}
758746
}
759747

src/test/java/com/rabbitmq/perf/TopologyTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void defaultParameters()
143143
set.run();
144144

145145
verify(cf, times(1 + 1 + 1)).newConnection(anyString()); // consumers, producers, configuration (not used)
146-
verify(c, times(0 + 1 + 1)).createChannel(); // queue configuration (none), consumer, producer
146+
verify(c, times(1 + 1 + 1)).createChannel(); // queue configuration, consumer, producer
147147
verify(ch, times(1))
148148
.queueDeclare(eq(""), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
149149
verify(ch, times(1))
@@ -164,7 +164,7 @@ public void nProducersAndConsumer()
164164
set.run();
165165

166166
verify(cf, times(10 + 10 + 1)).newConnection(anyString()); // consumers, producers, configuration (not used)
167-
verify(c, times(0 + 10 + 10)).createChannel(); // queue configuration (none), consumer, producer
167+
verify(c, times(10 + 10 + 10)).createChannel(); // queue configuration, consumer, producer
168168
verify(ch, times(10))
169169
.queueDeclare(eq(""), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
170170
verify(ch, times(10))
@@ -187,7 +187,7 @@ public void producers1Consumers2QueueSpecified() throws Exception {
187187
set.run();
188188

189189
verify(cf, times(2 + 1 + 1)).newConnection(anyString()); // consumers, producers, configuration (not used)
190-
verify(c, times(0 + 2 + 1)).createChannel(); // queue configuration (none), consumer, producer
190+
verify(c, times(2 + 2 + 1)).createChannel(); // queue configuration, consumer, producer
191191
verify(ch, times(2))
192192
.queueDeclare(eq(queue), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
193193
verify(ch, times(2))
@@ -210,7 +210,7 @@ public void producers2Consumers4QueueSpecified() throws Exception {
210210
set.run();
211211

212212
verify(cf, times(4 + 2 + 1)).newConnection(anyString()); // consumers, producers, configuration (not used)
213-
verify(c, times(0 + 4 + 2)).createChannel(); // queue configuration (none), consumer, producer
213+
verify(c, times(4 + 4 + 2)).createChannel(); // queue configuration, consumer, producer
214214
verify(ch, times(4))
215215
.queueDeclare(eq(queue), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
216216
verify(ch, times(4))
@@ -261,7 +261,7 @@ public void qosIsSetOnTheChannel() throws Exception {
261261
set.run();
262262

263263
verify(cf, times(1 + 1 + 1)).newConnection(anyString()); // consumers, producers, configuration (not used)
264-
verify(c, times(0 + 1 + 1)).createChannel(); // queue configuration (none), consumer, producer
264+
verify(c, times(1 + 1 + 1)).createChannel(); // queue configuration, consumer, producer
265265
verify(ch, times(1))
266266
.queueDeclare(eq(""), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
267267
verify(ch, times(1))
@@ -382,7 +382,7 @@ public void exclusiveQueue(String exclusive) throws Exception {
382382
set.run();
383383

384384
verify(cf, times(1 + 1 + 1)).newConnection(anyString()); // consumers, producers, configuration (not used)
385-
verify(c, times(0 + 1 + 1)).createChannel(); // queue configuration (none), consumer, producer
385+
verify(c, times(1 + 1 + 1)).createChannel(); // queue configuration, consumer, producer
386386
verify(ch, times(1))
387387
.queueDeclare(eq(""), anyBoolean(), eq(valueOf(exclusive)), anyBoolean(), isNull());
388388
verify(ch, times(1))

0 commit comments

Comments
 (0)