|
1 | | -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. |
| 1 | +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. |
| 2 | +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. |
2 | 3 | // |
3 | 4 | // This software, the RabbitMQ Java client library, is triple-licensed under the |
4 | 5 | // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 |
|
12 | 13 | // |
13 | 14 | // If you have any questions regarding licensing, please contact us at |
14 | 15 | |
15 | | - |
16 | 16 | package com.rabbitmq.client.test; |
17 | 17 |
|
18 | 18 | import static org.junit.jupiter.api.Assertions.assertTrue; |
19 | 19 |
|
20 | | -import java.io.IOException; |
21 | | -import java.util.concurrent.CountDownLatch; |
22 | | -import java.util.concurrent.Executors; |
23 | | -import java.util.concurrent.TimeUnit; |
24 | | -import java.util.concurrent.atomic.AtomicBoolean; |
25 | | - |
26 | | -import javax.net.SocketFactory; |
27 | | - |
28 | | -import org.junit.jupiter.api.Test; |
29 | | - |
30 | 20 | import com.rabbitmq.client.AMQP; |
31 | 21 | import com.rabbitmq.client.Channel; |
32 | 22 | import com.rabbitmq.client.Command; |
|
37 | 27 | import com.rabbitmq.client.impl.AMQConnection; |
38 | 28 | import com.rabbitmq.client.impl.DefaultExceptionHandler; |
39 | 29 | import com.rabbitmq.client.impl.SocketFrameHandler; |
| 30 | +import java.io.IOException; |
| 31 | +import java.util.concurrent.CountDownLatch; |
| 32 | +import java.util.concurrent.Executors; |
| 33 | +import java.util.concurrent.TimeUnit; |
| 34 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 35 | +import javax.net.SocketFactory; |
| 36 | +import org.junit.jupiter.api.Test; |
40 | 37 |
|
41 | | -public class CloseInMainLoop extends BrokerTestCase{ |
42 | | - private final CountDownLatch closeLatch = new CountDownLatch(1); |
43 | | - |
44 | | - private ConnectionFactory specialConnectionFactory() { |
45 | | - ConnectionFactory f = TestUtils.connectionFactory(); |
46 | | - f.setExceptionHandler(new DefaultExceptionHandler(){ |
47 | | - @Override |
48 | | - public void handleConsumerException(Channel channel, |
49 | | - Throwable exception, |
50 | | - Consumer consumer, |
51 | | - String consumerTag, |
52 | | - String methodName) { |
53 | | - try { |
54 | | - // TODO: change this to call 4-parameter close and make 6-parm one private |
55 | | - ((AMQConnection) channel.getConnection()) |
56 | | - .close(AMQP.INTERNAL_ERROR, |
57 | | - "Internal error in Consumer " + consumerTag, |
58 | | - false, |
59 | | - exception, |
60 | | - -1, |
61 | | - false); |
62 | | - } catch (Throwable e) { |
63 | | - // Man, this clearly isn't our day. |
64 | | - // TODO: Log the nested failure |
65 | | - } finally { |
66 | | - closeLatch.countDown(); |
67 | | - } |
| 38 | +public class CloseInMainLoop extends BrokerTestCase { |
| 39 | + private final CountDownLatch closeLatch = new CountDownLatch(1); |
| 40 | + |
| 41 | + private ConnectionFactory specialConnectionFactory() { |
| 42 | + ConnectionFactory f = TestUtils.connectionFactory(); |
| 43 | + f.setExceptionHandler( |
| 44 | + new DefaultExceptionHandler() { |
| 45 | + @Override |
| 46 | + public void handleConsumerException( |
| 47 | + Channel channel, |
| 48 | + Throwable exception, |
| 49 | + Consumer consumer, |
| 50 | + String consumerTag, |
| 51 | + String methodName) { |
| 52 | + try { |
| 53 | + // TODO: change this to call 4-parameter close and make 6-parm one private |
| 54 | + ((AMQConnection) channel.getConnection()) |
| 55 | + .close( |
| 56 | + AMQP.INTERNAL_ERROR, |
| 57 | + "Internal error in Consumer " + consumerTag, |
| 58 | + false, |
| 59 | + exception, |
| 60 | + -1, |
| 61 | + false); |
| 62 | + } catch (Throwable e) { |
| 63 | + // Man, this clearly isn't our day. |
| 64 | + // TODO: Log the nested failure |
| 65 | + } finally { |
| 66 | + closeLatch.countDown(); |
68 | 67 | } |
| 68 | + } |
69 | 69 | }); |
70 | | - return f; |
71 | | - } |
| 70 | + return f; |
| 71 | + } |
72 | 72 |
|
73 | | - class SpecialConnection extends AMQConnection{ |
| 73 | + class SpecialConnection extends AMQConnection { |
74 | 74 | private final AtomicBoolean validShutdown = new AtomicBoolean(false); |
75 | 75 |
|
76 | | - public boolean hadValidShutdown(){ |
77 | | - if(isOpen()) throw new IllegalStateException("hadValidShutdown called while connection is still open"); |
| 76 | + public boolean hadValidShutdown() { |
| 77 | + if (isOpen()) |
| 78 | + throw new IllegalStateException("hadValidShutdown called while connection is still open"); |
78 | 79 | return validShutdown.get(); |
79 | 80 | } |
80 | 81 |
|
81 | 82 | public SpecialConnection() throws Exception { |
82 | | - super(specialConnectionFactory().params(Executors.newFixedThreadPool(1)), |
83 | | - new SocketFrameHandler(SocketFactory.getDefault().createSocket("localhost", AMQP.PROTOCOL.PORT))); |
84 | | - this.start(); |
| 83 | + super( |
| 84 | + specialConnectionFactory().params(Executors.newFixedThreadPool(1)), |
| 85 | + new SocketFrameHandler( |
| 86 | + SocketFactory.getDefault().createSocket("localhost", AMQP.PROTOCOL.PORT))); |
| 87 | + this.start(); |
85 | 88 | } |
86 | 89 |
|
87 | 90 | @Override |
88 | | - public boolean processControlCommand(Command c) throws IOException{ |
89 | | - if(c.getMethod() instanceof AMQP.Connection.CloseOk) validShutdown.set(true); |
| 91 | + public boolean processControlCommand(Command c) throws IOException { |
| 92 | + if (c.getMethod() instanceof AMQP.Connection.CloseOk) validShutdown.set(true); |
90 | 93 | return super.processControlCommand(c); |
91 | 94 | } |
92 | 95 | } |
93 | 96 |
|
94 | | - @Test public void closeOKNormallyReceived() throws Exception{ |
| 97 | + @Test |
| 98 | + public void closeOKNormallyReceived() throws Exception { |
95 | 99 | SpecialConnection connection = new SpecialConnection(); |
96 | 100 | connection.close(10_000); |
97 | 101 | assertTrue(connection.hadValidShutdown()); |
98 | 102 | } |
99 | 103 |
|
100 | 104 | // The thrown runtime exception should get intercepted by the |
101 | 105 | // consumer exception handler, and result in a clean shut down. |
102 | | - @Test public void closeWithFaultyConsumer() throws Exception{ |
| 106 | + @Test |
| 107 | + public void closeWithFaultyConsumer() throws Exception { |
| 108 | + String q = generateQueueName(); |
| 109 | + String x = generateExchangeName(); |
103 | 110 | SpecialConnection connection = new SpecialConnection(); |
104 | 111 | Channel channel = connection.createChannel(); |
105 | | - channel.exchangeDeclare("x", "direct"); |
106 | | - channel.queueDeclare("q", false, false, false, null); |
107 | | - channel.queueBind("q", "x", "k"); |
108 | | - |
109 | | - channel.basicConsume("q", true, new DefaultConsumer(channel){ |
110 | | - @Override |
111 | | - public void handleDelivery(String consumerTag, |
112 | | - Envelope envelope, |
113 | | - AMQP.BasicProperties properties, |
114 | | - byte[] body) { |
| 112 | + channel.exchangeDeclare(x, "direct"); |
| 113 | + channel.queueDeclare(q, false, false, false, null); |
| 114 | + channel.queueBind(q, x, "k"); |
| 115 | + |
| 116 | + channel.basicConsume( |
| 117 | + q, |
| 118 | + true, |
| 119 | + new DefaultConsumer(channel) { |
| 120 | + @Override |
| 121 | + public void handleDelivery( |
| 122 | + String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { |
115 | 123 | throw new RuntimeException("I am a bad consumer"); |
116 | | - } |
117 | | - }); |
| 124 | + } |
| 125 | + }); |
118 | 126 |
|
119 | | - channel.basicPublish("x", "k", null, new byte[10]); |
| 127 | + channel.basicPublish(x, "k", null, new byte[10]); |
120 | 128 |
|
121 | 129 | assertTrue(closeLatch.await(1000, TimeUnit.MILLISECONDS)); |
122 | 130 | assertTrue(connection.hadValidShutdown()); |
123 | 131 | } |
124 | | - |
125 | 132 | } |
0 commit comments