1111
1212import org .elasticsearch .common .bytes .CompositeBytesReference ;
1313import org .elasticsearch .common .bytes .ReleasableBytesReference ;
14+ import org .elasticsearch .core .CheckedConsumer ;
1415import org .elasticsearch .core .Releasable ;
1516import org .elasticsearch .core .Releasables ;
1617
1718import java .io .IOException ;
1819import java .util .ArrayDeque ;
19- import java .util .ArrayList ;
2020import java .util .function .BiConsumer ;
2121import java .util .function .LongSupplier ;
2222
2323public class InboundPipeline implements Releasable {
2424
25- private static final ThreadLocal <ArrayList <Object >> fragmentList = ThreadLocal .withInitial (ArrayList ::new );
2625 private static final InboundMessage PING_MESSAGE = new InboundMessage (null , true );
2726
2827 private final LongSupplier relativeTimeInMillis ;
@@ -56,81 +55,74 @@ public void close() {
5655
5756 public void handleBytes (TcpChannel channel , ReleasableBytesReference reference ) throws IOException {
5857 if (uncaughtException != null ) {
58+ reference .close ();
5959 throw new IllegalStateException ("Pipeline state corrupted by uncaught exception" , uncaughtException );
6060 }
6161 try {
62- doHandleBytes (channel , reference );
62+ channel .getChannelStats ().markAccessed (relativeTimeInMillis .getAsLong ());
63+ statsTracker .markBytesRead (reference .length ());
64+ if (isClosed ) {
65+ reference .close ();
66+ return ;
67+ }
68+ pending .add (reference );
69+ doHandleBytes (channel );
6370 } catch (Exception e ) {
6471 uncaughtException = e ;
6572 throw e ;
6673 }
6774 }
6875
69- public void doHandleBytes (TcpChannel channel , ReleasableBytesReference reference ) throws IOException {
70- channel .getChannelStats ().markAccessed (relativeTimeInMillis .getAsLong ());
71- statsTracker .markBytesRead (reference .length ());
72- pending .add (reference .retain ());
73-
74- final ArrayList <Object > fragments = fragmentList .get ();
75- boolean continueHandling = true ;
76-
77- while (continueHandling && isClosed == false ) {
78- boolean continueDecoding = true ;
79- while (continueDecoding && pending .isEmpty () == false ) {
80- try (ReleasableBytesReference toDecode = getPendingBytes ()) {
81- final int bytesDecoded = decoder .decode (toDecode , fragments ::add );
82- if (bytesDecoded != 0 ) {
83- releasePendingBytes (bytesDecoded );
84- if (fragments .isEmpty () == false && endOfMessage (fragments .get (fragments .size () - 1 ))) {
85- continueDecoding = false ;
86- }
87- } else {
88- continueDecoding = false ;
89- }
76+ private void doHandleBytes (TcpChannel channel ) throws IOException {
77+ do {
78+ CheckedConsumer <Object , IOException > decodeConsumer = f -> forwardFragment (channel , f );
79+ int bytesDecoded = decoder .decode (pending .peekFirst (), decodeConsumer );
80+ if (bytesDecoded == 0 && pending .size () > 1 ) {
81+ final ReleasableBytesReference [] bytesReferences = new ReleasableBytesReference [pending .size ()];
82+ int index = 0 ;
83+ for (ReleasableBytesReference pendingReference : pending ) {
84+ bytesReferences [index ] = pendingReference .retain ();
85+ ++index ;
86+ }
87+ try (
88+ ReleasableBytesReference toDecode = new ReleasableBytesReference (
89+ CompositeBytesReference .of (bytesReferences ),
90+ () -> Releasables .closeExpectNoException (bytesReferences )
91+ )
92+ ) {
93+ bytesDecoded = decoder .decode (toDecode , decodeConsumer );
9094 }
9195 }
92-
93- if (fragments .isEmpty ()) {
94- continueHandling = false ;
96+ if (bytesDecoded != 0 ) {
97+ releasePendingBytes (bytesDecoded );
9598 } else {
96- try {
97- forwardFragments (channel , fragments );
98- } finally {
99- for (Object fragment : fragments ) {
100- if (fragment instanceof ReleasableBytesReference ) {
101- ((ReleasableBytesReference ) fragment ).close ();
102- }
103- }
104- fragments .clear ();
105- }
99+ break ;
106100 }
107- }
101+ } while ( pending . isEmpty () == false );
108102 }
109103
110- private void forwardFragments (TcpChannel channel , ArrayList <Object > fragments ) throws IOException {
111- for (Object fragment : fragments ) {
112- if (fragment instanceof Header ) {
113- headerReceived ((Header ) fragment );
114- } else if (fragment instanceof Compression .Scheme ) {
115- assert aggregator .isAggregating ();
116- aggregator .updateCompressionScheme ((Compression .Scheme ) fragment );
117- } else if (fragment == InboundDecoder .PING ) {
118- assert aggregator .isAggregating () == false ;
119- messageHandler .accept (channel , PING_MESSAGE );
120- } else if (fragment == InboundDecoder .END_CONTENT ) {
121- assert aggregator .isAggregating ();
122- InboundMessage aggregated = aggregator .finishAggregation ();
123- try {
124- statsTracker .markMessageReceived ();
125- messageHandler .accept (channel , aggregated );
126- } finally {
127- aggregated .decRef ();
128- }
129- } else {
130- assert aggregator .isAggregating ();
131- assert fragment instanceof ReleasableBytesReference ;
132- aggregator .aggregate ((ReleasableBytesReference ) fragment );
104+ private void forwardFragment (TcpChannel channel , Object fragment ) throws IOException {
105+ if (fragment instanceof Header ) {
106+ headerReceived ((Header ) fragment );
107+ } else if (fragment instanceof Compression .Scheme ) {
108+ assert aggregator .isAggregating ();
109+ aggregator .updateCompressionScheme ((Compression .Scheme ) fragment );
110+ } else if (fragment == InboundDecoder .PING ) {
111+ assert aggregator .isAggregating () == false ;
112+ messageHandler .accept (channel , PING_MESSAGE );
113+ } else if (fragment == InboundDecoder .END_CONTENT ) {
114+ assert aggregator .isAggregating ();
115+ InboundMessage aggregated = aggregator .finishAggregation ();
116+ try {
117+ statsTracker .markMessageReceived ();
118+ messageHandler .accept (channel , aggregated );
119+ } finally {
120+ aggregated .decRef ();
133121 }
122+ } else {
123+ assert aggregator .isAggregating ();
124+ assert fragment instanceof ReleasableBytesReference ;
125+ aggregator .aggregate ((ReleasableBytesReference ) fragment );
134126 }
135127 }
136128
@@ -139,25 +131,6 @@ protected void headerReceived(Header header) {
139131 aggregator .headerReceived (header );
140132 }
141133
142- private static boolean endOfMessage (Object fragment ) {
143- return fragment == InboundDecoder .PING || fragment == InboundDecoder .END_CONTENT || fragment instanceof Exception ;
144- }
145-
146- private ReleasableBytesReference getPendingBytes () {
147- if (pending .size () == 1 ) {
148- return pending .peekFirst ().retain ();
149- } else {
150- final ReleasableBytesReference [] bytesReferences = new ReleasableBytesReference [pending .size ()];
151- int index = 0 ;
152- for (ReleasableBytesReference pendingReference : pending ) {
153- bytesReferences [index ] = pendingReference .retain ();
154- ++index ;
155- }
156- final Releasable releasable = () -> Releasables .closeExpectNoException (bytesReferences );
157- return new ReleasableBytesReference (CompositeBytesReference .of (bytesReferences ), releasable );
158- }
159- }
160-
161134 private void releasePendingBytes (int bytesConsumed ) {
162135 int bytesToRelease = bytesConsumed ;
163136 while (bytesToRelease != 0 ) {
0 commit comments