@@ -42,35 +42,46 @@ public void onSubscribe(Subscription s) {
4242
4343 @ Override
4444 public void onNext (ByteBuffer byteBuffer ) {
45+ System .out .println ("[CipherSubscriber] onNext called with buffer size: " + byteBuffer .remaining ());
4546 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
47+ System .out .println ("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer );
4648
4749 if (amountToReadFromByteBuffer > 0 ) {
4850 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
51+ System .out .println ("[CipherSubscriber] Copied " + buf .length + " bytes from input buffer" );
4952 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
53+ System .out .println ("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer .length : 0 ) + " bytes" );
54+
5055 if (outputBuffer == null || outputBuffer .length == 0 ) {
5156 // The underlying data is too short to fill in the block cipher.
5257 // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
5358 // in practice SunJCE and ACCP return an empty buffer instead, hence checks for
5459 // null OR length == 0.
5560 if (contentRead .get () == contentLength ) {
5661 // All content has been read, so complete to get the final bytes
62+ System .out .println ("[CipherSubscriber] All content read, calling onComplete" );
5763 this .onComplete ();
5864 }
5965 // Otherwise, wait for more bytes. To avoid blocking,
6066 // send an empty buffer to the wrapped subscriber.
67+ System .out .println ("[CipherSubscriber] Sending empty buffer to wrapped subscriber" );
6168 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
6269 } else {
63- Long amount = isLastPart ? contentLength - 31 : contentLength - 15 ;
70+ Long amount = isLastPart ? contentLength - (cipher .getBlockSize ()) : contentLength - (cipher .getBlockSize ());
71+ System .out .println ("[CipherSubscriber] Amount: " + amount );
72+ System .out .println ("[CipherSubscriber] Content read: " + contentRead .get ());
73+ System .out .println ("content.get() - amount: " + (contentRead .get () - amount ));
6474 if (contentRead .get () < amount ) {
75+ System .out .println ("[CipherSubscriber] Sending output buffer of size " + outputBuffer .length + " to wrapped subscriber" );
6576 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
6677 } else {
67- // Done, wait for upstream to signal onComplete
68- System .out .println ("[CipherSubscriber] All content read (contentRead: " + contentRead .get () + ", contentLength: " + contentLength + "), waiting for onComplete" );
78+ System .out .println ("[CipherSubscriber] Content read threshold reached, calling onComplete" );
6979 this .onComplete ();
7080 }
7181 }
7282 } else {
7383 // Do nothing
84+ System .out .println ("[CipherSubscriber] No data to process, forwarding buffer directly" );
7485 wrappedSubscriber .onNext (byteBuffer );
7586 }
7687 }
@@ -79,58 +90,74 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
7990 // If content length is null, we should include everything in the cipher because the stream is essentially
8091 // unbounded.
8192 if (contentLength == null ) {
93+ System .out .println ("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer .remaining ());
8294 return byteBuffer .remaining ();
8395 }
8496
8597 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
8698 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
99+ System .out .println ("[CipherSubscriber] Amount read so far: " + amountReadSoFar + ", remaining: " + amountRemaining );
87100
88101 if (amountRemaining > byteBuffer .remaining ()) {
102+ System .out .println ("[CipherSubscriber] Reading entire buffer: " + byteBuffer .remaining ());
89103 return byteBuffer .remaining ();
90104 } else {
105+ System .out .println ("[CipherSubscriber] Reading partial buffer: " + amountRemaining );
91106 return Math .toIntExact (amountRemaining );
92107 }
93108 }
94109
95110 @ Override
96111 public void onError (Throwable t ) {
112+ System .out .println ("[CipherSubscriber] Error occurred: " + t .getMessage ());
97113 wrappedSubscriber .onError (t );
98114 }
99115
100116 @ Override
101117 public void onComplete () {
118+ System .out .println ("[CipherSubscriber] onComplete called" );
102119 if (onCompleteCalled ) {
120+ System .out .println ("[CipherSubscriber] onComplete already called, returning" );
103121 return ;
104122 }
105123 onCompleteCalled = true ;
106124 if (!isLastPart ) {
107125 // If this isn't the last part, skip doFinal, we aren't done
126+ System .out .println ("[CipherSubscriber] Not last part, skipping doFinal" );
108127 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
109128 wrappedSubscriber .onComplete ();
110129 return ;
111130 }
112131 try {
132+ System .out .println ("[CipherSubscriber] Calling cipher.doFinal()" );
113133 byte [] finalBytes = cipher .doFinal ();
134+ System .out .println ("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes .length : 0 ) + " bytes" );
114135
115136 byte [] combinedBytes ;
116137 if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
138+ System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
117139 combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
118140 System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
119141 System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
120142 } else if (outputBuffer != null && outputBuffer .length > 0 ) {
143+ System .out .println ("[CipherSubscriber] Using only outputBuffer (" + outputBuffer .length + " bytes)" );
121144 combinedBytes = outputBuffer ;
122145 } else if (finalBytes != null && finalBytes .length > 0 ) {
146+ System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
123147 combinedBytes = finalBytes ;
124148 } else {
149+ System .out .println ("[CipherSubscriber] No bytes to send" );
125150 combinedBytes = new byte [0 ];
126151 }
127152
128153 if (combinedBytes .length > 0 ) {
154+ System .out .println ("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes .length );
129155 wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
130156 }
131157 wrappedSubscriber .onComplete ();
132158 } catch (final GeneralSecurityException exception ) {
133159 // Forward error, else the wrapped subscriber waits indefinitely
160+ System .out .println ("[CipherSubscriber] Security exception during doFinal: " + exception .getMessage ());
134161 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
135162 wrappedSubscriber .onError (exception );
136163 wrappedSubscriber .onComplete ();
0 commit comments