3434import com .rabbitmq .client .test .BrokerTestCase ;
3535
3636import java .io .IOException ;
37+ import java .net .Socket ;
38+ import java .net .InetSocketAddress ;
3739
40+ import com .rabbitmq .client .Address ;
41+ import com .rabbitmq .client .ConnectionFactory ;
3842import com .rabbitmq .client .MessageProperties ;
3943import com .rabbitmq .client .QueueingConsumer ;
4044import com .rabbitmq .client .QueueingConsumer .Delivery ;
45+ import com .rabbitmq .client .impl .AMQConnection ;
46+ import com .rabbitmq .client .impl .Frame ;
47+ import com .rabbitmq .client .impl .FrameHandler ;
48+ import com .rabbitmq .client .impl .SocketFrameHandler ;
4149
4250/* Publish a message of size FRAME_MAX. The broker should split this
4351 * into two frames before sending back. */
4452public class FrameMax extends BrokerTestCase {
4553 /* This value for FrameMax is larger than the minimum and less
4654 * than what Rabbit suggests. */
4755 final static int FRAME_MAX = 131008 ;
56+ final static int REAL_FRAME_MAX = FRAME_MAX - 8 ;
4857 final static int TIMEOUT = 3000 ; /* Time to wait for messages. */
4958 final static String EXCHANGE_NAME = "xchg1" ;
5059 final static String ROUTING_KEY = "something" ;
5160
5261 QueueingConsumer consumer ;
5362
63+ public FrameMax () {
64+ connectionFactory = new MyConnectionFactory ();
65+ }
66+
5467 @ Override
5568 protected void setUp ()
5669 throws IOException
@@ -82,14 +95,17 @@ protected void releaseResources()
8295 public void testFrameSizes ()
8396 throws IOException , InterruptedException
8497 {
85- int howMuch = FRAME_MAX ;
98+ /* This should result in at least 3 frames. */
99+ int howMuch = 2 *FRAME_MAX ;
86100 produce (howMuch );
87101 /* Receive everything that was sent out. */
88102 while (howMuch > 0 ) {
89- Delivery delivery = consumer .nextDelivery (TIMEOUT );
90- int received = delivery .getBody ().length ;
91- assertTrue (received <= FRAME_MAX - 8 );
92- howMuch -= received ;
103+ try {
104+ Delivery delivery = consumer .nextDelivery (TIMEOUT );
105+ howMuch -= delivery .getBody ().length ;
106+ } catch (RuntimeException e ) {
107+ fail (e .toString ());
108+ }
93109 }
94110 }
95111
@@ -98,9 +114,9 @@ protected void produce(int howMuch)
98114 throws IOException
99115 {
100116 while (howMuch > 0 ) {
101- int size = (howMuch <= (FRAME_MAX - 8 )) ? howMuch : (FRAME_MAX - 8 );
117+ int size = (howMuch <= (REAL_FRAME_MAX )) ? howMuch : (REAL_FRAME_MAX );
102118 publish (new byte [size ]);
103- howMuch -= (FRAME_MAX - 8 );
119+ howMuch -= (REAL_FRAME_MAX );
104120 }
105121 }
106122
@@ -113,4 +129,61 @@ private void publish(byte[] msg)
113129 MessageProperties .MINIMAL_BASIC ,
114130 msg );
115131 }
132+
133+ /* ConnectionFactory that uses MyFrameHandler rather than
134+ * SocketFrameHandler. */
135+ private static class MyConnectionFactory extends ConnectionFactory {
136+ protected FrameHandler createFrameHandler (Address addr )
137+ throws IOException
138+ {
139+ String hostName = addr .getHost ();
140+ int portNumber = portOrDefault (addr .getPort ());
141+ Socket socket = getSocketFactory ().createSocket ();
142+ configureSocket (socket );
143+ socket .connect (new InetSocketAddress (hostName , portNumber ));
144+ return new MyFrameHandler (socket );
145+ }
146+
147+ /* Copy-pasted from ConnectionFactory. Should be protected,
148+ * rather than private. */
149+ private int portOrDefault (int port ){
150+ if (port != USE_DEFAULT_PORT ) return port ;
151+ else if (isSSL ()) return DEFAULT_AMQP_OVER_SSL_PORT ;
152+ else return DEFAULT_AMQP_PORT ;
153+ }
154+ }
155+
156+ /* FrameHandler with added frame-max error checking. */
157+ private static class MyFrameHandler extends SocketFrameHandler {
158+ public MyFrameHandler (Socket socket )
159+ throws IOException
160+ {
161+ super (socket );
162+ }
163+
164+ public Frame readFrame () throws IOException {
165+ Frame f = super .readFrame ();
166+ int size = f .getPayload ().length ;
167+ if (size > REAL_FRAME_MAX )
168+ throw new FrameTooLargeException (size , REAL_FRAME_MAX );
169+ //System.out.printf("Received a frame of size %d.\n", f.getPayload().length);
170+ return f ;
171+ }
172+ }
173+
174+ private static class FrameTooLargeException extends RuntimeException {
175+ private int _frameSize ;
176+ private int _maxSize ;
177+
178+ public FrameTooLargeException (int frameSize , int maxSize ) {
179+ _frameSize = frameSize ;
180+ _maxSize = maxSize ;
181+ }
182+
183+ @ Override
184+ public String toString () {
185+ return "Received frame of size " + _frameSize
186+ + ", which exceeds " + _maxSize + "." ;
187+ }
188+ }
116189}
0 commit comments