@@ -47,16 +47,33 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
4747
4848 @ Override
4949 public void onSubscribe (Subscription s ) {
50- wrappedSubscriber .onSubscribe (s );
50+ //System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s);
51+ wrappedSubscriber .onSubscribe (new Subscription () {
52+ @ Override
53+ public void request (long n ) {
54+ //System.out.println("[CipherSubscriber] Request received for " + n + " items");
55+ s .request (n );
56+ }
57+
58+ @ Override
59+ public void cancel () {
60+ //System.out.println("[CipherSubscriber] Subscription cancelled");
61+ s .cancel ();
62+ }
63+ });
5164 }
5265
5366 @ Override
5467 public void onNext (ByteBuffer byteBuffer ) {
68+ //System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining() + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength);
5569 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
70+ //System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer);
5671
5772 if (amountToReadFromByteBuffer > 0 ) {
5873 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
74+ //System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer");
5975 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
76+ //System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes");
6077
6178 if (outputBuffer == null || outputBuffer .length == 0 ) {
6279 // The underlying data is too short to fill in the block cipher.
@@ -65,11 +82,13 @@ public void onNext(ByteBuffer byteBuffer) {
6582 // null OR length == 0.
6683 if (contentRead .get () + tagLength >= contentLength ) {
6784 // All content has been read, so complete to get the final bytes
85+ //System.out.println("[CipherSubscriber] All content read (" + contentRead.get() + " bytes), proceeding to finalBytes");
6886 finalBytes ();
6987 return ;
7088 }
7189 // Otherwise, wait for more bytes. To avoid blocking,
7290 // send an empty buffer to the wrapped subscriber.
91+ //System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber");
7392 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
7493 } else {
7594 // Check if stream has read all expected content.
@@ -89,19 +108,23 @@ public void onNext(ByteBuffer byteBuffer) {
89108 // including the result of cipher.doFinal(), if applicable.
90109 // Calling `wrappedSubscriber.onNext` more than once for `request(1)`
91110 // violates the Reactive Streams specification and can cause exceptions downstream.
111+ //System.out.println("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead.get() + ", tagLength=" + tagLength + ", contentLength=" + contentLength);
92112 if (contentRead .get () + tagLength >= contentLength ) {
93113 // All content has been read; complete the stream.
94114 // (Signalling onComplete from here is Reactive Streams-spec compliant;
95115 // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.)
116+ //System.out.println("[CipherSubscriber] Content read threshold (" + contentRead.get() + ") reached, proceeding to finalBytes");
96117 finalBytes ();
97118 } else {
98119 // Needs to read more data, so send the data downstream,
99120 // expecting that downstream will continue to request more data.
121+ //System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber");
100122 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
101123 }
102124 }
103125 } else {
104126 // Do nothing
127+ //System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly");
105128 wrappedSubscriber .onNext (byteBuffer );
106129 }
107130 }
@@ -110,34 +133,48 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
110133 // If content length is null, we should include everything in the cipher because the stream is essentially
111134 // unbounded.
112135 if (contentLength == null ) {
136+ //System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining());
113137 return byteBuffer .remaining ();
114138 }
115139
116140 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
117141 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
142+ //System.out.println("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer.remaining());
118143
119144 if (amountRemaining > byteBuffer .remaining ()) {
145+ //System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining());
120146 return byteBuffer .remaining ();
121147 } else {
148+ //System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining);
122149 return Math .toIntExact (amountRemaining );
123150 }
124151 }
125152
126153 @ Override
127154 public void onError (Throwable t ) {
155+ //System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage());
128156 wrappedSubscriber .onError (t );
129157 }
130158
131159 @ Override
132160 public void onComplete () {
161+ //System.out.println("[CipherSubscriber] onComplete called");
162+ if (contentRead .get () < contentLength ) {
163+ //System.out.println("[CipherSubscriber] onComplete called prematurely! The content read is " + contentRead.get() + " but the contentLength is " + contentLength);
164+ //System.out.println("try just calling finalBytes() straight up");
165+ finalBytes ();
166+ //System.out.println("now let it complete");
167+ }
133168 wrappedSubscriber .onComplete ();
134169 }
135170
136171 public void finalBytes () {
137172 // If this isn't the last part, skip doFinal and just send outputBuffer downstream.
138173 // doFinal requires that all parts have been processed to compute the tag,
139174 // so the tag will only be computed when the last part is processed.
175+ //System.out.println("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart);
140176 if (!isLastPart ) {
177+ //System.out.println("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer.length : 0));
141178 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
142179 return ;
143180 }
@@ -146,9 +183,11 @@ public void finalBytes() {
146183 // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
147184 byte [] finalBytes ;
148185 try {
186+ //System.out.println("[CipherSubscriber] Calling cipher.doFinal()");
149187 finalBytes = cipher .doFinal ();
150188 } catch (final GeneralSecurityException exception ) {
151189 // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
190+ //System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage());
152191 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
153192 // Forward error, else the wrapped subscriber waits indefinitely
154193 wrappedSubscriber .onError (exception );
@@ -160,17 +199,22 @@ public void finalBytes() {
160199 // This single onNext call must contain both the bytes from outputBuffer and the tag.
161200 byte [] combinedBytes ;
162201 if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
202+ //System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)");
163203 combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
164204 System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
165205 System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
166206 } else if (outputBuffer != null && outputBuffer .length > 0 ) {
207+ //System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)");
167208 combinedBytes = outputBuffer ;
168209 } else if (finalBytes != null && finalBytes .length > 0 ) {
210+ //System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)");
169211 combinedBytes = finalBytes ;
170212 } else {
213+ //System.out.println("[CipherSubscriber] No bytes to send");
171214 combinedBytes = new byte [0 ];
172215 }
173216
217+ //System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length);
174218 wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
175219 }
176220
0 commit comments