44import com .rabbitmq .client .impl .nio .BlockingQueueNioQueue ;
55import com .rabbitmq .client .impl .nio .DefaultByteBufferFactory ;
66import com .rabbitmq .client .impl .nio .NioParams ;
7+ import org .assertj .core .api .Condition ;
78import org .junit .After ;
89import org .junit .Before ;
910import org .junit .Test ;
1415import java .util .concurrent .*;
1516import java .util .concurrent .atomic .AtomicInteger ;
1617
17- import static org .hamcrest .Matchers .hasSize ;
18- import static org .hamcrest .Matchers .isOneOf ;
18+ import static org .assertj .core .api .Assertions .assertThat ;
1919import static org .junit .Assert .assertEquals ;
20- import static org .junit .Assert .assertThat ;
2120import static org .junit .Assert .assertTrue ;
2221
2322/**
@@ -125,19 +124,21 @@ public void shutdownCompleted(ShutdownSignalException cause) {
125124 public void nioLoopCleaning () throws Exception {
126125 ConnectionFactory connectionFactory = new ConnectionFactory ();
127126 connectionFactory .useNio ();
128- for (int i = 0 ; i < 10 ; i ++) {
127+ for (int i = 0 ; i < 10 ; i ++) {
129128 Connection connection = connectionFactory .newConnection ();
130129 connection .abort ();
131130 }
132131 }
133132
134- @ Test public void messageSize () throws Exception {
133+ @ Test
134+ public void messageSize () throws Exception {
135135 for (int i = 0 ; i < 50 ; i ++) {
136136 sendAndVerifyMessage (testConnection , 76390 );
137137 }
138138 }
139139
140- @ Test public void byteBufferFactory () throws Exception {
140+ @ Test
141+ public void byteBufferFactory () throws Exception {
141142 ConnectionFactory cf = new ConnectionFactory ();
142143 cf .useNio ();
143144 int baseCapacity = 32768 ;
@@ -155,12 +156,15 @@ public void nioLoopCleaning() throws Exception {
155156 sendAndVerifyMessage (c , 100 );
156157 }
157158
158- assertThat (byteBuffers , hasSize (2 ));
159- assertThat (byteBuffers .get (0 ).capacity (), isOneOf (nioParams .getReadByteBufferSize (), nioParams .getWriteByteBufferSize ()));
160- assertThat (byteBuffers .get (1 ).capacity (), isOneOf (nioParams .getReadByteBufferSize (), nioParams .getWriteByteBufferSize ()));
159+ assertThat (byteBuffers ).hasSize (2 );
160+ Condition <Integer > condition = new Condition <>(c -> c == nioParams .getReadByteBufferSize () ||
161+ c == nioParams .getWriteByteBufferSize (), "capacity set by factory" );
162+ assertThat (byteBuffers .get (0 ).capacity ()).is (condition );
163+ assertThat (byteBuffers .get (1 ).capacity ()).is (condition );
161164 }
162165
163- @ Test public void directByteBuffers () throws Exception {
166+ @ Test
167+ public void directByteBuffers () throws Exception {
164168 ConnectionFactory cf = new ConnectionFactory ();
165169 cf .useNio ();
166170 cf .setNioParams (new NioParams ().setByteBufferFactory (new DefaultByteBufferFactory (capacity -> ByteBuffer .allocateDirect (capacity ))));
@@ -169,15 +173,16 @@ public void nioLoopCleaning() throws Exception {
169173 }
170174 }
171175
172- @ Test public void customWriteQueue () throws Exception {
176+ @ Test
177+ public void customWriteQueue () throws Exception {
173178 ConnectionFactory cf = new ConnectionFactory ();
174179 cf .useNio ();
175180 AtomicInteger count = new AtomicInteger (0 );
176181 cf .setNioParams (new NioParams ().setWriteQueueFactory (ctx -> {
177182 count .incrementAndGet ();
178183 return new BlockingQueueNioQueue (
179- new LinkedBlockingQueue <>(ctx .getNioParams ().getWriteQueueCapacity ()),
180- ctx .getNioParams ().getWriteEnqueuingTimeoutInMs ()
184+ new LinkedBlockingQueue <>(ctx .getNioParams ().getWriteQueueCapacity ()),
185+ ctx .getNioParams ().getWriteEnqueuingTimeoutInMs ()
181186 );
182187 }));
183188 try (Connection c = cf .newConnection ()) {
@@ -193,7 +198,7 @@ private void sendAndVerifyMessage(Connection connection, int size) throws Except
193198 }
194199
195200 private Connection basicGetBasicConsume (ConnectionFactory connectionFactory , String queue , final CountDownLatch latch )
196- throws IOException , TimeoutException {
201+ throws IOException , TimeoutException {
197202 Connection connection = connectionFactory .newConnection ();
198203 Channel channel = connection .createChannel ();
199204 channel .queueDeclare (queue , false , false , false , null );
@@ -213,7 +218,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
213218 }
214219
215220 private boolean basicGetBasicConsume (Connection connection , String queue , final CountDownLatch latch , int msgSize )
216- throws Exception {
221+ throws Exception {
217222 Channel channel = connection .createChannel ();
218223 channel .queueDeclare (queue , false , false , false , null );
219224 channel .queuePurge (queue );
0 commit comments