@@ -20,7 +20,6 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
2020 private final Long contentLength ;
2121 private boolean isLastPart ;
2222 private boolean onCompleteCalled = false ;
23- private final AtomicLong outstandingRequests = new AtomicLong (0 );
2423
2524 private byte [] outputBuffer ;
2625
@@ -38,95 +37,62 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
3837
3938 @ Override
4039 public void onSubscribe (Subscription s ) {
41- System .out .println ("[CipherSubscriber] onSubscribe called with subscription: " + s );
42- wrappedSubscriber .onSubscribe (new Subscription () {
43- @ Override
44- public void request (long n ) {
45- System .out .println ("[CipherSubscriber] New request received for " + n + " items" );
46- outstandingRequests .addAndGet (n );
47- System .out .println ("[CipherSubscriber] Current outstanding requests: " + outstandingRequests .get ());
48- s .request (n );
49- }
50-
51- @ Override
52- public void cancel () {
53- System .out .println ("[CipherSubscriber] Subscription cancelled" );
54- s .cancel ();
55- }
56- });
40+ wrappedSubscriber .onSubscribe (s );
5741 }
5842
5943 @ Override
6044 public void onNext (ByteBuffer byteBuffer ) {
61- System .out .println ("[CipherSubscriber] ByteBuffer content: " + byteBuffer .toString ());
62- // while (byteBuffer.hasRemaining()) {
63- // byte b = byteBuffer.get();
64- // System.out.printf("%02x ", b); // Print as hex
65- // }
66- System .out .println ("[CipherSubscriber] onNext called with buffer size: " + byteBuffer .remaining ());
67- System .out .println ("[CipherSubscriber] isLastPart: " + isLastPart );
6845 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
69- System .out .println ("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer );
7046
7147 if (amountToReadFromByteBuffer > 0 ) {
72- System .out .println ("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer );
7348 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
74- System .out .println ("[CipherSubscriber] Copied " + buf .length + " bytes from input buffer" );
75-
7649 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
77- System .out .println ("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer .length : 0 ));
78-
7950 if (outputBuffer == null || outputBuffer .length == 0 ) {
80- System .out .println ("[CipherSubscriber] No output from cipher update" );
81- System .out .println ("[CipherSubscriber] contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
51+ // The underlying data is too short to fill in the block cipher.
52+ // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
53+ // in practice SunJCE and ACCP return an empty buffer instead, hence checks for
54+ // null OR length == 0.
8255 if (contentRead .get () == contentLength ) {
83- System . out . println ( "[CipherSubscriber] All content read (contentRead: " + contentRead . get () + ", contentLength: " + contentLength + "), calling onComplete" );
56+ // All content has been read, so complete to get the final bytes
8457 this .onComplete ();
8558 }
86- System .out .println ("[CipherSubscriber] Sending empty buffer to wrapped subscriber" );
59+ // Otherwise, wait for more bytes. To avoid blocking,
60+ // send an empty buffer to the wrapped subscriber.
8761 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
8862 } else {
89- System .out .println ("[CipherSubscriber] Sending " + outputBuffer .length + " bytes to wrapped subscriber" );
90- System .out .println ("[CipherSubscriber] contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
9163 Long amount = isLastPart ? contentLength - 31 : contentLength - 15 ;
9264 if (contentRead .get () < amount ) {
9365 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
9466 } else {
67+ // Done, wait for upstream to signal onComplete
9568 System .out .println ("[CipherSubscriber] All content read (contentRead: " + contentRead .get () + ", contentLength: " + contentLength + "), waiting for onComplete" );
96- // this.onComplete();
9769 }
9870 }
9971 } else {
100- System . out . println ( "[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer" );
72+ // Do nothing
10173 wrappedSubscriber .onNext (byteBuffer );
10274 }
10375 }
10476
10577 private int getAmountToReadFromByteBuffer (ByteBuffer byteBuffer ) {
106- System .out .println ("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer .remaining ());
107- System .out .println ("[CipherSubscriber] Current contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
108-
78+ // If content length is null, we should include everything in the cipher because the stream is essentially
79+ // unbounded.
10980 if (contentLength == null ) {
110- System .out .println ("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer .remaining ());
11181 return byteBuffer .remaining ();
11282 }
11383
11484 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
11585 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
116- System .out .println ("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining );
11786
11887 if (amountRemaining > byteBuffer .remaining ()) {
119- System .out .println ("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer .remaining ());
12088 return byteBuffer .remaining ();
12189 } else {
122- System .out .println ("[CipherSubscriber] Reading partial buffer: " + amountRemaining );
12390 return Math .toIntExact (amountRemaining );
12491 }
12592 }
12693
12794 @ Override
12895 public void onError (Throwable t ) {
129- System .out .println ("[CipherSubscriber] onError called: " + t .getMessage ());
13096 wrappedSubscriber .onError (t );
13197 }
13298
@@ -136,47 +102,39 @@ public void onComplete() {
136102 return ;
137103 }
138104 onCompleteCalled = true ;
139- System .out .println ("[CipherSubscriber] onComplete called, isLastPart: " + isLastPart );
140105 if (!isLastPart ) {
141- System . out . println ( "[CipherSubscriber] Not last part, skipping doFinal" );
106+ // If this isn't the last part, skip doFinal, we aren't done
142107 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
143108 wrappedSubscriber .onComplete ();
144109 return ;
145110 }
146- byte [] finalBytes ;
147111 try {
148- System .out .println ("[CipherSubscriber] Calling cipher.doFinal()" );
149- finalBytes = cipher .doFinal ();
112+ byte [] finalBytes = cipher .doFinal ();
113+
114+ byte [] combinedBytes ;
115+ if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
116+ combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
117+ System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
118+ System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
119+ } else if (outputBuffer != null && outputBuffer .length > 0 ) {
120+ combinedBytes = outputBuffer ;
121+ } else if (finalBytes != null && finalBytes .length > 0 ) {
122+ combinedBytes = finalBytes ;
123+ } else {
124+ combinedBytes = new byte [0 ];
125+ }
126+
127+ if (combinedBytes .length > 0 ) {
128+ wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
129+ }
130+ wrappedSubscriber .onComplete ();
150131 } catch (final GeneralSecurityException exception ) {
151- System .out .println ("[CipherSubscriber] Error during doFinal: " + exception .getMessage ());
132+ // Forward error, else the wrapped subscriber waits indefinitely
133+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
152134 wrappedSubscriber .onError (exception );
135+ wrappedSubscriber .onComplete ();
153136 throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
154137 }
155- System .out .println ("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes .length : 0 ) + " bytes" );
156-
157- byte [] combinedBytes ;
158- if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
159- System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
160- combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
161- System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
162- System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
163- } else if (outputBuffer != null && outputBuffer .length > 0 ) {
164- System .out .println ("[CipherSubscriber] Using only outputBuffer (" + outputBuffer .length + " bytes)" );
165- combinedBytes = outputBuffer ;
166- } else if (finalBytes != null && finalBytes .length > 0 ) {
167- System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
168- combinedBytes = finalBytes ;
169- } else {
170- System .out .println ("[CipherSubscriber] No bytes to send" );
171- combinedBytes = new byte [0 ];
172- }
173-
174- if (combinedBytes .length > 0 ) {
175- System .out .println ("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes .length );
176- wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
177- }
178- System .out .println ("[CipherSubscriber] Completing wrapped subscriber" );
179- wrappedSubscriber .onComplete ();
180138 }
181139
182140}
0 commit comments