File tree Expand file tree Collapse file tree 3 files changed +32
-4
lines changed
main/java/com/rabbitmq/client/amqp
test/java/com/rabbitmq/client/amqp/impl Expand file tree Collapse file tree 3 files changed +32
-4
lines changed Original file line number Diff line number Diff line change @@ -71,8 +71,9 @@ public interface Management extends AutoCloseable {
7171 * Purge (delete all messages) from a queue.
7272 *
7373 * @param queue queue to delete messages from
74+ * @return the status of the purge operation
7475 */
75- void queuePurge (String queue );
76+ PurgeStatus queuePurge (String queue );
7677
7778 /**
7879 * Start exchange specification.
@@ -948,4 +949,14 @@ interface QueueInfo {
948949 */
949950 int consumerCount ();
950951 }
952+
953+ interface PurgeStatus {
954+
955+ /**
956+ * The number of messages purged from the queue.
957+ *
958+ * @return the number of messages purged
959+ */
960+ long messageCount ();
961+ }
951962}
Original file line number Diff line number Diff line change @@ -183,11 +183,13 @@ public UnbindSpecification unbind() {
183183 }
184184
185185 @ Override
186- public void queuePurge (String queue ) {
186+ public PurgeStatus queuePurge (String queue ) {
187187 Map <String , Object > responseBody = delete (queueLocation (queue ) + "/messages" , CODE_200 );
188- if (!responseBody .containsKey ("message_count" )) {
188+ if (!responseBody .containsKey ("message_count" )
189+ && !(responseBody .get ("message_count" ) instanceof Number )) {
189190 throw new AmqpException ("Response body should contain message_count" );
190191 }
192+ return new DefaultPurgeStatus (((Number ) responseBody .get ("message_count" )).longValue ());
191193 }
192194
193195 void setToken (String token ) {
@@ -856,4 +858,18 @@ enum State {
856858 UNAVAILABLE ,
857859 CLOSED
858860 }
861+
862+ private static final class DefaultPurgeStatus implements PurgeStatus {
863+
864+ private final long messageCount ;
865+
866+ private DefaultPurgeStatus (long messageCount ) {
867+ this .messageCount = messageCount ;
868+ }
869+
870+ @ Override
871+ public long messageCount () {
872+ return this .messageCount ;
873+ }
874+ }
859875}
Original file line number Diff line number Diff line change @@ -725,7 +725,8 @@ void queuePurgeShouldRemoveAllMessages(TestInfo info) {
725725 .forEach (ignored -> publisher .publish (publisher .message (), callback ));
726726 assertThat (publishSync ).completes ();
727727 assertThat (management .queueInfo (q )).hasMessageCount (messageCount );
728- management .queuePurge (q );
728+ Management .PurgeStatus purgeStatus = management .queuePurge (q );
729+ org .assertj .core .api .Assertions .assertThat (purgeStatus .messageCount ()).isEqualTo (messageCount );
729730 assertThat (management .queueInfo (q )).isEmpty ();
730731 }
731732
You can’t perform that action at this time.
0 commit comments