|
| 1 | +package com.rabbitmq.client.test.server; |
| 2 | + |
| 3 | +import com.rabbitmq.client.GetResponse; |
| 4 | +import com.rabbitmq.client.test.BrokerTestCase; |
| 5 | +import com.rabbitmq.tools.Host; |
| 6 | + |
| 7 | +import java.io.IOException; |
| 8 | +import java.util.List; |
| 9 | +import java.util.Map; |
| 10 | + |
| 11 | +public class Firehose extends BrokerTestCase { |
| 12 | + private String q; |
| 13 | + private String firehose; |
| 14 | + |
| 15 | + @Override |
| 16 | + protected void createResources() throws IOException { |
| 17 | + super.createResources(); |
| 18 | + channel.exchangeDeclare("trace", "fanout", false, true, null); |
| 19 | + channel.exchangeDeclare("test", "fanout", false, true, null); |
| 20 | + q = channel.queueDeclare().getQueue(); |
| 21 | + firehose = channel.queueDeclare().getQueue(); |
| 22 | + channel.queueBind(q, "test", ""); |
| 23 | + channel.queueBind(firehose, "trace", ""); |
| 24 | + } |
| 25 | + |
| 26 | + public void testFirehose() throws IOException { |
| 27 | + publishGet("not traced"); |
| 28 | + enable(); |
| 29 | + GetResponse msg = publishGet("traced"); |
| 30 | + disable(); |
| 31 | + publishGet("not traced"); |
| 32 | + |
| 33 | + GetResponse publish = basicGet(firehose); |
| 34 | + GetResponse deliver = basicGet(firehose); |
| 35 | + |
| 36 | + assertNotNull(publish); |
| 37 | + assertNotNull(deliver); |
| 38 | + assertDelivered(firehose, 0); |
| 39 | + |
| 40 | + // We don't test everything, that would be a bit OTT |
| 41 | + checkHeaders(publish.getProps().getHeaders()); |
| 42 | + |
| 43 | + Map<String,Object> delHeaders = deliver.getProps().getHeaders(); |
| 44 | + checkHeaders(delHeaders); |
| 45 | + assertNotNull(delHeaders.get("delivery_tag")); |
| 46 | + assertNotNull(delHeaders.get("redelivered")); |
| 47 | + |
| 48 | + assertEquals(msg.getBody().length, publish.getBody().length); |
| 49 | + assertEquals(msg.getBody().length, deliver.getBody().length); |
| 50 | + } |
| 51 | + |
| 52 | + private GetResponse publishGet(String key) throws IOException { |
| 53 | + basicPublishVolatile("test", key); |
| 54 | + return basicGet(q); |
| 55 | + } |
| 56 | + |
| 57 | + private void checkHeaders(Map<String, Object> pubHeaders) { |
| 58 | + assertEquals("test", pubHeaders.get("exchange_name").toString()); |
| 59 | + List<Object> routing_keys = (List<Object>) pubHeaders.get("routing_keys"); |
| 60 | + assertEquals("traced", routing_keys.get(0).toString()); |
| 61 | + } |
| 62 | + |
| 63 | + private void enable() throws IOException { |
| 64 | + Host.executeCommand("cd ../rabbitmq-server; ./scripts/rabbitmqctl set_env '{trace_exchange, <<\"/\">>}' '<<\"trace\">>'"); |
| 65 | + } |
| 66 | + |
| 67 | + private void disable() throws IOException { |
| 68 | + Host.executeCommand("cd ../rabbitmq-server; ./scripts/rabbitmqctl unset_env '{trace_exchange, <<\"/\">>}'"); |
| 69 | + } |
| 70 | +} |
0 commit comments