3939import com .rabbitmq .stream .impl .Client .OutboundEntityWriteCallback ;
4040import io .netty .buffer .ByteBuf ;
4141import io .netty .buffer .ByteBufAllocator ;
42+ import io .netty .buffer .UnpooledByteBufAllocator ;
4243import io .netty .channel .Channel ;
4344import io .netty .channel .ChannelFuture ;
4445import java .time .Duration ;
46+ import java .util .Queue ;
4547import java .util .Set ;
4648import java .util .concurrent .ConcurrentHashMap ;
49+ import java .util .concurrent .ConcurrentLinkedQueue ;
4750import java .util .concurrent .CountDownLatch ;
4851import java .util .concurrent .Executors ;
4952import java .util .concurrent .ScheduledExecutorService ;
@@ -70,7 +73,7 @@ public class StreamProducerUnitTest {
7073 @ Mock Channel channel ;
7174 @ Mock ChannelFuture channelFuture ;
7275
73- Set <ByteBuf > buffers = ConcurrentHashMap . newKeySet ();
76+ Queue <ByteBuf > buffers = new ConcurrentLinkedQueue <> ();
7477
7578 ScheduledExecutorService executorService ;
7679 Clock clock = new Clock ();
@@ -82,15 +85,16 @@ public class StreamProducerUnitTest {
8285 void init () {
8386 mocks = MockitoAnnotations .openMocks (this );
8487 executorService = Executors .newScheduledThreadPool (2 );
85- when (channel .alloc ()).thenReturn (Utils .byteBufAllocator ());
88+ ByteBufAllocator allocator = new UnpooledByteBufAllocator (false );
89+ when (channel .alloc ()).thenReturn (allocator );
8690 when (channel .writeAndFlush (Mockito .any ())).thenReturn (channelFuture );
8791 when (client .allocateNoCheck (any (ByteBufAllocator .class ), anyInt ()))
8892 .thenAnswer (
8993 (Answer <ByteBuf >)
9094 invocation -> {
91- ByteBufAllocator allocator = invocation .getArgument (0 );
95+ ByteBufAllocator alloc = invocation .getArgument (0 );
9296 int capacity = invocation .getArgument (1 );
93- ByteBuf buffer = allocator .buffer (capacity );
97+ ByteBuf buffer = alloc .buffer (capacity );
9498 buffers .add (buffer );
9599 return buffer ;
96100 });
0 commit comments