@@ -20,6 +20,7 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
2020 private final Long contentLength ;
2121 private boolean isLastPart ;
2222 private boolean onCompleteCalled = false ;
23+ private final AtomicLong outstandingRequests = new AtomicLong (0 );
2324
2425 private byte [] outputBuffer ;
2526
@@ -37,53 +38,90 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
3738
3839 @ Override
3940 public void onSubscribe (Subscription s ) {
40- wrappedSubscriber .onSubscribe (s );
41+ System .out .println ("[CipherSubscriber] onSubscribe called with subscription: " + s );
42+ wrappedSubscriber .onSubscribe (new Subscription () {
43+ @ Override
44+ public void request (long n ) {
45+ System .out .println ("[CipherSubscriber] New request received for " + n + " items" );
46+ outstandingRequests .addAndGet (n );
47+ System .out .println ("[CipherSubscriber] Current outstanding requests: " + outstandingRequests .get ());
48+ s .request (n );
49+ }
50+
51+ @ Override
52+ public void cancel () {
53+ System .out .println ("[CipherSubscriber] Subscription cancelled" );
54+ s .cancel ();
55+ }
56+ });
4157 }
4258
4359 @ Override
4460 public void onNext (ByteBuffer byteBuffer ) {
61+ System .out .println ("[CipherSubscriber] onNext called with buffer size: " + byteBuffer .remaining ());
62+ System .out .println ("[CipherSubscriber] isLastPart: " + isLastPart );
4563 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
64+ System .out .println ("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer );
4665
4766 if (amountToReadFromByteBuffer > 0 ) {
67+ System .out .println ("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer );
4868 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
69+ System .out .println ("[CipherSubscriber] Copied " + buf .length + " bytes from input buffer" );
4970
5071 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
72+ System .out .println ("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer .length : 0 ));
5173
5274 if (outputBuffer == null || outputBuffer .length == 0 ) {
75+ System .out .println ("[CipherSubscriber] No output from cipher update" );
76+ System .out .println ("[CipherSubscriber] contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
5377 if (contentRead .get () == contentLength ) {
78+ System .out .println ("[CipherSubscriber] All content read (contentRead: " + contentRead .get () + ", contentLength: " + contentLength + "), calling onComplete" );
5479 this .onComplete ();
5580 }
81+ System .out .println ("[CipherSubscriber] Sending empty buffer to wrapped subscriber" );
5682 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
5783 } else {
84+ System .out .println ("[CipherSubscriber] Sending " + outputBuffer .length + " bytes to wrapped subscriber" );
85+ System .out .println ("[CipherSubscriber] contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
5886 Long amount = isLastPart ? contentLength - 31 : contentLength - 15 ;
5987 if (contentRead .get () < amount ) {
6088 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
6189 } else {
90+ System .out .println ("[CipherSubscriber] All content read (contentRead: " + contentRead .get () + ", contentLength: " + contentLength + "), calling onComplete" );
6291 this .onComplete ();
6392 }
6493 }
6594 } else {
95+ System .out .println ("[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer" );
6696 wrappedSubscriber .onNext (byteBuffer );
6797 }
6898 }
6999
70100 private int getAmountToReadFromByteBuffer (ByteBuffer byteBuffer ) {
101+ System .out .println ("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer .remaining ());
102+ System .out .println ("[CipherSubscriber] Current contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
103+
71104 if (contentLength == null ) {
105+ System .out .println ("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer .remaining ());
72106 return byteBuffer .remaining ();
73107 }
74108
75109 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
76110 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
111+ System .out .println ("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining );
77112
78113 if (amountRemaining > byteBuffer .remaining ()) {
114+ System .out .println ("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer .remaining ());
79115 return byteBuffer .remaining ();
80116 } else {
117+ System .out .println ("[CipherSubscriber] Reading partial buffer: " + amountRemaining );
81118 return Math .toIntExact (amountRemaining );
82119 }
83120 }
84121
85122 @ Override
86123 public void onError (Throwable t ) {
124+ System .out .println ("[CipherSubscriber] onError called: " + t .getMessage ());
87125 wrappedSubscriber .onError (t );
88126 }
89127
@@ -93,34 +131,45 @@ public void onComplete() {
93131 return ;
94132 }
95133 onCompleteCalled = true ;
134+ System .out .println ("[CipherSubscriber] onComplete called, isLastPart: " + isLastPart );
96135 if (!isLastPart ) {
136+ System .out .println ("[CipherSubscriber] Not last part, skipping doFinal" );
97137 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
98138 wrappedSubscriber .onComplete ();
99139 return ;
100140 }
101141 try {
142+ System .out .println ("[CipherSubscriber] Calling cipher.doFinal()" );
102143 byte [] finalBytes = cipher .doFinal ();
144+ System .out .println ("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes .length : 0 ) + " bytes" );
103145
104146 byte [] combinedBytes ;
105147 if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
148+ System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
106149 combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
107150 System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
108151 System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
109152 } else if (outputBuffer != null && outputBuffer .length > 0 ) {
153+ System .out .println ("[CipherSubscriber] Using only outputBuffer (" + outputBuffer .length + " bytes)" );
110154 combinedBytes = outputBuffer ;
111155 } else if (finalBytes != null && finalBytes .length > 0 ) {
156+ System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
112157 combinedBytes = finalBytes ;
113158 } else {
159+ System .out .println ("[CipherSubscriber] No bytes to send" );
114160 combinedBytes = new byte [0 ];
115161 }
116162
117163 if (combinedBytes .length > 0 ) {
164+ System .out .println ("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes .length );
118165 wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
119166 }
120167 } catch (final GeneralSecurityException exception ) {
168+ System .out .println ("[CipherSubscriber] Error during doFinal: " + exception .getMessage ());
121169 wrappedSubscriber .onError (exception );
122170 throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
123171 }
172+ System .out .println ("[CipherSubscriber] Completing wrapped subscriber" );
124173 wrappedSubscriber .onComplete ();
125174 }
126175
0 commit comments