@@ -47,33 +47,26 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
4747
4848 @ Override
4949 public void onSubscribe (Subscription s ) {
50- //System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s);
5150 wrappedSubscriber .onSubscribe (new Subscription () {
5251 @ Override
5352 public void request (long n ) {
54- //System.out.println("[CipherSubscriber] Request received for " + n + " items");
5553 s .request (n );
5654 }
5755
5856 @ Override
5957 public void cancel () {
60- //System.out.println("[CipherSubscriber] Subscription cancelled");
6158 s .cancel ();
6259 }
6360 });
6461 }
6562
6663 @ Override
6764 public void onNext (ByteBuffer byteBuffer ) {
68- //System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining() + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength);
6965 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
70- //System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer);
7166
7267 if (amountToReadFromByteBuffer > 0 ) {
7368 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
74- //System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer");
7569 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
76- //System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes");
7770
7871 if (outputBuffer == null || outputBuffer .length == 0 ) {
7972 // The underlying data is too short to fill in the block cipher.
@@ -82,13 +75,11 @@ public void onNext(ByteBuffer byteBuffer) {
8275 // null OR length == 0.
8376 if (contentRead .get () + tagLength >= contentLength ) {
8477 // All content has been read, so complete to get the final bytes
85- //System.out.println("[CipherSubscriber] All content read (" + contentRead.get() + " bytes), proceeding to finalBytes");
8678 finalBytes ();
8779 return ;
8880 }
8981 // Otherwise, wait for more bytes. To avoid blocking,
9082 // send an empty buffer to the wrapped subscriber.
91- //System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber");
9283 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
9384 } else {
9485 // Check if stream has read all expected content.
@@ -108,23 +99,19 @@ public void onNext(ByteBuffer byteBuffer) {
10899 // including the result of cipher.doFinal(), if applicable.
109100 // Calling `wrappedSubscriber.onNext` more than once for `request(1)`
110101 // violates the Reactive Streams specification and can cause exceptions downstream.
111- //System.out.println("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead.get() + ", tagLength=" + tagLength + ", contentLength=" + contentLength);
112102 if (contentRead .get () + tagLength >= contentLength ) {
113103 // All content has been read; complete the stream.
114104 // (Signalling onComplete from here is Reactive Streams-spec compliant;
115105 // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.)
116- //System.out.println("[CipherSubscriber] Content read threshold (" + contentRead.get() + ") reached, proceeding to finalBytes");
117106 finalBytes ();
118107 } else {
119108 // Needs to read more data, so send the data downstream,
120109 // expecting that downstream will continue to request more data.
121- //System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber");
122110 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
123111 }
124112 }
125113 } else {
126114 // Do nothing
127- //System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly");
128115 wrappedSubscriber .onNext (byteBuffer );
129116 }
130117 }
@@ -133,37 +120,30 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
133120 // If content length is null, we should include everything in the cipher because the stream is essentially
134121 // unbounded.
135122 if (contentLength == null ) {
136- //System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining());
137123 return byteBuffer .remaining ();
138124 }
139125
140126 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
141127 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
142- //System.out.println("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer.remaining());
143128
144129 if (amountRemaining > byteBuffer .remaining ()) {
145- //System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining());
146130 return byteBuffer .remaining ();
147131 } else {
148- //System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining);
149132 return Math .toIntExact (amountRemaining );
150133 }
151134 }
152135
153136 @ Override
154137 public void onError (Throwable t ) {
155- //System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage());
156138 wrappedSubscriber .onError (t );
157139 }
158140
159141 @ Override
160142 public void onComplete () {
161- //System.out.println("[CipherSubscriber] onComplete called");
143+ // In rare cases, e.g. when the last part of a low-level MPU has 0 length,
144+ // onComplete will be called before onNext is called once.
162145 if (contentRead .get () < contentLength ) {
163- //System.out.println("[CipherSubscriber] onComplete called prematurely! The content read is " + contentRead.get() + " but the contentLength is " + contentLength);
164- //System.out.println("try just calling finalBytes() straight up");
165146 finalBytes ();
166- //System.out.println("now let it complete");
167147 }
168148 wrappedSubscriber .onComplete ();
169149 }
@@ -172,9 +152,7 @@ public void finalBytes() {
172152 // If this isn't the last part, skip doFinal and just send outputBuffer downstream.
173153 // doFinal requires that all parts have been processed to compute the tag,
174154 // so the tag will only be computed when the last part is processed.
175- //System.out.println("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart);
176155 if (!isLastPart ) {
177- //System.out.println("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer.length : 0));
178156 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
179157 return ;
180158 }
@@ -183,11 +161,9 @@ public void finalBytes() {
183161 // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
184162 byte [] finalBytes ;
185163 try {
186- //System.out.println("[CipherSubscriber] Calling cipher.doFinal()");
187164 finalBytes = cipher .doFinal ();
188165 } catch (final GeneralSecurityException exception ) {
189166 // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
190- //System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage());
191167 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
192168 // Forward error, else the wrapped subscriber waits indefinitely
193169 wrappedSubscriber .onError (exception );
@@ -199,22 +175,17 @@ public void finalBytes() {
199175 // This single onNext call must contain both the bytes from outputBuffer and the tag.
200176 byte [] combinedBytes ;
201177 if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
202- //System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)");
203178 combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
204179 System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
205180 System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
206181 } else if (outputBuffer != null && outputBuffer .length > 0 ) {
207- //System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)");
208182 combinedBytes = outputBuffer ;
209183 } else if (finalBytes != null && finalBytes .length > 0 ) {
210- //System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)");
211184 combinedBytes = finalBytes ;
212185 } else {
213- //System.out.println("[CipherSubscriber] No bytes to send");
214186 combinedBytes = new byte [0 ];
215187 }
216188
217- //System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length);
218189 wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
219190 }
220191
0 commit comments