Skip to content

Commit bd4b3ee

Browse files
authored
Merge pull request #810 from rabbitmq/dispatch-only-when-channel-is-open
Dispatch messages only when channel is open
2 parents c2e4371 + f9ec634 commit bd4b3ee

File tree

2 files changed

+29
-2
lines changed

2 files changed

+29
-2
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ private void startBasicGetConsumer() {
214214
try {
215215
GetResponse response = ch.basicGet(queue, autoAck);
216216
if (response != null) {
217-
delegate.handleMessage(
217+
delegate.maybeHandleMessage(
218218
response.getEnvelope(), response.getProps(), response.getBody(), ch);
219219
}
220220
} catch (IOException e) {
@@ -280,7 +280,14 @@ private ConsumerImpl(Channel channel) {
280280
public void handleDelivery(
281281
String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
282282
throws IOException {
283-
this.handleMessage(envelope, properties, body, channel);
283+
this.maybeHandleMessage(envelope, properties, body, channel);
284+
}
285+
286+
private void maybeHandleMessage(
287+
Envelope envelope, BasicProperties properties, byte[] body, Channel ch) throws IOException {
288+
if (ch.isOpen()) {
289+
handleMessage(envelope, properties, body, ch);
290+
}
284291
}
285292

286293
void handleMessage(Envelope envelope, BasicProperties properties, byte[] body, Channel ch)

src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,11 @@ public void consumerCount(int consumersCount, int channelsCount) throws Exceptio
337337
Channel channel =
338338
proxy(
339339
Channel.class,
340+
callback(
341+
"isOpen",
342+
(proxy, method, args) -> {
343+
return true;
344+
}),
340345
callback(
341346
"basicConsume",
342347
(proxy, method, args) -> {
@@ -458,6 +463,11 @@ void shouldAckOneLastTimeWhenQueueIsEmpty() throws Exception {
458463
Channel channel =
459464
proxy(
460465
Channel.class,
466+
callback(
467+
"isOpen",
468+
(proxy, method, args) -> {
469+
return true;
470+
}),
461471
callback(
462472
"basicConsume",
463473
(proxy, method, args) -> {
@@ -924,6 +934,11 @@ public void pollingWithBasicGet(int consumersCount, int channelsCount) throws Ex
924934
Channel channel =
925935
proxy(
926936
Channel.class,
937+
callback(
938+
"isOpen",
939+
(proxy, method, args) -> {
940+
return true;
941+
}),
927942
callback(
928943
"basicGet",
929944
(proxy, method, args) -> {
@@ -1014,6 +1029,11 @@ public void ackNack(String nackParameter) throws Exception {
10141029
Channel channel =
10151030
proxy(
10161031
Channel.class,
1032+
callback(
1033+
"isOpen",
1034+
(proxy, method, args) -> {
1035+
return true;
1036+
}),
10171037
callback(
10181038
"basicConsume",
10191039
(proxy, method, args) -> {

0 commit comments

Comments
 (0)