2727import io .netty .channel .ChannelInboundHandlerAdapter ;
2828import io .netty .channel .group .ChannelGroup ;
2929import java .nio .channels .ClosedChannelException ;
30- import lombok .extern .slf4j .Slf4j ;
31- import org .apache .bookkeeper .conf .ServerConfiguration ;
32- import org .apache .bookkeeper .processor .RequestProcessor ;
3330import java .util .ArrayList ;
3431import java .util .List ;
3532import java .util .concurrent .ArrayBlockingQueue ;
3633import java .util .concurrent .BlockingQueue ;
34+ import lombok .extern .slf4j .Slf4j ;
35+ import org .apache .bookkeeper .conf .ServerConfiguration ;
36+ import org .apache .bookkeeper .processor .RequestProcessor ;
3737
3838/**
3939 * Serverside handler for bookkeeper requests.
4242public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
4343
4444 static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object ();
45+ private static final int CAPACITY = 10_000 ;
4546
4647 private final RequestProcessor requestProcessor ;
4748 private final ChannelGroup allChannels ;
4849
4950 private ChannelHandlerContext ctx ;
50- private BlockingQueue <BookieProtocol .ParsedAddRequest > msgs ;
51+ private final BlockingQueue <BookieProtocol .ParsedAddRequest > msgs ;
5152
5253 private ByteBuf pendingSendResponses = null ;
5354 private int maxPendingResponsesSize ;
5455
5556 BookieRequestHandler (ServerConfiguration conf , RequestProcessor processor , ChannelGroup allChannels ) {
5657 this .requestProcessor = processor ;
5758 this .allChannels = allChannels ;
58- this .msgs = new ArrayBlockingQueue <>(10_000 );
59+ this .msgs = new ArrayBlockingQueue <>(CAPACITY );
5960 }
6061
6162 public ChannelHandlerContext ctx () {
@@ -102,6 +103,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
102103 && ((BookieProtocol .ParsedAddRequest ) msg ).getProtocolVersion () == BookieProtocol .CURRENT_PROTOCOL_VERSION
103104 && !((BookieProtocol .ParsedAddRequest ) msg ).isRecoveryAdd ()) {
104105 msgs .put ((BookieProtocol .ParsedAddRequest ) msg );
106+
107+ if (msgs .size () == CAPACITY ) {
108+ int count = msgs .size ();
109+ List <BookieProtocol .ParsedAddRequest > c = new ArrayList <>(count );
110+ msgs .drainTo (c , count );
111+ requestProcessor .processAddRequest (c , this );
112+ }
105113 } else {
106114 requestProcessor .processRequest (msg , this );
107115 }
@@ -110,8 +118,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
110118 @ Override
111119 public void channelReadComplete (ChannelHandlerContext ctx ) {
112120 if (!msgs .isEmpty ()) {
113- List <BookieProtocol .ParsedAddRequest > c = new ArrayList <>();
114- msgs .drainTo (c );
121+ int count = msgs .size ();
122+ List <BookieProtocol .ParsedAddRequest > c = new ArrayList <>(count );
123+ msgs .drainTo (c , count );
115124 requestProcessor .processAddRequest (c , this );
116125 }
117126 }
0 commit comments