@@ -42,33 +42,16 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
4242
4343 @ Override
4444 public void onSubscribe (Subscription s ) {
45- System .out .println ("[CipherSubscriber] onSubscribe called with subscription: " + s );
46- wrappedSubscriber .onSubscribe (new Subscription () {
47- @ Override
48- public void request (long n ) {
49- System .out .println ("[CipherSubscriber] Request received for " + n + " items" );
50- s .request (n );
51- }
52-
53- @ Override
54- public void cancel () {
55- System .out .println ("[CipherSubscriber] Subscription cancelled" );
56- s .cancel ();
57- }
58- });
45+ wrappedSubscriber .onSubscribe (s );
5946 }
6047
6148 @ Override
6249 public void onNext (ByteBuffer byteBuffer ) {
63- System .out .println ("[CipherSubscriber] onNext called with buffer size: " + byteBuffer .remaining () + ", contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
6450 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
65- System .out .println ("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer );
6651
6752 if (amountToReadFromByteBuffer > 0 ) {
6853 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
69- System .out .println ("[CipherSubscriber] Copied " + buf .length + " bytes from input buffer" );
7054 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
71- System .out .println ("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer .length : 0 ) + " bytes" );
7255
7356 if (outputBuffer == null || outputBuffer .length == 0 ) {
7457 // The underlying data is too short to fill in the block cipher.
@@ -79,13 +62,11 @@ public void onNext(ByteBuffer byteBuffer) {
7962 // tagLength should only be added on Encrypt
8063 if (contentRead .get () + (isEncrypt ? tagLength : 0 ) >= contentLength ) {
8164 // All content has been read, so complete to get the final bytes
82- System .out .println ("[CipherSubscriber] All content read (" + contentRead .get () + " bytes), proceeding to finalBytes" );
8365 finalBytes ();
8466 return ;
8567 }
8668 // Otherwise, wait for more bytes. To avoid blocking,
8769 // send an empty buffer to the wrapped subscriber.
88- System .out .println ("[CipherSubscriber] Sending empty buffer to wrapped subscriber" );
8970 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
9071 } else {
9172 /*
@@ -107,21 +88,17 @@ public void onNext(ByteBuffer byteBuffer) {
10788 Calling `wrappedSubscriber.onNext` more than once for `request(1)`
10889 violates the Reactive Streams specification and can cause exceptions downstream.
10990 */
110- System .out .println ("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead .get () + ", tagLength=" + tagLength + ", contentLength=" + contentLength );
11191 if (contentRead .get () + (isEncrypt ? tagLength : 0 ) >= contentLength ) {
11292 // All content has been read; complete the stream.
113- System .out .println ("[CipherSubscriber] Content read threshold (" + contentRead .get () + ") reached, proceeding to finalBytes" );
11493 finalBytes ();
11594 } else {
11695 // Needs to read more data, so send the data downstream,
11796 // expecting that downstream will continue to request more data.
118- System .out .println ("[CipherSubscriber] Sending " + outputBuffer .length + " bytes to wrapped subscriber" );
11997 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
12098 }
12199 }
122100 } else {
123101 // Do nothing
124- System .out .println ("[CipherSubscriber] No data to process, forwarding buffer directly" );
125102 wrappedSubscriber .onNext (byteBuffer );
126103 }
127104 }
@@ -130,40 +107,32 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
130107 // If content length is null, we should include everything in the cipher because the stream is essentially
131108 // unbounded.
132109 if (contentLength == null ) {
133- System .out .println ("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer .remaining ());
134110 return byteBuffer .remaining ();
135111 }
136112
137113 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
138114 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
139- System .out .println ("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer .remaining ());
140115
141116 if (amountRemaining > byteBuffer .remaining ()) {
142- System .out .println ("[CipherSubscriber] Reading entire buffer: " + byteBuffer .remaining ());
143117 return byteBuffer .remaining ();
144118 } else {
145- System .out .println ("[CipherSubscriber] Reading partial buffer: " + amountRemaining );
146119 return Math .toIntExact (amountRemaining );
147120 }
148121 }
149122
150123 @ Override
151124 public void onError (Throwable t ) {
152- System .out .println ("[CipherSubscriber] Error occurred: " + t .getMessage ());
153125 wrappedSubscriber .onError (t );
154126 }
155127
156128 @ Override
157129 public void onComplete () {
158130 // In rare cases, e.g. when the last part of a low-level MPU has 0 length,
159131 // onComplete will be called before onNext is called once.
160- System .out .println ("[CipherSubscriber] onComplete called" );
161132 // tagLength should only be added on Encrypt
162133 if (contentRead .get () + (isEncrypt ? tagLength : 0 ) >= contentLength ) {
163- System .out .println ("[CipherSubscriber] onComplete called prematurely! The content read is " + contentRead .get () + " but the contentLength is " + contentLength );
164134 finalBytes ();
165135 }
166- System .out .println ("[CipherSubscriber] forward onComplete" );
167136 wrappedSubscriber .onComplete ();
168137 }
169138
@@ -174,17 +143,14 @@ public void onComplete() {
174143 */
175144 private void finalBytes () {
176145 if (!finalBytesCalled .compareAndSet (false , true )) {
177- System .out .println ("[CipherSubscriber] finalBytes already called!" );
178146 // already called, don't repeat
179147 return ;
180148 }
181149
182150 // If this isn't the last part, skip doFinal and just send outputBuffer downstream.
183151 // doFinal requires that all parts have been processed to compute the tag,
184152 // so the tag will only be computed when the last part is processed.
185- System .out .println ("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart );
186153 if (!isLastPart ) {
187- System .out .println ("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer .length : 0 ));
188154 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
189155 return ;
190156 }
@@ -193,11 +159,9 @@ private void finalBytes() {
193159 // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
194160 byte [] finalBytes ;
195161 try {
196- System .out .println ("[CipherSubscriber] Calling cipher.doFinal()" );
197162 finalBytes = cipher .doFinal ();
198163 } catch (final GeneralSecurityException exception ) {
199164 // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
200- System .out .println ("[CipherSubscriber] Security exception during doFinal: " + exception .getMessage ());
201165 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
202166 // Forward error, else the wrapped subscriber waits indefinitely
203167 wrappedSubscriber .onError (exception );
@@ -209,22 +173,17 @@ private void finalBytes() {
209173 // This single onNext call must contain both the bytes from outputBuffer and the tag.
210174 byte [] combinedBytes ;
211175 if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
212- System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
213176 combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
214177 System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
215178 System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
216179 } else if (outputBuffer != null && outputBuffer .length > 0 ) {
217- System .out .println ("[CipherSubscriber] Using only outputBuffer (" + outputBuffer .length + " bytes)" );
218180 combinedBytes = outputBuffer ;
219181 } else if (finalBytes != null && finalBytes .length > 0 ) {
220- System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
221182 combinedBytes = finalBytes ;
222183 } else {
223- System .out .println ("[CipherSubscriber] No bytes to send" );
224184 combinedBytes = new byte [0 ];
225185 }
226186
227- System .out .println ("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes .length );
228187 wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
229188 }
230189
0 commit comments