@@ -19,7 +19,7 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
1919 private Cipher cipher ;
2020 private final Long contentLength ;
2121 private boolean isLastPart ;
22- private boolean finalized ;
22+ private boolean onCompleteCalled = false ;
2323
2424 private byte [] outputBuffer ;
2525
@@ -28,7 +28,6 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
2828 this .contentLength = contentLength ;
2929 cipher = materials .getCipher (iv );
3030 this .isLastPart = isLastPart ;
31- this .finalized = false ;
3231 }
3332
3433 CipherSubscriber (Subscriber <? super ByteBuffer > wrappedSubscriber , Long contentLength , CryptographicMaterials materials , byte [] iv ) {
@@ -38,144 +37,90 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
3837
3938 @ Override
4039 public void onSubscribe (Subscription s ) {
41- wrappedSubscriber .onSubscribe (new Subscription () {
42- @ Override
43- public void request (long n ) {
44- s .request (n );
45- }
46-
47- @ Override
48- public void cancel () {
49- s .cancel ();
50- }
51- });
40+ wrappedSubscriber .onSubscribe (s );
5241 }
5342
5443 @ Override
5544 public void onNext (ByteBuffer byteBuffer ) {
56- System .out .println ("[CipherSubscriber] onNext called with buffer size: " + byteBuffer .remaining ());
57- System .out .println ("[CipherSubscriber] isLastPart: " + isLastPart );
5845 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
59- System .out .println ("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer );
6046
6147 if (amountToReadFromByteBuffer > 0 ) {
62- System .out .println ("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer );
6348 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
64- System .out .println ("[CipherSubscriber] Copied " + buf .length + " bytes from input buffer" );
6549
6650 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
67- System .out .println ("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer .length : 0 ));
6851
6952 if (outputBuffer == null || outputBuffer .length == 0 ) {
70- System .out .println ("[CipherSubscriber] No output from cipher update" );
71- // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber.
53+ if (contentRead .get () == contentLength ) {
54+ this .onComplete ();
55+ }
7256 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
7357 } else {
74- boolean atEnd = isLastPart && contentRead .get () >= contentLength - 16 ;
75- System .out .println ("[CipherSubscriber] atEnd check - isLastPart: " + isLastPart +
76- ", contentRead: " + contentRead .get () +
77- ", contentLength: " + contentLength +
78- ", atEnd: " + atEnd );
79-
80- if (atEnd ) {
81- System .out .println ("[CipherSubscriber] Processing final bytes" );
82- // If all content has been read, send the final bytes in this onNext call.
83- // The final bytes must be sent with the final onNext call, not during the onComplete call.
84- byte [] finalBytes ;
85- try {
86- finalBytes = cipher .doFinal ();
87- finalized = true ;
88- System .out .println ("[CipherSubscriber] Cipher doFinal produced " + finalBytes .length + " bytes" );
89- } catch (final GeneralSecurityException exception ) {
90- System .out .println ("[CipherSubscriber] Error during doFinal: " + exception .getMessage ());
91- wrappedSubscriber .onError (exception );
92- throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
93- }
94-
95- // Combine outputBuffer and finalBytes if both exist
96- byte [] combinedBuffer ;
97- if (outputBuffer != null && outputBuffer .length > 0 ) {
98- System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
99- combinedBuffer = new byte [outputBuffer .length + finalBytes .length ];
100- System .arraycopy (outputBuffer , 0 , combinedBuffer , 0 , outputBuffer .length );
101- System .arraycopy (finalBytes , 0 , combinedBuffer , outputBuffer .length , finalBytes .length );
102- System .out .println ("[CipherSubscriber] Combined buffer total length: " + combinedBuffer .length );
103- } else {
104- System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
105- combinedBuffer = finalBytes ;
106- }
107- System .out .println ("[CipherSubscriber] Sending combined buffer to wrapped subscriber" );
108- wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBuffer ));
109- } else {
110- System .out .println ("[CipherSubscriber] Sending " + outputBuffer .length + " bytes to wrapped subscriber" );
111- // Not at end; send content so far
58+ Long amount = isLastPart ? contentLength - 31 : contentLength - 15 ;
59+ if (contentRead .get () < amount ) {
11260 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
61+ } else {
62+ this .onComplete ();
11363 }
11464 }
11565 } else {
116- System .out .println ("[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer" );
117- // Do nothing
11866 wrappedSubscriber .onNext (byteBuffer );
11967 }
12068 }
12169
12270 private int getAmountToReadFromByteBuffer (ByteBuffer byteBuffer ) {
123- System .out .println ("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer .remaining ());
124- System .out .println ("[CipherSubscriber] Current contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
125-
126- // If content length is null, we should include everything in the cipher because the stream is essentially
127- // unbounded.
12871 if (contentLength == null ) {
129- System .out .println ("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer .remaining ());
13072 return byteBuffer .remaining ();
13173 }
13274
13375 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
13476 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
135- System .out .println ("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining );
13677
13778 if (amountRemaining > byteBuffer .remaining ()) {
138- System .out .println ("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer .remaining ());
13979 return byteBuffer .remaining ();
14080 } else {
141- System .out .println ("[CipherSubscriber] Reading partial buffer: " + amountRemaining );
14281 return Math .toIntExact (amountRemaining );
14382 }
14483 }
14584
14685 @ Override
14786 public void onError (Throwable t ) {
148- System .out .println ("[CipherSubscriber] onError called: " + t .getMessage ());
14987 wrappedSubscriber .onError (t );
15088 }
15189
15290 @ Override
15391 public void onComplete () {
154- System .out .println ("[CipherSubscriber] onComplete called, isLastPart: " + isLastPart );
155- if (!isLastPart ) {
156- System .out .println ("[CipherSubscriber] Not last part, skipping doFinal" );
157- // If this isn't the last part, skip doFinal, we aren't done
158- wrappedSubscriber .onComplete ();
92+ if (onCompleteCalled ) {
15993 return ;
16094 }
161- if (finalized ) {
162- System .out .println ("[CipherSubscriber] Finalized" );
95+ onCompleteCalled = true ;
96+ if (!isLastPart ) {
97+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
16398 wrappedSubscriber .onComplete ();
16499 return ;
165100 }
166101 try {
167- System .out .println ("[CipherSubscriber] Calling cipher.doFinal()" );
168- outputBuffer = cipher .doFinal ();
169- System .out .println ("[CipherSubscriber] doFinal produced " + (outputBuffer != null ? outputBuffer .length : 0 ) + " bytes" );
170- // Send the final bytes to the wrapped subscriber
171- wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
102+ byte [] finalBytes = cipher .doFinal ();
103+
104+ byte [] combinedBytes ;
105+ if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
106+ combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
107+ System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
108+ System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
109+ } else if (outputBuffer != null && outputBuffer .length > 0 ) {
110+ combinedBytes = outputBuffer ;
111+ } else if (finalBytes != null && finalBytes .length > 0 ) {
112+ combinedBytes = finalBytes ;
113+ } else {
114+ combinedBytes = new byte [0 ];
115+ }
116+
117+ if (combinedBytes .length > 0 ) {
118+ wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
119+ }
172120 } catch (final GeneralSecurityException exception ) {
173- System .out .println ("[CipherSubscriber] Error during doFinal: " + exception .getMessage ());
174- // Forward error, else the wrapped subscriber waits indefinitely
175121 wrappedSubscriber .onError (exception );
176122 throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
177123 }
178- System .out .println ("[CipherSubscriber] Completing wrapped subscriber" );
179124 wrappedSubscriber .onComplete ();
180125 }
181126
0 commit comments