|
1 | 1 | package com.rabbitmq.client.test.functional; |
2 | 2 |
|
3 | 3 | import com.rabbitmq.client.AMQP; |
| 4 | +import com.rabbitmq.client.AMQP.BasicProperties; |
4 | 5 | import com.rabbitmq.client.Channel; |
5 | 6 | import com.rabbitmq.client.DefaultConsumer; |
6 | 7 | import com.rabbitmq.client.Envelope; |
@@ -173,6 +174,35 @@ public void testDeadLetterDeletedDLX() throws Exception { |
173 | 174 | consumeN(DLQ, MSG_COUNT, WithResponse.NULL); |
174 | 175 | } |
175 | 176 |
|
| 177 | + public void testDeadLetterPerMessageTTLRemoved() throws Exception { |
| 178 | + declareQueue(TEST_QUEUE_NAME, DLX, null, null, 1); |
| 179 | + channel.queueBind(TEST_QUEUE_NAME, "amq.direct", "test"); |
| 180 | + channel.queueBind(DLQ, DLX, "test"); |
| 181 | + |
| 182 | + final BasicProperties props = MessageProperties.BASIC; |
| 183 | + props.setExpiration("100"); |
| 184 | + publish(props, "test message"); |
| 185 | + |
| 186 | + // The message's expiration property should have been removed, thus |
| 187 | + // after 100ms of hitting the queue, the message should get routed to |
| 188 | + // the DLQ *AND* should remain there, not getting removed after a subsequent |
| 189 | + // wait time > 100ms |
| 190 | + sleep(500); |
| 191 | + consumeN(DLQ, 1, new WithResponse() { |
| 192 | + @SuppressWarnings("unchecked") |
| 193 | + public void process(GetResponse getResponse) { |
| 194 | + Map<String, Object> headers = getResponse.getProps().getHeaders(); |
| 195 | + assertNotNull(headers); |
| 196 | + ArrayList<Object> death = (ArrayList<Object>)headers.get("x-death"); |
| 197 | + assertNotNull(death); |
| 198 | + assertDeathReason(death, 0, TEST_QUEUE_NAME, "expired"); |
| 199 | + final Map<String, Object> deathHeader = |
| 200 | + (Map<String, Object>)death.get(0); |
| 201 | + assertEquals("100", deathHeader.get("expiration").toString()); |
| 202 | + } |
| 203 | + }); |
| 204 | + } |
| 205 | + |
176 | 206 | public void testDeadLetterExchangeDeleteTwice() |
177 | 207 | throws IOException |
178 | 208 | { |
|
0 commit comments