Skip to content

Commit 92f920e

Browse files
committed
Use 1-thread executor per connection to dispatch messages to consumers
1 parent 991640c commit 92f920e

File tree

4 files changed

+64
-49
lines changed

4 files changed

+64
-49
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
7373
private final String name;
7474
private final Lock instanceLock = new ReentrantLock();
7575
private final boolean filterExpressionsSupported;
76+
private volatile ExecutorService dispatchingExecutorService;
7677

7778
AmqpConnection(AmqpConnectionBuilder builder) {
7879
super(builder.listeners());
@@ -547,22 +548,21 @@ Session nativeSession(boolean check) {
547548
if (check) {
548549
checkOpen();
549550
}
551+
550552
Session result = this.nativeSession;
551-
if (result == null) {
552-
this.instanceLock.lock();
553-
try {
554-
result = this.nativeSession;
555-
if (result == null) {
556-
if (check) {
557-
checkOpen();
558-
}
559-
this.nativeSession = result = this.openSession(this.nativeConnection);
560-
}
561-
} finally {
562-
this.instanceLock.unlock();
553+
if (result != null) {
554+
return result;
555+
}
556+
557+
this.instanceLock.lock();
558+
try {
559+
if (this.nativeSession == null) {
560+
this.nativeSession = this.openSession(this.nativeConnection);
563561
}
562+
return this.nativeSession;
563+
} finally {
564+
this.instanceLock.unlock();
564565
}
565-
return result;
566566
}
567567

568568
private Session openSession(org.apache.qpid.protonj2.client.Connection connection) {
@@ -585,6 +585,27 @@ ScheduledExecutorService scheduledExecutorService() {
585585
return this.environment.scheduledExecutorService();
586586
}
587587

588+
ExecutorService dispatchingExecutorService() {
589+
checkOpen();
590+
591+
ExecutorService result = this.dispatchingExecutorService;
592+
if (result != null) {
593+
return result;
594+
}
595+
596+
this.instanceLock.lock();
597+
try {
598+
if (this.dispatchingExecutorService == null) {
599+
this.dispatchingExecutorService =
600+
Executors.newSingleThreadExecutor(
601+
Utils.threadFactory("dispatching-" + this.name + "-"));
602+
}
603+
return this.dispatchingExecutorService;
604+
} finally {
605+
this.instanceLock.unlock();
606+
}
607+
}
608+
588609
Clock clock() {
589610
return this.environment.clock();
590611
}
@@ -714,6 +735,14 @@ private void close(Throwable cause) {
714735
for (AmqpConsumer consumer : this.consumers) {
715736
consumer.close();
716737
}
738+
try {
739+
this.dispatchingExecutorService.shutdownNow();
740+
} catch (Exception e) {
741+
LOGGER.info(
742+
"Error while shutting down dispatching executor service for connection '{}': {}",
743+
this.name(),
744+
e.getMessage());
745+
}
717746
try {
718747
org.apache.qpid.protonj2.client.Connection nc = this.nativeConnection;
719748
if (nc != null) {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
6969
private final SessionHandler sessionHandler;
7070
private final AtomicLong unsettledMessageCount = new AtomicLong(0);
7171
private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
72-
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
72+
private final ExecutorService dispatchingExecutorService;
7373
private final java.util.function.Consumer<Delivery> nativeHandler;
7474
private final java.util.function.Consumer<ClientException> nativeReceiverCloseHandler;
7575
// native receiver internal state, accessed only in the native executor/scheduler
@@ -99,10 +99,11 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
9999
this.connection = builder.connection();
100100
this.sessionHandler = this.connection.createSessionHandler();
101101

102+
this.dispatchingExecutorService = connection.dispatchingExecutorService();
102103
this.nativeHandler = createNativeHandler(messageHandler);
103104
this.nativeReceiverCloseHandler =
104105
e ->
105-
this.executorService.submit(
106+
this.dispatchingExecutorService.submit(
106107
() -> {
107108
// get result to make spotbugs happy
108109
boolean ignored = maybeCloseConsumerOnException(this, e);
@@ -228,7 +229,7 @@ private java.util.function.Consumer<Delivery> createNativeHandler(MessageHandler
228229
return delivery -> {
229230
this.unsettledMessageCount.incrementAndGet();
230231
this.metricsCollector.consume();
231-
this.executorService.submit(
232+
this.dispatchingExecutorService.submit(
232233
() -> {
233234
AmqpMessage message;
234235
try {
@@ -330,11 +331,6 @@ private void close(Throwable cause) {
330331
if (this.closed.compareAndSet(false, true)) {
331332
this.state(CLOSING, cause);
332333
this.connection.removeConsumer(this);
333-
try {
334-
this.executorService.shutdownNow();
335-
} catch (Exception e) {
336-
LOGGER.warn("Error while closing consumer executor service");
337-
}
338334
try {
339335
this.nativeReceiver.close();
340336
this.sessionHandler.close();

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ class AmqpManagement implements Management {
8282
private final TopologyListener topologyListener;
8383
private final Supplier<String> nameSupplier;
8484
private final AtomicReference<State> state = new AtomicReference<>(CREATED);
85-
// private final AtomicBoolean initializing = new AtomicBoolean(false);
8685
private volatile boolean initializing = false;
8786
private final Lock initializationLock = new ReentrantLock();
8887
private final Duration receiveLoopIdleTimeout;
@@ -208,7 +207,7 @@ void init() {
208207
if (!this.initializing) {
209208
try {
210209
initializationLock.lock();
211-
if (!this.initializing) {
210+
if (!this.initializing && this.state() != OPEN) {
212211
this.initializing = true;
213212
LOGGER.debug("Initializing management ({}).", this);
214213
this.state(UNAVAILABLE);

src/test/java/com/rabbitmq/client/amqp/perf/AmqpPerfTest.java

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
1818
package com.rabbitmq.client.amqp.perf;
1919

20+
import static com.rabbitmq.client.amqp.Management.ExchangeType.DIRECT;
2021
import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
2122
import static com.rabbitmq.client.amqp.impl.TestUtils.environmentBuilder;
2223

2324
import com.rabbitmq.client.amqp.*;
25+
import com.rabbitmq.client.amqp.impl.TestUtils;
2426
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
2527
import com.rabbitmq.client.amqp.metrics.MicrometerMetricsCollector;
2628
import com.sun.net.httpserver.HttpServer;
@@ -33,7 +35,6 @@
3335
import java.nio.charset.StandardCharsets;
3436
import java.util.concurrent.*;
3537
import java.util.concurrent.atomic.AtomicBoolean;
36-
import java.util.concurrent.atomic.AtomicInteger;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

@@ -55,9 +56,8 @@ public static void main(String[] args) throws IOException {
5556
PrintWriter out = new PrintWriter(System.out, true);
5657
PerformanceMetrics metrics = new PerformanceMetrics(registry, executorService, out);
5758

58-
// String e = TestUtils.name(AmqpPerfTest.class, "main");
59-
// String q = TestUtils.name(AmqpPerfTest.class, "main");
60-
String q = "qq";
59+
String e = TestUtils.name(AmqpPerfTest.class, "main");
60+
String q = TestUtils.name(AmqpPerfTest.class, "main");
6161
String rk = "foo";
6262
Environment environment = environmentBuilder().metricsCollector(collector).build();
6363
Connection connection = environment.connectionBuilder().build();
@@ -76,19 +76,18 @@ public static void main(String[] args) throws IOException {
7676
metrics.close();
7777
executorService.shutdownNow();
7878
shutdownLatch.countDown();
79-
// management.queueDeletion().delete(q);
80-
// management.exchangeDeletion().delete(e);
79+
management.queueDeletion().delete(q);
80+
management.exchangeDeletion().delete(e);
8181
management.close();
8282
}
8383
};
8484

8585
Runtime.getRuntime().addShutdownHook(new Thread(shutdownSequence::run));
8686
try {
87-
// management.exchange().name(e).type(DIRECT).declare();
87+
management.exchange().name(e).type(DIRECT).declare();
8888
management.queue().name(q).type(QUORUM).declare();
89-
// management.binding().sourceExchange(e).destinationQueue(q).key(rk).bind();
89+
management.binding().sourceExchange(e).destinationQueue(q).key(rk).bind();
9090

91-
AtomicInteger count = new AtomicInteger(0);
9291
connection
9392
.consumerBuilder()
9493
.listeners(
@@ -102,17 +101,13 @@ public static void main(String[] args) throws IOException {
102101
.messageHandler(
103102
(context, message) -> {
104103
context.accept();
105-
if (count.incrementAndGet() == 1_000_000) {
106-
shutdownLatch.countDown();
104+
try {
105+
long time = readLong(message.body());
106+
metrics.latency(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
107+
} catch (Exception ex) {
108+
// not able to read the body, maybe not a message from the
109+
// tool
107110
}
108-
// try {
109-
// long time = readLong(message.body());
110-
// metrics.latency(System.currentTimeMillis() - time,
111-
// TimeUnit.MILLISECONDS);
112-
// } catch (Exception ex) {
113-
// // not able to read the body, maybe not a message from the
114-
// // tool
115-
// }
116111
})
117112
.build();
118113

@@ -122,13 +117,12 @@ public static void main(String[] args) throws IOException {
122117
Publisher publisher =
123118
connection
124119
.publisherBuilder()
125-
.queue(q)
126-
// .exchange(e)
127-
// .key(rk)
120+
.exchange(e)
121+
.key(rk)
128122
.listeners(
129123
context -> {
130124
if (context.currentState() == Resource.State.OPEN) {
131-
// shouldPublish.set(true);
125+
shouldPublish.set(true);
132126
} else {
133127
if (context.currentState() == Resource.State.RECOVERING) {
134128
LOGGER.info("Publisher is recovering...");
@@ -146,9 +140,6 @@ public static void main(String[] args) throws IOException {
146140
} catch (Exception ex) {
147141
// not able to read the body, should not happen
148142
}
149-
if (count.incrementAndGet() == 1_000_000) {
150-
shutdownLatch.countDown();
151-
}
152143
};
153144
int msgSize = 10;
154145
while (!Thread.currentThread().isInterrupted()) {

0 commit comments

Comments
 (0)