@@ -42,15 +42,11 @@ public void onSubscribe(Subscription s) {
4242
4343 @ Override
4444 public void onNext (ByteBuffer byteBuffer ) {
45- System .out .println ("[CipherSubscriber] onNext called with buffer size: " + byteBuffer .remaining ());
4645 int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer (byteBuffer );
47- System .out .println ("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer );
4846
4947 if (amountToReadFromByteBuffer > 0 ) {
5048 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
51- System .out .println ("[CipherSubscriber] Copied " + buf .length + " bytes from input buffer" );
5249 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
53- System .out .println ("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer .length : 0 ) + " bytes" );
5450
5551 if (outputBuffer == null || outputBuffer .length == 0 ) {
5652 // The underlying data is too short to fill in the block cipher.
@@ -59,29 +55,30 @@ public void onNext(ByteBuffer byteBuffer) {
5955 // null OR length == 0.
6056 if (contentRead .get () == contentLength ) {
6157 // All content has been read, so complete to get the final bytes
62- System .out .println ("[CipherSubscriber] All content read, calling onComplete" );
6358 this .onComplete ();
6459 }
6560 // Otherwise, wait for more bytes. To avoid blocking,
6661 // send an empty buffer to the wrapped subscriber.
67- System .out .println ("[CipherSubscriber] Sending empty buffer to wrapped subscriber" );
6862 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
6963 } else {
70- Long amount = isLastPart ? contentLength - (cipher .getBlockSize ()) : contentLength - (cipher .getBlockSize ());
71- System .out .println ("[CipherSubscriber] Amount: " + amount );
72- System .out .println ("[CipherSubscriber] Content read: " + contentRead .get ());
73- System .out .println ("content.get() - amount: " + (contentRead .get () - amount ));
64+ // cipher.update will only return a block of data if it has been provided a full block of data.
65+ // If it has been provided a partial block of data, it will not return partial data.
66+ // If the CipherSubscriber is done sending data, but the total amount of data is not a multiple of the block size,
67+ // the amount of content returned by the cipher will be less than the contentLength by at most the block size.
68+ // Calling `doFinal` will return the remaining bytes along with the tag.
69+ Long amount = contentLength - cipher .getBlockSize ();
7470 if (contentRead .get () < amount ) {
75- System .out .println ("[CipherSubscriber] Sending output buffer of size " + outputBuffer .length + " to wrapped subscriber" );
71+ // If the amount of data read so far is less than the amount of data that should have been read,
72+ // send the data downstream, expecting that downstream will request more data.
7673 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
7774 } else {
78- System .out .println ("[CipherSubscriber] Content read threshold reached, calling onComplete" );
75+ // If the amount of data read so far is at least the amount of data that should have been read,
76+ // complete the stream, as downstream will not request any more data.
7977 this .onComplete ();
8078 }
8179 }
8280 } else {
8381 // Do nothing
84- System .out .println ("[CipherSubscriber] No data to process, forwarding buffer directly" );
8582 wrappedSubscriber .onNext (byteBuffer );
8683 }
8784 }
@@ -90,79 +87,70 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
9087 // If content length is null, we should include everything in the cipher because the stream is essentially
9188 // unbounded.
9289 if (contentLength == null ) {
93- System .out .println ("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer .remaining ());
9490 return byteBuffer .remaining ();
9591 }
9692
9793 long amountReadSoFar = contentRead .getAndAdd (byteBuffer .remaining ());
9894 long amountRemaining = Math .max (0 , contentLength - amountReadSoFar );
99- System .out .println ("[CipherSubscriber] Amount read so far: " + amountReadSoFar + ", remaining: " + amountRemaining );
10095
10196 if (amountRemaining > byteBuffer .remaining ()) {
102- System .out .println ("[CipherSubscriber] Reading entire buffer: " + byteBuffer .remaining ());
10397 return byteBuffer .remaining ();
10498 } else {
105- System .out .println ("[CipherSubscriber] Reading partial buffer: " + amountRemaining );
10699 return Math .toIntExact (amountRemaining );
107100 }
108101 }
109102
110103 @ Override
111104 public void onError (Throwable t ) {
112- System .out .println ("[CipherSubscriber] Error occurred: " + t .getMessage ());
113105 wrappedSubscriber .onError (t );
114106 }
115107
116108 @ Override
117109 public void onComplete () {
118- System .out .println ("[CipherSubscriber] onComplete called" );
119110 if (onCompleteCalled ) {
120- System .out .println ("[CipherSubscriber] onComplete already called, returning" );
121111 return ;
122112 }
123113 onCompleteCalled = true ;
124114 if (!isLastPart ) {
125115 // If this isn't the last part, skip doFinal, we aren't done
126- System .out .println ("[CipherSubscriber] Not last part, skipping doFinal" );
127116 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
128117 wrappedSubscriber .onComplete ();
129118 return ;
130119 }
131- try {
132- System .out .println ("[CipherSubscriber] Calling cipher.doFinal()" );
133- byte [] finalBytes = cipher .doFinal ();
134- System .out .println ("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes .length : 0 ) + " bytes" );
135-
136- byte [] combinedBytes ;
137- if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
138- System .out .println ("[CipherSubscriber] Combining outputBuffer (" + outputBuffer .length + " bytes) with finalBytes (" + finalBytes .length + " bytes)" );
139- combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
140- System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
141- System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
142- } else if (outputBuffer != null && outputBuffer .length > 0 ) {
143- System .out .println ("[CipherSubscriber] Using only outputBuffer (" + outputBuffer .length + " bytes)" );
144- combinedBytes = outputBuffer ;
145- } else if (finalBytes != null && finalBytes .length > 0 ) {
146- System .out .println ("[CipherSubscriber] Using only finalBytes (" + finalBytes .length + " bytes)" );
147- combinedBytes = finalBytes ;
148- } else {
149- System .out .println ("[CipherSubscriber] No bytes to send" );
150- combinedBytes = new byte [0 ];
151- }
152120
153- if (combinedBytes .length > 0 ) {
154- System .out .println ("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes .length );
155- wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
156- }
157- wrappedSubscriber .onComplete ();
121+ byte [] finalBytes = null ;
122+ try {
123+ finalBytes = cipher .doFinal ();
158124 } catch (final GeneralSecurityException exception ) {
159- // Forward error, else the wrapped subscriber waits indefinitely
160- System .out .println ("[CipherSubscriber] Security exception during doFinal: " + exception .getMessage ());
125+ // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
161126 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
127+ // Forward error, else the wrapped subscriber waits indefinitely
162128 wrappedSubscriber .onError (exception );
129+ // Even though doFinal failed, downstream still expects to receive onComplete signal
163130 wrappedSubscriber .onComplete ();
164131 throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
165132 }
133+
134+ // Combine the bytes from outputBuffer and finalBytes into one onNext call.
135+ // Downstream has requested `1` in its request method, so this class can only call onNext once.
136+ // This onNext call must contain both the bytes from outputBuffer and the tag.
137+ byte [] combinedBytes ;
138+ if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
139+ combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
140+ System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
141+ System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
142+ } else if (outputBuffer != null && outputBuffer .length > 0 ) {
143+ combinedBytes = outputBuffer ;
144+ } else if (finalBytes != null && finalBytes .length > 0 ) {
145+ combinedBytes = finalBytes ;
146+ } else {
147+ combinedBytes = new byte [0 ];
148+ }
149+
150+ if (combinedBytes .length > 0 ) {
151+ wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
152+ }
153+ wrappedSubscriber .onComplete ();
166154 }
167155
168156}
0 commit comments