1111import javax .crypto .Cipher ;
1212import java .nio .ByteBuffer ;
1313import java .security .GeneralSecurityException ;
14- import java .util .Arrays ;
1514import java .util .concurrent .atomic .AtomicLong ;
1615
1716public class CipherSubscriber implements Subscriber <ByteBuffer > {
@@ -20,15 +19,16 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
2019 private Cipher cipher ;
2120 private final Long contentLength ;
2221 private boolean isLastPart ;
22+ private boolean finalized ;
2323
2424 private byte [] outputBuffer ;
2525
2626 CipherSubscriber (Subscriber <? super ByteBuffer > wrappedSubscriber , Long contentLength , CryptographicMaterials materials , byte [] iv , boolean isLastPart ) {
27- System .out .println ("[CipherSubscriber] Constructor called with contentLength: " + contentLength + ", isLastPart: " + isLastPart );
2827 this .wrappedSubscriber = wrappedSubscriber ;
2928 this .contentLength = contentLength ;
3029 cipher = materials .getCipher (iv );
3130 this .isLastPart = isLastPart ;
31+ this .finalized = false ;
3232 }
3333
3434 CipherSubscriber (Subscriber <? super ByteBuffer > wrappedSubscriber , Long contentLength , CryptographicMaterials materials , byte [] iv ) {
@@ -39,17 +39,16 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
3939 @ Override
4040 public void onSubscribe (Subscription s ) {
4141 System .out .println ("[CipherSubscriber] onSubscribe called with subscription: " + s );
42-
4342 wrappedSubscriber .onSubscribe (new Subscription () {
4443 @ Override
4544 public void request (long n ) {
46- System .out .println ("[CipherSubscriber] Request called for " + n + " items" );
45+ System .out .println ("[CipherSubscriber] New request received for " + n + " items" );
4746 s .request (n );
4847 }
4948
5049 @ Override
5150 public void cancel () {
52- System .out .println ("[CipherSubscriber] Cancel called " );
51+ System .out .println ("[CipherSubscriber] Subscription cancelled " );
5352 s .cancel ();
5453 }
5554 });
@@ -58,6 +57,7 @@ public void cancel() {
5857 @ Override
5958 public void onNext (ByteBuffer byteBuffer ) {
6059 System .out .println ("[CipherSubscriber] onNext called with buffer size: " + byteBuffer .remaining ());
60+ System .out .println ("[CipherSubscriber] isLastPart: " + isLastPart );
6161 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
6262 System .out .println ("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer );
6363
@@ -69,50 +69,53 @@ public void onNext(ByteBuffer byteBuffer) {
6969 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
7070 System .out .println ("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer .length : 0 ));
7171
72- boolean atEnd = isLastPart && contentRead .get () >= contentLength - 16 ;
73- System .out .println ("[CipherSubscriber] atEnd: " + atEnd + " (isLastPart: " + isLastPart + ", contentRead: " + contentRead .get () + ", contentLength: " + contentLength + ")" );
74-
75- if (atEnd ) {
76- System .out .println ("[CipherSubscriber] Processing final bytes" );
77- // The final bytes must be sent with the final onNext call, not during the onComplete call.
78- byte [] finalBytes ;
79- try {
80- finalBytes = cipher .doFinal ();
81- System .out .println ("[CipherSubscriber] Cipher doFinal produced " + finalBytes .length + " bytes" );
82- } catch (final GeneralSecurityException exception ) {
83- System .out .println ("[CipherSubscriber] Error during doFinal: " + exception .getMessage ());
84- wrappedSubscriber .onError (exception );
85- throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
86- }
87-
88- // Combine outputBuffer and finalBytes if both exist
89- byte [] combinedBuffer ;
90- if (outputBuffer != null && outputBuffer .length > 0 ) {
91- System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
92- combinedBuffer = new byte [outputBuffer .length + finalBytes .length ];
93- System .arraycopy (outputBuffer , 0 , combinedBuffer , 0 , outputBuffer .length );
94- System .arraycopy (finalBytes , 0 , combinedBuffer , outputBuffer .length , finalBytes .length );
95- System .out .println ("[CipherSubscriber] Combined buffer total length: " + combinedBuffer .length );
96- } else {
97- System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
98- combinedBuffer = finalBytes ;
99- }
100-
101- System .out .println ("[CipherSubscriber] Sending combined buffer to wrapped subscriber" );
102- wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBuffer ));
103- return ;
104- } else if (outputBuffer == null || outputBuffer .length == 0 ) {
105- System .out .println ("[CipherSubscriber] No bytes from cipher update, sending empty buffer" );
72+ if (outputBuffer == null || outputBuffer .length == 0 ) {
73+ System .out .println ("[CipherSubscriber] No output from cipher update" );
10674 // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber.
107- return ;
75+ wrappedSubscriber . onNext ( ByteBuffer . allocate ( 0 )) ;
10876 } else {
109- System .out .println ("[CipherSubscriber] Sending " + outputBuffer .length + " bytes to wrapped subscriber" );
110- // Not at end; send content so far
111- wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
77+ boolean atEnd = isLastPart && contentRead .get () + amountToReadFromByteBuffer >= contentLength ;
78+ System .out .println ("[CipherSubscriber] atEnd: " + atEnd + " (contentRead: " + contentRead .get () + ", contentLength: " + contentLength + ")" );
79+
80+ if (atEnd ) {
81+ System .out .println ("[CipherSubscriber] Processing final bytes" );
82+ // If all content has been read, send the final bytes in this onNext call.
83+ // The final bytes must be sent with the final onNext call, not during the onComplete call.
84+ byte [] finalBytes ;
85+ try {
86+ finalBytes = cipher .doFinal ();
87+ finalized = true ;
88+ System .out .println ("[CipherSubscriber] Cipher doFinal produced " + finalBytes .length + " bytes" );
89+ } catch (final GeneralSecurityException exception ) {
90+ System .out .println ("[CipherSubscriber] Error during doFinal: " + exception .getMessage ());
91+ wrappedSubscriber .onError (exception );
92+ throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
93+ }
94+
95+ // Combine outputBuffer and finalBytes if both exist
96+ byte [] combinedBuffer ;
97+ if (outputBuffer != null && outputBuffer .length > 0 ) {
98+ System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
99+ combinedBuffer = new byte [outputBuffer .length + finalBytes .length ];
100+ System .arraycopy (outputBuffer , 0 , combinedBuffer , 0 , outputBuffer .length );
101+ System .arraycopy (finalBytes , 0 , combinedBuffer , outputBuffer .length , finalBytes .length );
102+ System .out .println ("[CipherSubscriber] Combined buffer total length: " + combinedBuffer .length );
103+ } else {
104+ System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
105+ combinedBuffer = finalBytes ;
106+ }
107+ System .out .println ("[CipherSubscriber] Sending combined buffer to wrapped subscriber" );
108+ wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBuffer ));
109+ } else {
110+ System .out .println ("[CipherSubscriber] Sending " + outputBuffer .length + " bytes to wrapped subscriber" );
111+ // Not at end; send content so far
112+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
113+ }
112114 }
113115 } else {
114- System .out .println ("[CipherSubscriber] No bytes to read from input buffer" );
115- return ;
116+ System .out .println ("[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer" );
117+ // Do nothing
118+ wrappedSubscriber .onNext (byteBuffer );
116119 }
117120 }
118121
@@ -148,7 +151,23 @@ public void onError(Throwable t) {
148151
149152 @ Override
150153 public void onComplete () {
151- System .out .println ("[CipherSubscriber] onComplete called" );
154+ if (!isLastPart ) {
155+ // If this isn't the last part, skip doFinal, we aren't done
156+ wrappedSubscriber .onComplete ();
157+ return ;
158+ } if (finalized ) {
159+ wrappedSubscriber .onComplete ();
160+ return ;
161+ }
162+ try {
163+ outputBuffer = cipher .doFinal ();
164+ // Send the final bytes to the wrapped subscriber
165+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
166+ } catch (final GeneralSecurityException exception ) {
167+ // Forward error, else the wrapped subscriber waits indefinitely
168+ wrappedSubscriber .onError (exception );
169+ throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
170+ }
152171 wrappedSubscriber .onComplete ();
153172 }
154173
0 commit comments