3535
3636import java .io .IOException ;
3737import java .net .Socket ;
38- import java .net .InetSocketAddress ;
3938
40- import com .rabbitmq .client .Address ;
4139import com .rabbitmq .client .ConnectionFactory ;
42- import com .rabbitmq .client .MessageProperties ;
43- import com .rabbitmq .client .QueueingConsumer ;
44- import com .rabbitmq .client .QueueingConsumer .Delivery ;
45- import com .rabbitmq .client .impl .AMQConnection ;
40+ import com .rabbitmq .client .GetResponse ;
4641import com .rabbitmq .client .impl .Frame ;
4742import com .rabbitmq .client .impl .FrameHandler ;
4843import com .rabbitmq .client .impl .SocketFrameHandler ;
5247public class FrameMax extends BrokerTestCase {
5348 /* This value for FrameMax is larger than the minimum and less
5449 * than what Rabbit suggests. */
55- final static int FRAME_MAX = 131008 ;
50+ final static int FRAME_MAX = 70000 ;
5651 final static int REAL_FRAME_MAX = FRAME_MAX - 8 ;
57- final static int TIMEOUT = 3000 ; /* Time to wait for messages. */
58- final static String EXCHANGE_NAME = "xchg1" ;
5952 final static String ROUTING_KEY = "something" ;
6053
61- QueueingConsumer consumer ;
54+ private String queueName ;
6255
6356 public FrameMax () {
6457 connectionFactory = new MyConnectionFactory ();
65- }
66-
67- @ Override
68- protected void setUp ()
69- throws IOException
70- {
71- super .setUp ();
7258 connectionFactory .setRequestedFrameMax (FRAME_MAX );
7359 }
7460
7561 @ Override
7662 protected void createResources ()
7763 throws IOException
7864 {
79- channel .exchangeDeclare (EXCHANGE_NAME , "direct" );
80- consumer = new QueueingConsumer (channel );
81- String queueName = channel .queueDeclare ().getQueue ();
82- channel .basicConsume (queueName , consumer );
83- channel .queueBind (queueName , EXCHANGE_NAME , ROUTING_KEY );
84- }
85-
86- @ Override
87- protected void releaseResources ()
88- throws IOException
89- {
90- consumer = null ;
91- channel .exchangeDelete (EXCHANGE_NAME );
65+ queueName = channel .queueDeclare ().getQueue ();
66+ channel .queueBind (queueName , "" , ROUTING_KEY );
9267 }
9368
9469 /* Frame content should be less or equal to frame-max - 8. */
@@ -97,59 +72,25 @@ public void testFrameSizes()
9772 {
9873 /* This should result in at least 3 frames. */
9974 int howMuch = 2 *FRAME_MAX ;
100- produce ( howMuch );
75+ basicPublishVolatile ( new byte [ howMuch ], ROUTING_KEY );
10176 /* Receive everything that was sent out. */
10277 while (howMuch > 0 ) {
10378 try {
104- Delivery delivery = consumer . nextDelivery ( TIMEOUT );
105- howMuch -= delivery .getBody ().length ;
106- } catch (RuntimeException e ) {
107- fail (e .toString ());
79+ GetResponse response = channel . basicGet ( queueName , false );
80+ howMuch -= response .getBody ().length ;
81+ } catch (Exception e ) {
82+ fail (e .getCause (). toString ());
10883 }
10984 }
11085 }
11186
112- /* Send out howMuch worth of gibberish */
113- protected void produce (int howMuch )
114- throws IOException
115- {
116- while (howMuch > 0 ) {
117- int size = (howMuch <= (REAL_FRAME_MAX )) ? howMuch : (REAL_FRAME_MAX );
118- publish (new byte [size ]);
119- howMuch -= (REAL_FRAME_MAX );
120- }
121- }
122-
123- /* Publish a non-persistant, non-immediate message. */
124- private void publish (byte [] msg )
125- throws IOException
126- {
127- channel .basicPublish (EXCHANGE_NAME , ROUTING_KEY ,
128- false , false ,
129- MessageProperties .MINIMAL_BASIC ,
130- msg );
131- }
132-
13387 /* ConnectionFactory that uses MyFrameHandler rather than
13488 * SocketFrameHandler. */
13589 private static class MyConnectionFactory extends ConnectionFactory {
136- protected FrameHandler createFrameHandler (Address addr )
90+ protected FrameHandler createFrameHandler (Socket sock )
13791 throws IOException
13892 {
139- String hostName = addr .getHost ();
140- int portNumber = portOrDefault (addr .getPort ());
141- Socket socket = getSocketFactory ().createSocket ();
142- configureSocket (socket );
143- socket .connect (new InetSocketAddress (hostName , portNumber ));
144- return new MyFrameHandler (socket );
145- }
146-
147- /* Copy-pasted from ConnectionFactory. Should be protected,
148- * rather than private. */
149- private int portOrDefault (int port ){
150- if (port != USE_DEFAULT_PORT ) return port ;
151- else if (isSSL ()) return DEFAULT_AMQP_OVER_SSL_PORT ;
152- else return DEFAULT_AMQP_PORT ;
93+ return new MyFrameHandler (sock );
15394 }
15495 }
15596
@@ -165,25 +106,10 @@ public Frame readFrame() throws IOException {
165106 Frame f = super .readFrame ();
166107 int size = f .getPayload ().length ;
167108 if (size > REAL_FRAME_MAX )
168- throw new FrameTooLargeException (size , REAL_FRAME_MAX );
109+ fail ("Received frame of size " + size
110+ + ", which exceeds " + REAL_FRAME_MAX + "." );
169111 //System.out.printf("Received a frame of size %d.\n", f.getPayload().length);
170112 return f ;
171113 }
172114 }
173-
174- private static class FrameTooLargeException extends RuntimeException {
175- private int _frameSize ;
176- private int _maxSize ;
177-
178- public FrameTooLargeException (int frameSize , int maxSize ) {
179- _frameSize = frameSize ;
180- _maxSize = maxSize ;
181- }
182-
183- @ Override
184- public String toString () {
185- return "Received frame of size " + _frameSize
186- + ", which exceeds " + _maxSize + "." ;
187- }
188- }
189115}
0 commit comments