|
134 | 134 | import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory; |
135 | 135 | import org.apache.activemq.artemis.tests.util.CFUtil; |
136 | 136 | import org.apache.activemq.artemis.tests.util.Wait; |
| 137 | +import org.apache.activemq.artemis.utils.JsonLoader; |
137 | 138 | import org.apache.activemq.artemis.utils.RandomUtil; |
138 | 139 | import org.apache.activemq.artemis.utils.SecurityFormatter; |
139 | 140 | import org.apache.activemq.artemis.utils.UUIDGenerator; |
@@ -4442,6 +4443,74 @@ public void testListConsumers() throws Exception { |
4442 | 4443 |
|
4443 | 4444 | } |
4444 | 4445 |
|
| 4446 | + @TestTemplate |
| 4447 | + public void testListConsumersLastAckTime() throws Exception { |
| 4448 | + testListConsumersTime(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()); |
| 4449 | + } |
| 4450 | + |
| 4451 | + @TestTemplate |
| 4452 | + public void testListConsumersLastDeliveredTime() throws Exception { |
| 4453 | + testListConsumersTime(ConsumerField.LAST_DELIVERED_TIME.getName()); |
| 4454 | + } |
| 4455 | + |
| 4456 | + private void testListConsumersTime(String field) throws Exception { |
| 4457 | + SimpleString queueName = SimpleString.of(getName()); |
| 4458 | + |
| 4459 | + ActiveMQServerControl serverControl = createManagementControl(); |
| 4460 | + |
| 4461 | + Queue queue; |
| 4462 | + if (legacyCreateQueue) { |
| 4463 | + queue = server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false); |
| 4464 | + } else { |
| 4465 | + queue = server.createQueue(QueueConfiguration.of(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false)); |
| 4466 | + } |
| 4467 | + |
| 4468 | + try (ServerLocator locator = createInVMNonHALocator(); |
| 4469 | + ClientSessionFactory csf = createSessionFactory(locator); |
| 4470 | + ClientSession session = csf.createSession()) { |
| 4471 | + |
| 4472 | + ClientConsumer consumer1 = session.createConsumer(queueName); |
| 4473 | + ClientConsumer consumer2 = session.createConsumer(queueName); |
| 4474 | + ClientConsumer consumer3 = session.createConsumer(queueName); |
| 4475 | + session.start(); |
| 4476 | + ClientProducer producer = session.createProducer(queueName); |
| 4477 | + producer.send(session.createMessage(true)); |
| 4478 | + ClientMessage message = consumer1.receiveImmediate(); |
| 4479 | + assertNotNull(message); |
| 4480 | + message.individualAcknowledge(); |
| 4481 | + Thread.sleep(3); |
| 4482 | + producer.send(session.createMessage(true)); |
| 4483 | + message = consumer2.receiveImmediate(); |
| 4484 | + assertNotNull(message); |
| 4485 | + message.individualAcknowledge(); |
| 4486 | + Wait.assertEquals(2L, () -> queue.getMessagesAcknowledged(), 1000, 20); |
| 4487 | + |
| 4488 | + JsonArray array = (JsonArray) JsonUtil.readJsonObject(serverControl.listConsumers(null, -1, -1)).get("data"); |
| 4489 | + long time1 = array.getJsonObject(0).getJsonNumber(field).longValue(); |
| 4490 | + long time2 = array.getJsonObject(1).getJsonNumber(field).longValue(); |
| 4491 | + long time3 = array.getJsonObject(2).getJsonNumber(field).longValue(); |
| 4492 | + |
| 4493 | + verifyConsumerCount(serverControl, createJsonFilter(field, "EQUALS", String.valueOf(time1)), queueName, 1); |
| 4494 | + verifyConsumerCount(serverControl, createJsonFilter(field, "EQUALS", String.valueOf(time2)), queueName, 1); |
| 4495 | + verifyConsumerCount(serverControl, createJsonFilter(field, "EQUALS", String.valueOf(time3)), queueName, 1); |
| 4496 | + verifyConsumerCount(serverControl, createJsonFilter(field, "GREATER_THAN", "0"), queueName, 2); |
| 4497 | + verifyConsumerCount(serverControl, createJsonFilter(field, "LESS_THAN", "1"), queueName, 1); |
| 4498 | + } |
| 4499 | + } |
| 4500 | + |
| 4501 | + private static void verifyConsumerCount(ActiveMQServerControl serverControl, String filterString, SimpleString expectedQueueName, int expectedConsumerCount) throws Exception { |
| 4502 | + String consumersAsJsonString = serverControl.listConsumers(filterString, -1, -1); |
| 4503 | + JsonArray data = (JsonArray) JsonUtil.readJsonObject(consumersAsJsonString).get("data"); |
| 4504 | + int consumerCount = 0; |
| 4505 | + for (int i = 0; i < data.size(); i++) { |
| 4506 | + String consumerQueueName = data.getJsonObject(i).getString(ConsumerField.QUEUE.getName()); |
| 4507 | + if (consumerQueueName != null && consumerQueueName.equals(expectedQueueName.toString())) { |
| 4508 | + consumerCount++; |
| 4509 | + } |
| 4510 | + } |
| 4511 | + assertEquals(expectedConsumerCount, consumerCount, "number of consumers returned from query using filter: " + filterString + "\n" + JsonLoader.prettyPrint(consumersAsJsonString)); |
| 4512 | + } |
| 4513 | + |
4445 | 4514 | @TestTemplate |
4446 | 4515 | public void testListConsumersOrder() throws Exception { |
4447 | 4516 | SimpleString queueName1 = SimpleString.of("my_queue_one"); |
|
0 commit comments