3535
3636import java .io .IOException ;
3737import java .net .Socket ;
38- import java .net .InetSocketAddress ;
3938
40- import com .rabbitmq .client .Address ;
4139import com .rabbitmq .client .ConnectionFactory ;
42- import com .rabbitmq .client .MessageProperties ;
43- import com .rabbitmq .client .QueueingConsumer ;
44- import com .rabbitmq .client .QueueingConsumer .Delivery ;
45- import com .rabbitmq .client .impl .AMQConnection ;
40+ import com .rabbitmq .client .GetResponse ;
4641import com .rabbitmq .client .impl .Frame ;
4742import com .rabbitmq .client .impl .FrameHandler ;
4843import com .rabbitmq .client .impl .SocketFrameHandler ;
@@ -54,11 +49,9 @@ public class FrameMax extends BrokerTestCase {
5449 * than what Rabbit suggests. */
5550 final static int FRAME_MAX = 131008 ;
5651 final static int REAL_FRAME_MAX = FRAME_MAX - 8 ;
57- final static int TIMEOUT = 3000 ; /* Time to wait for messages. */
58- final static String EXCHANGE_NAME = "xchg1" ;
5952 final static String ROUTING_KEY = "something" ;
6053
61- QueueingConsumer consumer ;
54+ private String queueName ;
6255
6356 public FrameMax () {
6457 connectionFactory = new MyConnectionFactory ();
@@ -76,19 +69,8 @@ protected void setUp()
7669 protected void createResources ()
7770 throws IOException
7871 {
79- channel .exchangeDeclare (EXCHANGE_NAME , "direct" );
80- consumer = new QueueingConsumer (channel );
81- String queueName = channel .queueDeclare ().getQueue ();
82- channel .basicConsume (queueName , consumer );
83- channel .queueBind (queueName , EXCHANGE_NAME , ROUTING_KEY );
84- }
85-
86- @ Override
87- protected void releaseResources ()
88- throws IOException
89- {
90- consumer = null ;
91- channel .exchangeDelete (EXCHANGE_NAME );
72+ queueName = channel .queueDeclare ().getQueue ();
73+ channel .queueBind (queueName , "" , ROUTING_KEY );
9274 }
9375
9476 /* Frame content should be less or equal to frame-max - 8. */
@@ -101,8 +83,8 @@ public void testFrameSizes()
10183 /* Receive everything that was sent out. */
10284 while (howMuch > 0 ) {
10385 try {
104- Delivery delivery = consumer . nextDelivery ( TIMEOUT );
105- howMuch -= delivery .getBody ().length ;
86+ GetResponse response = channel . basicGet ( queueName , false );
87+ howMuch -= response .getBody ().length ;
10688 } catch (RuntimeException e ) {
10789 fail (e .toString ());
10890 }
@@ -115,41 +97,18 @@ protected void produce(int howMuch)
11597 {
11698 while (howMuch > 0 ) {
11799 int size = (howMuch <= (REAL_FRAME_MAX )) ? howMuch : (REAL_FRAME_MAX );
118- publish (new byte [size ]);
119- howMuch -= ( REAL_FRAME_MAX ) ;
100+ basicPublishVolatile (new byte [size ], ROUTING_KEY );
101+ howMuch -= size ;
120102 }
121103 }
122104
123- /* Publish a non-persistant, non-immediate message. */
124- private void publish (byte [] msg )
125- throws IOException
126- {
127- channel .basicPublish (EXCHANGE_NAME , ROUTING_KEY ,
128- false , false ,
129- MessageProperties .MINIMAL_BASIC ,
130- msg );
131- }
132-
133105 /* ConnectionFactory that uses MyFrameHandler rather than
134106 * SocketFrameHandler. */
135107 private static class MyConnectionFactory extends ConnectionFactory {
136- protected FrameHandler createFrameHandler (Address addr )
108+ protected FrameHandler createFrameHandler (Socket sock )
137109 throws IOException
138110 {
139- String hostName = addr .getHost ();
140- int portNumber = portOrDefault (addr .getPort ());
141- Socket socket = getSocketFactory ().createSocket ();
142- configureSocket (socket );
143- socket .connect (new InetSocketAddress (hostName , portNumber ));
144- return new MyFrameHandler (socket );
145- }
146-
147- /* Copy-pasted from ConnectionFactory. Should be protected,
148- * rather than private. */
149- private int portOrDefault (int port ){
150- if (port != USE_DEFAULT_PORT ) return port ;
151- else if (isSSL ()) return DEFAULT_AMQP_OVER_SSL_PORT ;
152- else return DEFAULT_AMQP_PORT ;
111+ return new MyFrameHandler (sock );
153112 }
154113 }
155114
0 commit comments