@@ -20,7 +20,6 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
2020 private final Long contentLength ;
2121 private boolean isLastPart ;
2222 private int tagLength ;
23- private boolean onCompleteCalled = false ;
2423
2524 private byte [] outputBuffer ;
2625
@@ -48,33 +47,16 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
4847
4948 @ Override
5049 public void onSubscribe (Subscription s ) {
51- System .out .println ("[CipherSubscriber] onSubscribe called with subscription: " + s );
52- wrappedSubscriber .onSubscribe (new Subscription () {
53- @ Override
54- public void request (long n ) {
55- System .out .println ("[CipherSubscriber] Request received for " + n + " items" );
56- s .request (n );
57- }
58-
59- @ Override
60- public void cancel () {
61- System .out .println ("[CipherSubscriber] Subscription cancelled" );
62- s .cancel ();
63- }
64- });
50+ wrappedSubscriber .onSubscribe (s );
6551 }
6652
6753 @ Override
6854 public void onNext (ByteBuffer byteBuffer ) {
69- System .out .println ("[CipherSubscriber] onNext called with buffer size: " + byteBuffer .remaining () + ", contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
7055 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
71- System .out .println ("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer );
7256
7357 if (amountToReadFromByteBuffer > 0 ) {
7458 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
75- System .out .println ("[CipherSubscriber] Copied " + buf .length + " bytes from input buffer" );
7659 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
77- System .out .println ("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer .length : 0 ) + " bytes" );
7860
7961 if (outputBuffer == null || outputBuffer .length == 0 ) {
8062 // The underlying data is too short to fill in the block cipher.
@@ -83,14 +65,12 @@ public void onNext(ByteBuffer byteBuffer) {
8365 // null OR length == 0.
8466 if (contentRead .get () + tagLength >= contentLength ) {
8567 // All content has been read, so complete to get the final bytes
86- System .out .println ("[CipherSubscriber] All content read (" + contentRead .get () + " bytes), proceeding to finalBytes" );
8768 finalBytes ();
8869 return ;
8970 }
9071 // Otherwise, wait for more bytes. To avoid blocking,
9172 // send an empty buffer to the wrapped subscriber.
92- System .out .println ("[CipherSubscriber] Sending empty buffer to wrapped subscriber" );
93- // wrappedSubscriber.onNext(ByteBuffer.allocate(0));
73+ wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
9474 } else {
9575 // Check if stream has read all expected content.
9676 // Once all content has been read, call onComplete.
@@ -109,23 +89,19 @@ public void onNext(ByteBuffer byteBuffer) {
10989 // including the result of cipher.doFinal(), if applicable.
11090 // Calling `wrappedSubscriber.onNext` more than once for `request(1)`
11191 // violates the Reactive Streams specification and can cause exceptions downstream.
112- System .out .println ("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead .get () + ", tagLength=" + tagLength + ", contentLength=" + contentLength );
11392 if (contentRead .get () + tagLength >= contentLength ) {
11493 // All content has been read; complete the stream.
11594 // (Signalling onComplete from here is Reactive Streams-spec compliant;
11695 // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.)
117- System .out .println ("[CipherSubscriber] Content read threshold reached, proceeding to finalBytes" );
11896 finalBytes ();
11997 } else {
12098 // Needs to read more data, so send the data downstream,
12199 // expecting that downstream will continue to request more data.
122- System .out .println ("[CipherSubscriber] Sending " + outputBuffer .length + " bytes to wrapped subscriber" );
123100 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
124101 }
125102 }
126103 } else {
127104 // Do nothing
128- System .out .println ("[CipherSubscriber] No data to process, forwarding buffer directly" );
129105 wrappedSubscriber .onNext (byteBuffer );
130106 }
131107 }
@@ -134,73 +110,48 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
134110 // If content length is null, we should include everything in the cipher because the stream is essentially
135111 // unbounded.
136112 if (contentLength == null ) {
137- System .out .println ("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer .remaining ());
138113 return byteBuffer .remaining ();
139114 }
140115
141116 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
142117 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
143- System .out .println ("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer .remaining ());
144118
145119 if (amountRemaining > byteBuffer .remaining ()) {
146- System .out .println ("[CipherSubscriber] Reading entire buffer: " + byteBuffer .remaining ());
147120 return byteBuffer .remaining ();
148121 } else {
149- System .out .println ("[CipherSubscriber] Reading partial buffer: " + amountRemaining );
150122 return Math .toIntExact (amountRemaining );
151123 }
152124 }
153125
154126 @ Override
155127 public void onError (Throwable t ) {
156- System .out .println ("[CipherSubscriber] Error occurred: " + t .getMessage ());
157128 wrappedSubscriber .onError (t );
158129 }
159130
160131 @ Override
161132 public void onComplete () {
162- System .out .println ("[CipherSubscriber] onComplete called" );
163133 wrappedSubscriber .onComplete ();
164134 }
165135
166136 public void finalBytes () {
167- System .out .println ("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart + ", onCompleteCalled: " + onCompleteCalled );
168- // onComplete can be signalled to CipherSubscriber multiple times,
169- // but additional calls should be deduped to avoid calling onNext multiple times
170- // and raising exceptions.
171- if (onCompleteCalled ) {
172- System .out .println ("[CipherSubscriber] finalBytes already called, returning" );
173- return ;
174- }
175- onCompleteCalled = true ;
176-
177137 // If this isn't the last part, skip doFinal and just send outputBuffer downstream.
178138 // doFinal requires that all parts have been processed to compute the tag,
179139 // so the tag will only be computed when the last part is processed.
180140 if (!isLastPart ) {
181- System .out .println ("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer .length : 0 ));
182- // First, propagate the bytes that were in outputBuffer downstream.
183141 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
184- // Then, propagate the onComplete signal downstream.
185- // wrappedSubscriber.onComplete();
186142 return ;
187143 }
188144
189145 // If this is the last part, compute doFinal and include its result in the value sent downstream.
190146 // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
191- byte [] finalBytes = null ;
147+ byte [] finalBytes ;
192148 try {
193- System .out .println ("[CipherSubscriber] Calling cipher.doFinal()" );
194149 finalBytes = cipher .doFinal ();
195- System .out .println ("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes .length : 0 ) + " bytes" );
196150 } catch (final GeneralSecurityException exception ) {
197151 // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
198- System .out .println ("[CipherSubscriber] Security exception during doFinal: " + exception .getMessage ());
199152 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
200153 // Forward error, else the wrapped subscriber waits indefinitely
201154 wrappedSubscriber .onError (exception );
202- // Even though doFinal failed, propagate the onComplete signal downstream
203- // wrappedSubscriber.onComplete();
204155 throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
205156 }
206157
@@ -209,24 +160,18 @@ public void finalBytes() {
209160 // This single onNext call must contain both the bytes from outputBuffer and the tag.
210161 byte [] combinedBytes ;
211162 if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
212- System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
213163 combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
214164 System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
215165 System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
216166 } else if (outputBuffer != null && outputBuffer .length > 0 ) {
217- System .out .println ("[CipherSubscriber] Using only outputBuffer (" + outputBuffer .length + " bytes)" );
218167 combinedBytes = outputBuffer ;
219168 } else if (finalBytes != null && finalBytes .length > 0 ) {
220- System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
221169 combinedBytes = finalBytes ;
222170 } else {
223- System .out .println ("[CipherSubscriber] No bytes to send" );
224171 combinedBytes = new byte [0 ];
225172 }
226173
227- System .out .println ("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes .length );
228174 wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
229- // wrappedSubscriber.onComplete();
230175 }
231176
232177}
0 commit comments