|
20 | 20 | import com.rabbitmq.client.Command; |
21 | 21 | import com.rabbitmq.client.Connection; |
22 | 22 | import com.rabbitmq.client.ConnectionFactory; |
| 23 | +import com.rabbitmq.client.DefaultConsumer; |
| 24 | +import com.rabbitmq.client.Envelope; |
23 | 25 | import com.rabbitmq.client.TrafficListener; |
24 | 26 | import org.junit.Test; |
25 | 27 | import org.junit.runner.RunWith; |
26 | 28 | import org.junit.runners.Parameterized; |
27 | 29 |
|
| 30 | +import java.io.IOException; |
28 | 31 | import java.util.List; |
29 | 32 | import java.util.UUID; |
30 | 33 | import java.util.concurrent.CopyOnWriteArrayList; |
31 | 34 | import java.util.concurrent.CountDownLatch; |
32 | 35 | import java.util.concurrent.TimeUnit; |
33 | | -import java.util.function.Consumer; |
34 | 36 |
|
35 | 37 | import static org.junit.Assert.assertEquals; |
36 | 38 | import static org.junit.Assert.assertTrue; |
|
42 | 44 | public class TrafficListenerTest { |
43 | 45 |
|
44 | 46 | @Parameterized.Parameter |
45 | | - public Consumer<ConnectionFactory> configurator; |
| 47 | + public ConnectionFactoryConfigurator configurator; |
46 | 48 |
|
47 | 49 | @Parameterized.Parameters |
48 | 50 | public static Object[] data() { |
49 | 51 | return new Object[] { automaticRecoveryEnabled(), automaticRecoveryDisabled() }; |
50 | 52 | } |
51 | 53 |
|
52 | | - static Consumer<ConnectionFactory> automaticRecoveryEnabled() { |
53 | | - return cf -> cf.setAutomaticRecoveryEnabled(true); |
| 54 | + static ConnectionFactoryConfigurator automaticRecoveryEnabled() { |
| 55 | + return new ConnectionFactoryConfigurator() { |
| 56 | + |
| 57 | + @Override |
| 58 | + public void configure(ConnectionFactory cf) { |
| 59 | + cf.setAutomaticRecoveryEnabled(true); |
| 60 | + } |
| 61 | + }; |
54 | 62 | } |
55 | 63 |
|
56 | | - static Consumer<ConnectionFactory> automaticRecoveryDisabled() { |
57 | | - return cf -> cf.setAutomaticRecoveryEnabled(false); |
| 64 | + static ConnectionFactoryConfigurator automaticRecoveryDisabled() { |
| 65 | + return new ConnectionFactoryConfigurator() { |
| 66 | + |
| 67 | + @Override |
| 68 | + public void configure(ConnectionFactory cf) { |
| 69 | + cf.setAutomaticRecoveryEnabled(false); |
| 70 | + } |
| 71 | + }; |
58 | 72 | } |
59 | 73 |
|
60 | 74 | @Test |
61 | 75 | public void trafficListenerIsCalled() throws Exception { |
62 | 76 | ConnectionFactory cf = TestUtils.connectionFactory(); |
63 | 77 | TestTrafficListener testTrafficListener = new TestTrafficListener(); |
64 | 78 | cf.setTrafficListener(testTrafficListener); |
65 | | - configurator.accept(cf); |
66 | | - try (Connection c = cf.newConnection()) { |
| 79 | + configurator.configure(cf); |
| 80 | + Connection c = cf.newConnection(); |
| 81 | + try { |
67 | 82 | Channel ch = c.createChannel(); |
68 | 83 | String queue = ch.queueDeclare().getQueue(); |
69 | | - CountDownLatch latch = new CountDownLatch(1); |
70 | | - ch.basicConsume(queue, true, |
71 | | - (consumerTag, message) -> latch.countDown(), consumerTag -> { |
72 | | - }); |
| 84 | + final CountDownLatch latch = new CountDownLatch(1); |
| 85 | + ch.basicConsume(queue, true, new DefaultConsumer(ch) { |
| 86 | + |
| 87 | + @Override |
| 88 | + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { |
| 89 | + latch.countDown(); |
| 90 | + } |
| 91 | + } |
| 92 | + ); |
73 | 93 | String messageContent = UUID.randomUUID().toString(); |
74 | 94 | ch.basicPublish("", queue, null, messageContent.getBytes()); |
75 | 95 | assertTrue(latch.await(5, TimeUnit.SECONDS)); |
76 | 96 | assertEquals(1, testTrafficListener.outboundContent.size()); |
77 | 97 | assertEquals(messageContent, testTrafficListener.outboundContent.get(0)); |
78 | 98 | assertEquals(1, testTrafficListener.inboundContent.size()); |
79 | 99 | assertEquals(messageContent, testTrafficListener.inboundContent.get(0)); |
| 100 | + } finally { |
| 101 | + TestUtils.close(c); |
80 | 102 | } |
81 | 103 | } |
82 | 104 |
|
| 105 | + interface ConnectionFactoryConfigurator { |
| 106 | + |
| 107 | + void configure(ConnectionFactory connectionFactory); |
| 108 | + } |
| 109 | + |
83 | 110 | private static class TestTrafficListener implements TrafficListener { |
84 | 111 |
|
85 | | - final List<String> outboundContent = new CopyOnWriteArrayList<>(); |
86 | | - final List<String> inboundContent = new CopyOnWriteArrayList<>(); |
| 112 | + final List<String> outboundContent = new CopyOnWriteArrayList<String>(); |
| 113 | + final List<String> inboundContent = new CopyOnWriteArrayList<String>(); |
87 | 114 |
|
88 | 115 | @Override |
89 | 116 | public void write(Command outboundCommand) { |
|
0 commit comments