Skip to content

Commit db0b75f

Browse files
committed
Throw close reason if resource already closed
1 parent dac61c0 commit db0b75f

File tree

2 files changed

+58
-47
lines changed

2 files changed

+58
-47
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/ResourceBase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ abstract class ResourceBase implements Resource {
2828

2929
private final AtomicReference<State> state = new AtomicReference<>();
3030
private final StateEventSupport stateEventSupport;
31+
private volatile Throwable closeReason;
3132

3233
ResourceBase(List<StateListener> listeners) {
3334
this.stateEventSupport = new StateEventSupport(listeners);
@@ -36,7 +37,9 @@ abstract class ResourceBase implements Resource {
3637

3738
protected void checkOpen() {
3839
State state = this.state.get();
39-
if (state == CLOSED) {
40+
if (state != OPEN && this.closeReason instanceof AmqpException) {
41+
throw (AmqpException) this.closeReason;
42+
} else if (state == CLOSED) {
4043
throw new AmqpException.AmqpResourceClosedException("Resource is closed");
4144
} else if (state != OPEN) {
4245
throw new AmqpException.AmqpResourceInvalidStateException(
@@ -55,6 +58,9 @@ protected void state(Resource.State state) {
5558
protected void state(Resource.State state, Throwable failureCause) {
5659
Resource.State previousState = this.state.getAndSet(state);
5760
if (state != previousState) {
61+
if ((state == CLOSING || state == CLOSED) && this.closeReason == null) {
62+
this.closeReason = failureCause;
63+
}
5864
this.dispatch(previousState, state, failureCause);
5965
}
6066
}

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
2222
import static com.rabbitmq.client.amqp.Management.QueueType.*;
2323
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
24+
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
2425
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_0_3;
2526
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
2627
import static java.nio.charset.StandardCharsets.*;
@@ -64,7 +65,7 @@ void queueInfoTest() {
6465
management.queue(name).quorum().queue().declare();
6566

6667
Management.QueueInfo queueInfo = management.queueInfo(name);
67-
Assertions.assertThat(queueInfo)
68+
assertThat(queueInfo)
6869
.hasName(name)
6970
.is(QUORUM)
7071
.isDurable()
@@ -100,10 +101,10 @@ void queueDeclareDeletePublishConsume(String subject) {
100101
acceptedCallback(confirmSync));
101102
});
102103

103-
Assertions.assertThat(confirmSync).completes();
104+
assertThat(confirmSync).completes();
104105

105106
Management.QueueInfo queueInfo = connection.management().queueInfo(name);
106-
Assertions.assertThat(queueInfo).hasName(name).hasNoConsumers().hasMessageCount(messageCount);
107+
assertThat(queueInfo).hasName(name).hasNoConsumers().hasMessageCount(messageCount);
107108

108109
AtomicReference<String> receivedSubject = new AtomicReference<>();
109110
Sync consumeSync = TestUtils.sync(messageCount);
@@ -119,11 +120,13 @@ void queueDeclareDeletePublishConsume(String subject) {
119120
})
120121
.build();
121122

122-
Assertions.assertThat(consumeSync).completes();
123-
assertThat(receivedSubject).doesNotHaveNullValue().hasValue(subject);
123+
assertThat(consumeSync).completes();
124+
org.assertj.core.api.Assertions.assertThat(receivedSubject)
125+
.doesNotHaveNullValue()
126+
.hasValue(subject);
124127

125128
queueInfo = connection.management().queueInfo(name);
126-
Assertions.assertThat(queueInfo).hasConsumerCount(1).isEmpty();
129+
assertThat(queueInfo).hasConsumerCount(1).isEmpty();
127130

128131
consumer.close();
129132
publisher.close();
@@ -181,7 +184,7 @@ void binding(String prefix, boolean addBindingArguments, TestInfo info) {
181184
range(0, messageCount).forEach(ignored -> publish.accept(publisher1));
182185
range(0, messageCount).forEach(ignored -> publish.accept(publisher2));
183186

184-
Assertions.assertThat(confirmSync).completes();
187+
assertThat(confirmSync).completes();
185188

186189
Sync consumeSync = sync(messageCount * 2);
187190
com.rabbitmq.client.amqp.Consumer consumer =
@@ -194,7 +197,7 @@ void binding(String prefix, boolean addBindingArguments, TestInfo info) {
194197
consumeSync.down();
195198
})
196199
.build();
197-
Assertions.assertThat(consumeSync).completes();
200+
assertThat(consumeSync).completes();
198201
publisher1.close();
199202
publisher2.close();
200203
consumer.close();
@@ -239,8 +242,10 @@ void sameTypeMessagesInQueue() {
239242
publisher.publish(publisher.message("one".getBytes(UTF_8)), ctx -> {});
240243
publisher.publish(publisher.message("two".getBytes(UTF_8)), ctx -> {});
241244

242-
com.rabbitmq.client.amqp.impl.Assertions.assertThat(consumeLatch).completes();
243-
assertThat(messageBodies).hasSize(2).containsOnly("one".getBytes(UTF_8), "two".getBytes(UTF_8));
245+
assertThat(consumeLatch).completes();
246+
org.assertj.core.api.Assertions.assertThat(messageBodies)
247+
.hasSize(2)
248+
.containsOnly("one".getBytes(UTF_8), "two".getBytes(UTF_8));
244249
}
245250

246251
@Test
@@ -252,7 +257,7 @@ void pauseShouldStopMessageArrivalUnpauseShouldResumeIt() {
252257
Publisher.Callback callback = ctx -> publishLatch.countDown();
253258
range(0, messageCount).forEach(ignored -> publisher.publish(publisher.message(), callback));
254259

255-
com.rabbitmq.client.amqp.impl.Assertions.assertThat(publishLatch).completes();
260+
assertThat(publishLatch).completes();
256261

257262
int initialCredits = 10;
258263
Set<com.rabbitmq.client.amqp.Consumer.Context> messageContexts = ConcurrentHashMap.newKeySet();
@@ -266,10 +271,10 @@ void pauseShouldStopMessageArrivalUnpauseShouldResumeIt() {
266271

267272
waitAtMost(() -> messageContexts.size() == initialCredits);
268273

269-
Assertions.assertThat(connection.management().queueInfo(q))
270-
.hasMessageCount(messageCount - initialCredits);
274+
assertThat(connection.management().queueInfo(q)).hasMessageCount(messageCount - initialCredits);
271275

272-
assertThat(Cli.queueInfo(q).unackedMessageCount()).isEqualTo(initialCredits);
276+
org.assertj.core.api.Assertions.assertThat(Cli.queueInfo(q).unackedMessageCount())
277+
.isEqualTo(initialCredits);
273278

274279
consumer.pause();
275280
new ArrayList<>(messageContexts).forEach(com.rabbitmq.client.amqp.Consumer.Context::accept);
@@ -306,7 +311,7 @@ void publisherSendingShouldThrowWhenExchangeHasBeenDeleted() {
306311
connection.management().binding().sourceExchange(name).destinationQueue(q).bind();
307312
Sync sync = sync();
308313
publisher.publish(publisher.message(), acceptedCallback(sync));
309-
Assertions.assertThat(sync).completes();
314+
assertThat(sync).completes();
310315
} finally {
311316
connection.management().exchangeDeletion().delete(name);
312317
}
@@ -321,11 +326,11 @@ void publisherSendingShouldThrowWhenExchangeHasBeenDeleted() {
321326
return true;
322327
}
323328
});
324-
Assertions.assertThat(closedSync).completes();
329+
assertThat(closedSync).completes();
325330
of(exception.get(), closedException.get())
326331
.forEach(
327332
e ->
328-
assertThat(e)
333+
org.assertj.core.api.Assertions.assertThat(e)
329334
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class)
330335
.hasMessageContaining(name)
331336
.hasMessageContaining(ExceptionUtils.ERROR_NOT_FOUND));
@@ -354,7 +359,7 @@ void publisherSendingShouldThrowWhenQueueHasBeenDeleted() {
354359
try {
355360
Sync sync = sync();
356361
publisher.publish(publisher.message(), acceptedCallback(sync));
357-
Assertions.assertThat(sync).completes();
362+
assertThat(sync).completes();
358363
} finally {
359364
connection.management().queueDeletion().delete(name);
360365
}
@@ -369,11 +374,11 @@ void publisherSendingShouldThrowWhenQueueHasBeenDeleted() {
369374
return true;
370375
}
371376
});
372-
Assertions.assertThat(closedSync).completes();
377+
assertThat(closedSync).completes();
373378
of(exception.get(), closedException.get())
374379
.forEach(
375380
e ->
376-
assertThat(e)
381+
org.assertj.core.api.Assertions.assertThat(e)
377382
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class)
378383
.hasMessageContaining(ExceptionUtils.ERROR_RESOURCE_DELETED));
379384
}
@@ -397,8 +402,8 @@ void publisherSendingShouldThrowWhenPublishingToNonExistingExchangeWithToPropert
397402
Sync acceptedSync = sync();
398403
publisher.publish(
399404
publisher.message().toAddress().queue(name).message(), ctx -> acceptedSync.down());
400-
Assertions.assertThat(acceptedSync).completes();
401-
Assertions.assertThat(consumedSync).completes();
405+
assertThat(acceptedSync).completes();
406+
assertThat(consumedSync).completes();
402407

403408
acceptedSync.reset();
404409
consumedSync.reset();
@@ -407,13 +412,13 @@ void publisherSendingShouldThrowWhenPublishingToNonExistingExchangeWithToPropert
407412
publisher.publish(
408413
publisher.message().toAddress().exchange(doesNotExist).message(),
409414
ctx -> rejectedSync.down());
410-
Assertions.assertThat(rejectedSync).completes();
415+
assertThat(rejectedSync).completes();
411416

412-
Assertions.assertThat(consumedSync).hasNotCompleted();
417+
assertThat(consumedSync).hasNotCompleted();
413418
publisher.publish(
414419
publisher.message().toAddress().queue(name).message(), ctx -> acceptedSync.down());
415-
Assertions.assertThat(acceptedSync).completes();
416-
Assertions.assertThat(consumedSync).completes();
420+
assertThat(acceptedSync).completes();
421+
assertThat(consumedSync).completes();
417422
}
418423

419424
@Test
@@ -455,10 +460,10 @@ void consumerShouldGetClosedWhenQueueIsDeleted() {
455460
.build();
456461
Publisher publisher = connection.publisherBuilder().queue(name).build();
457462
publisher.publish(publisher.message(), ctx -> {});
458-
Assertions.assertThat(consumeSync).completes();
463+
assertThat(consumeSync).completes();
459464
connection.management().queueDeletion().delete(name);
460-
Assertions.assertThat(closedSync).completes();
461-
assertThat(exception.get())
465+
assertThat(closedSync).completes();
466+
org.assertj.core.api.Assertions.assertThat(exception.get())
462467
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class)
463468
.hasMessageContaining(ExceptionUtils.ERROR_RESOURCE_DELETED);
464469
}
@@ -472,7 +477,7 @@ void consumerPauseThenClose() {
472477
Sync publishSync = sync(messageCount);
473478
range(0, messageCount)
474479
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
475-
Assertions.assertThat(publishSync).completes();
480+
assertThat(publishSync).completes();
476481

477482
AtomicInteger receivedCount = new AtomicInteger(0);
478483
Set<com.rabbitmq.client.amqp.Consumer.Context> unsettledMessages =
@@ -493,13 +498,13 @@ void consumerPauseThenClose() {
493498
.build();
494499

495500
int unsettledMessageCount = waitUntilStable(unsettledMessages::size);
496-
assertThat(unsettledMessageCount).isNotZero();
501+
org.assertj.core.api.Assertions.assertThat(unsettledMessageCount).isNotZero();
497502
consumer.pause();
498503
int receivedCountAfterPausing = receivedCount.get();
499504
unsettledMessages.forEach(com.rabbitmq.client.amqp.Consumer.Context::accept);
500505
consumer.close();
501-
assertThat(receivedCount).hasValue(receivedCountAfterPausing);
502-
Assertions.assertThat(connection.management().queueInfo(name))
506+
org.assertj.core.api.Assertions.assertThat(receivedCount).hasValue(receivedCountAfterPausing);
507+
assertThat(connection.management().queueInfo(name))
503508
.hasMessageCount(messageCount - receivedCount.get());
504509
}
505510

@@ -512,7 +517,7 @@ void consumerGracefulShutdownExample() {
512517
Sync publishSync = sync(messageCount);
513518
range(0, messageCount)
514519
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
515-
Assertions.assertThat(publishSync).completes();
520+
assertThat(publishSync).completes();
516521

517522
AtomicInteger receivedCount = new AtomicInteger(0);
518523
Random random = new Random();
@@ -534,7 +539,7 @@ void consumerGracefulShutdownExample() {
534539
consumer.pause();
535540
waitAtMost(() -> consumer.unsettledMessageCount() == 0);
536541
consumer.close();
537-
Assertions.assertThat(connection.management().queueInfo(name))
542+
assertThat(connection.management().queueInfo(name))
538543
.hasMessageCount(messageCount - receivedCount.get());
539544
}
540545

@@ -548,7 +553,7 @@ void consumerUnsettledMessagesGoBackToQueueAfterClosing() {
548553
Sync publishSync = sync(messageCount);
549554
range(0, messageCount)
550555
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
551-
Assertions.assertThat(publishSync).completes();
556+
assertThat(publishSync).completes();
552557

553558
AtomicInteger receivedCount = new AtomicInteger(0);
554559
com.rabbitmq.client.amqp.Consumer consumer =
@@ -567,7 +572,7 @@ void consumerUnsettledMessagesGoBackToQueueAfterClosing() {
567572

568573
waitAtMost(() -> receivedCount.get() > settledCount);
569574
consumer.close();
570-
Assertions.assertThat(connection.management().queueInfo(name))
575+
assertThat(connection.management().queueInfo(name))
571576
.hasMessageCount(messageCount - settledCount);
572577
}
573578

@@ -612,17 +617,17 @@ void consumerWithHigherPriorityShouldGetMessagesFirst() {
612617

613618
publish.run();
614619

615-
Assertions.assertThat(consumeSync).completes();
616-
assertThat(lowCount).hasValue(0);
617-
assertThat(highCount).hasValue(messageCount);
620+
assertThat(consumeSync).completes();
621+
org.assertj.core.api.Assertions.assertThat(lowCount).hasValue(0);
622+
org.assertj.core.api.Assertions.assertThat(highCount).hasValue(messageCount);
618623

619624
highPriorityConsumer.close();
620625

621626
consumeSync.reset(messageCount);
622627
publish.run();
623-
Assertions.assertThat(consumeSync).completes();
624-
assertThat(lowCount).hasValue(messageCount);
625-
assertThat(highCount).hasValue(messageCount);
628+
assertThat(consumeSync).completes();
629+
org.assertj.core.api.Assertions.assertThat(lowCount).hasValue(messageCount);
630+
org.assertj.core.api.Assertions.assertThat(highCount).hasValue(messageCount);
626631

627632
lowPriorityConsumer.close();
628633
}
@@ -649,7 +654,7 @@ void redeclareExchangesWithDifferentArguments() {
649654
management.exchange(name).type(FANOUT).declare();
650655
fail("Declaring an existing exchange with different arguments should trigger an exception");
651656
} catch (AmqpException e) {
652-
assertThat(e).hasMessageContaining("409");
657+
org.assertj.core.api.Assertions.assertThat(e).hasMessageContaining("409");
653658
// OK
654659
} finally {
655660
management.exchangeDeletion().delete(name);
@@ -670,7 +675,7 @@ void declareQueueWithUnsupportedArgument() {
670675
operation.accept(management);
671676
fail("Creating a queue with unsupported arguments should trigger an exception");
672677
} catch (AmqpException e) {
673-
assertThat(e).hasMessageContaining("409");
678+
org.assertj.core.api.Assertions.assertThat(e).hasMessageContaining("409");
674679
}
675680
});
676681
}
@@ -696,7 +701,7 @@ void publishedMessageShouldBeRejectedWhenQueueLimitIsReached(TestInfo info) {
696701
Publisher publisher = connection.publisherBuilder().queue(q).build();
697702
IntStream.range(0, maxLength + 1)
698703
.forEach(ignored -> publisher.publish(publisher.message(), callback));
699-
Assertions.assertThat(rejectedLatch).completes();
704+
assertThat(rejectedLatch).completes();
700705
} finally {
701706
management.queueDeletion().delete(q);
702707
}

0 commit comments

Comments
 (0)