1616import org .elasticsearch .common .bytes .ReleasableBytesReference ;
1717import org .elasticsearch .common .io .stream .StreamInput ;
1818import org .elasticsearch .common .recycler .Recycler ;
19- import org .elasticsearch .common .unit .ByteSizeUnit ;
20- import org .elasticsearch .common .unit .ByteSizeValue ;
2119import org .elasticsearch .core .CheckedConsumer ;
2220import org .elasticsearch .core .Releasable ;
2321import org .elasticsearch .core .Releasables ;
@@ -36,21 +34,17 @@ public class InboundDecoder implements Releasable {
3634 private int bytesConsumed = 0 ;
3735 private boolean isCompressed = false ;
3836 private boolean isClosed = false ;
39- private final ByteSizeValue maxHeaderSize ;
40- private final ChannelType channelType ;
37+ private final int maxHeaderSize ;
38+ private final boolean isServerChannel ;
4139
4240 public InboundDecoder (Recycler <BytesRef > recycler ) {
43- this (recycler , ByteSizeValue . of ( 2 , ByteSizeUnit . GB ), ChannelType . MIX );
41+ this (recycler , Integer . MAX_VALUE , false );
4442 }
4543
46- public InboundDecoder (Recycler <BytesRef > recycler , ChannelType channelType ) {
47- this (recycler , ByteSizeValue .of (2 , ByteSizeUnit .GB ), channelType );
48- }
49-
50- public InboundDecoder (Recycler <BytesRef > recycler , ByteSizeValue maxHeaderSize , ChannelType channelType ) {
44+ public InboundDecoder (Recycler <BytesRef > recycler , int maxHeaderSize , boolean isServerChannel ) {
5145 this .recycler = recycler ;
5246 this .maxHeaderSize = maxHeaderSize ;
53- this .channelType = channelType ;
47+ this .isServerChannel = isServerChannel ;
5448 }
5549
5650 public int decode (ReleasableBytesReference reference , CheckedConsumer <Object , IOException > fragmentConsumer ) throws IOException {
@@ -73,13 +67,13 @@ public int internalDecode(ReleasableBytesReference reference, CheckedConsumer<Ob
7367 fragmentConsumer .accept (PING );
7468 return 6 ;
7569 } else {
76- int headerBytesToRead = headerBytesToRead (reference , maxHeaderSize );
70+ int headerBytesToRead = headerBytesToRead (reference );
7771 if (headerBytesToRead == 0 ) {
7872 return 0 ;
7973 } else {
8074 totalNetworkSize = messageLength + TcpHeader .BYTES_REQUIRED_FOR_MESSAGE_SIZE ;
8175
82- Header header = readHeader (messageLength , reference , channelType );
76+ Header header = readHeader (messageLength , reference );
8377 bytesConsumed += headerBytesToRead ;
8478 if (header .isCompressed ()) {
8579 isCompressed = true ;
@@ -160,11 +154,7 @@ private boolean isDone() {
160154 return bytesConsumed == totalNetworkSize ;
161155 }
162156
163- private static int headerBytesToRead (BytesReference reference , ByteSizeValue maxHeaderSize ) throws StreamCorruptedException {
164- if (reference .length () < TcpHeader .BYTES_REQUIRED_FOR_VERSION ) {
165- return 0 ;
166- }
167-
157+ private int headerBytesToRead (BytesReference reference ) throws StreamCorruptedException {
168158 if (reference .length () <= TcpHeader .HEADER_SIZE ) {
169159 return 0 ;
170160 } else {
@@ -173,7 +163,7 @@ private static int headerBytesToRead(BytesReference reference, ByteSizeValue max
173163 throw new StreamCorruptedException ("invalid negative variable header size: " + variableHeaderSize );
174164 }
175165 int totalHeaderSize = TcpHeader .HEADER_SIZE + variableHeaderSize ;
176- if (totalHeaderSize > maxHeaderSize . getBytes () ) {
166+ if (totalHeaderSize > maxHeaderSize ) {
177167 throw new StreamCorruptedException ("header size [" + totalHeaderSize + "] exceeds limit of [" + maxHeaderSize + "]" );
178168 }
179169 if (totalHeaderSize > reference .length ()) {
@@ -184,18 +174,16 @@ private static int headerBytesToRead(BytesReference reference, ByteSizeValue max
184174 }
185175 }
186176
187- private static Header readHeader (int networkMessageSize , BytesReference bytesReference , ChannelType channelType ) throws IOException {
177+ private Header readHeader (int networkMessageSize , BytesReference bytesReference ) throws IOException {
188178 try (StreamInput streamInput = bytesReference .streamInput ()) {
189179 streamInput .skip (TcpHeader .BYTES_REQUIRED_FOR_MESSAGE_SIZE );
190180 long requestId = streamInput .readLong ();
191181 byte status = streamInput .readByte ();
192182 int remoteVersion = streamInput .readInt ();
193183
194184 Header header = new Header (networkMessageSize , requestId , status , TransportVersion .fromId (remoteVersion ));
195- if (channelType == ChannelType . SERVER && header .isResponse ()) {
185+ if (isServerChannel && header .isResponse ()) {
196186 throw new IllegalArgumentException ("server channels do not accept inbound responses, only requests, closing channel" );
197- } else if (channelType == ChannelType .CLIENT && header .isRequest ()) {
198- throw new IllegalArgumentException ("client channels do not accept inbound requests, only responses, closing channel" );
199187 }
200188 if (header .isHandshake ()) {
201189 checkHandshakeVersionCompatibility (header .getVersion ());
@@ -243,9 +231,4 @@ static void checkVersionCompatibility(TransportVersion remoteVersion) {
243231 }
244232 }
245233
246- public enum ChannelType {
247- SERVER ,
248- CLIENT ,
249- MIX
250- }
251234}
0 commit comments