@@ -40,16 +40,33 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
4040
4141 @ Override
4242 public void onSubscribe (Subscription s ) {
43- wrappedSubscriber .onSubscribe (s );
43+ System .out .println ("[CipherSubscriber] onSubscribe called with subscription: " + s );
44+ wrappedSubscriber .onSubscribe (new Subscription () {
45+ @ Override
46+ public void request (long n ) {
47+ System .out .println ("[CipherSubscriber] Request received for " + n + " items" );
48+ s .request (n );
49+ }
50+
51+ @ Override
52+ public void cancel () {
53+ System .out .println ("[CipherSubscriber] Subscription cancelled" );
54+ s .cancel ();
55+ }
56+ });
4457 }
4558
4659 @ Override
4760 public void onNext (ByteBuffer byteBuffer ) {
61+ System .out .println ("[CipherSubscriber] onNext called with buffer size: " + byteBuffer .remaining () + ", contentRead: " + contentRead .get () + ", contentLength: " + contentLength );
4862 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
63+ System .out .println ("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer );
4964
5065 if (amountToReadFromByteBuffer > 0 ) {
5166 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
67+ System .out .println ("[CipherSubscriber] Copied " + buf .length + " bytes from input buffer" );
5268 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
69+ System .out .println ("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer .length : 0 ) + " bytes" );
5370
5471 if (outputBuffer == null || outputBuffer .length == 0 ) {
5572 // The underlying data is too short to fill in the block cipher.
@@ -58,11 +75,13 @@ public void onNext(ByteBuffer byteBuffer) {
5875 // null OR length == 0.
5976 if (contentRead .get () + tagLength >= contentLength ) {
6077 // All content has been read, so complete to get the final bytes
78+ System .out .println ("[CipherSubscriber] All content read (" + contentRead .get () + " bytes), proceeding to finalBytes" );
6179 finalBytes ();
6280 return ;
6381 }
6482 // Otherwise, wait for more bytes. To avoid blocking,
6583 // send an empty buffer to the wrapped subscriber.
84+ System .out .println ("[CipherSubscriber] Sending empty buffer to wrapped subscriber" );
6685 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
6786 } else {
6887 /*
@@ -84,17 +103,21 @@ public void onNext(ByteBuffer byteBuffer) {
84103 Calling `wrappedSubscriber.onNext` more than once for `request(1)`
85104 violates the Reactive Streams specification and can cause exceptions downstream.
86105 */
106+ System .out .println ("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead .get () + ", tagLength=" + tagLength + ", contentLength=" + contentLength );
87107 if (contentRead .get () + tagLength >= contentLength ) {
88108 // All content has been read; complete the stream.
109+ System .out .println ("[CipherSubscriber] Content read threshold (" + contentRead .get () + ") reached, proceeding to finalBytes" );
89110 finalBytes ();
90111 } else {
91112 // Needs to read more data, so send the data downstream,
92113 // expecting that downstream will continue to request more data.
114+ System .out .println ("[CipherSubscriber] Sending " + outputBuffer .length + " bytes to wrapped subscriber" );
93115 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
94116 }
95117 }
96118 } else {
97119 // Do nothing
120+ System .out .println ("[CipherSubscriber] No data to process, forwarding buffer directly" );
98121 wrappedSubscriber .onNext (byteBuffer );
99122 }
100123 }
@@ -103,31 +126,39 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
103126 // If content length is null, we should include everything in the cipher because the stream is essentially
104127 // unbounded.
105128 if (contentLength == null ) {
129+ System .out .println ("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer .remaining ());
106130 return byteBuffer .remaining ();
107131 }
108132
109133 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
110134 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
135+ System .out .println ("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer .remaining ());
111136
112137 if (amountRemaining > byteBuffer .remaining ()) {
138+ System .out .println ("[CipherSubscriber] Reading entire buffer: " + byteBuffer .remaining ());
113139 return byteBuffer .remaining ();
114140 } else {
141+ System .out .println ("[CipherSubscriber] Reading partial buffer: " + amountRemaining );
115142 return Math .toIntExact (amountRemaining );
116143 }
117144 }
118145
119146 @ Override
120147 public void onError (Throwable t ) {
148+ System .out .println ("[CipherSubscriber] Error occurred: " + t .getMessage ());
121149 wrappedSubscriber .onError (t );
122150 }
123151
124152 @ Override
125153 public void onComplete () {
126154 // In rare cases, e.g. when the last part of a low-level MPU has 0 length,
127155 // onComplete will be called before onNext is called once.
156+ System .out .println ("[CipherSubscriber] onComplete called" );
128157 if (contentRead .get () + tagLength <= contentLength ) {
158+ System .out .println ("[CipherSubscriber] onComplete called prematurely! The content read is " + contentRead .get () + " but the contentLength is " + contentLength );
129159 finalBytes ();
130160 }
161+ System .out .println ("[CipherSubscriber] forward onComplete" );
131162 wrappedSubscriber .onComplete ();
132163 }
133164
@@ -138,14 +169,17 @@ public void onComplete() {
138169 */
139170 private void finalBytes () {
140171 if (!finalBytesCalled .compareAndSet (false , true )) {
172+ System .out .println ("[CipherSubscriber] finalBytes already called!" );
141173 // already called, don't repeat
142174 return ;
143175 }
144176
145177 // If this isn't the last part, skip doFinal and just send outputBuffer downstream.
146178 // doFinal requires that all parts have been processed to compute the tag,
147179 // so the tag will only be computed when the last part is processed.
180+ System .out .println ("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart );
148181 if (!isLastPart ) {
182+ System .out .println ("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer .length : 0 ));
149183 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
150184 return ;
151185 }
@@ -154,9 +188,11 @@ private void finalBytes() {
154188 // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
155189 byte [] finalBytes ;
156190 try {
191+ System .out .println ("[CipherSubscriber] Calling cipher.doFinal()" );
157192 finalBytes = cipher .doFinal ();
158193 } catch (final GeneralSecurityException exception ) {
159194 // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
195+ System .out .println ("[CipherSubscriber] Security exception during doFinal: " + exception .getMessage ());
160196 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
161197 // Forward error, else the wrapped subscriber waits indefinitely
162198 wrappedSubscriber .onError (exception );
@@ -168,17 +204,22 @@ private void finalBytes() {
168204 // This single onNext call must contain both the bytes from outputBuffer and the tag.
169205 byte [] combinedBytes ;
170206 if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
207+ System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
171208 combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
172209 System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
173210 System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
174211 } else if (outputBuffer != null && outputBuffer .length > 0 ) {
212+ System .out .println ("[CipherSubscriber] Using only outputBuffer (" + outputBuffer .length + " bytes)" );
175213 combinedBytes = outputBuffer ;
176214 } else if (finalBytes != null && finalBytes .length > 0 ) {
215+ System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
177216 combinedBytes = finalBytes ;
178217 } else {
218+ System .out .println ("[CipherSubscriber] No bytes to send" );
179219 combinedBytes = new byte [0 ];
180220 }
181221
222+ System .out .println ("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes .length );
182223 wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
183224 }
184225
0 commit comments