@@ -67,7 +67,7 @@ public void onNext(ByteBuffer byteBuffer) {
6767 } else {
6868 /*
6969 Check if stream has read all expected content.
70- Once all content has been read, call onComplete .
70+ Once all content has been read, call `finalBytes` .
7171
7272 This determines that all content has been read by checking if
7373 the amount of data read so far plus the tag length is at least the content length.
@@ -86,8 +86,6 @@ public void onNext(ByteBuffer byteBuffer) {
8686 */
8787 if (contentRead .get () + tagLength >= contentLength ) {
8888 // All content has been read; complete the stream.
89- // (Signalling onComplete from here is Reactive Streams-spec compliant;
90- // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.)
9189 finalBytes ();
9290 } else {
9391 // Needs to read more data, so send the data downstream,
@@ -127,7 +125,7 @@ public void onError(Throwable t) {
127125 public void onComplete () {
128126 // In rare cases, e.g. when the last part of a low-level MPU has 0 length,
129127 // onComplete will be called before onNext is called once.
130- if (contentRead .get () < contentLength ) {
128+ if (contentRead .get () + tagLength <= contentLength ) {
131129 finalBytes ();
132130 }
133131 wrappedSubscriber .onComplete ();
0 commit comments