@@ -19,6 +19,7 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
1919 private Cipher cipher ;
2020 private final Long contentLength ;
2121 private boolean isLastPart ;
22+ private int tagLength ;
2223 private boolean onCompleteCalled = false ;
2324
2425 private byte [] outputBuffer ;
@@ -28,6 +29,15 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
2829 this .contentLength = contentLength ;
2930 cipher = materials .getCipher (iv );
3031 this .isLastPart = isLastPart ;
32+
33+ // Determine the tag length based on the cipher algorithm
34+ if (cipher .getAlgorithm ().contains ("GCM" )) {
35+ tagLength = 16 ;
36+ } else if (cipher .getAlgorithm ().contains ("CBC" ) || cipher .getAlgorithm ().contains ("CTR" )) {
37+ tagLength = 0 ;
38+ } else {
39+ throw new IllegalArgumentException ("Unsupported cipher type: " + cipher .getAlgorithm ());
40+ }
3141 }
3242
3343 CipherSubscriber (Subscriber <? super ByteBuffer > wrappedSubscriber , Long contentLength , CryptographicMaterials materials , byte [] iv ) {
@@ -61,20 +71,19 @@ public void onNext(ByteBuffer byteBuffer) {
6171 // send an empty buffer to the wrapped subscriber.
6272 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
6373 } else {
64- // cipher.update will only return a block of data if it has been provided a full block of data.
65- // If it has been provided a partial block of data, it will not return partial data.
66- // If the CipherSubscriber is done sending data, but the total amount of data is not a multiple of the block size,
67- // the amount of content returned by the cipher will be less than the contentLength by at most the block size.
68- // Calling `doFinal` will return the remaining bytes along with the tag.
69- Long amount = contentLength - cipher .getBlockSize ();
70- if (contentRead .get () < amount ) {
71- // If the amount of data read so far is less than the amount of data that should have been read,
72- // send the data downstream, expecting that downstream will request more data.
73- wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
74- } else {
75- // If the amount of data read so far is at least the amount of data that should have been read,
76- // complete the stream, as downstream will not request any more data.
74+ // Once all content has been read, call onComplete.
75+ // This class can identify when all content has been read because the amount of data read so far
76+ // plus the tag length will equal the content length.
77+ if (contentRead .get () + tagLength == contentLength ) {
78+ // All content has been read, so complete the stream.
79+ // The next onNext call MUST include all bytes, including the result of cipher.doFinal().
80+ // Sending any additional onNext calls violates the Reactive Streams specification
81+ // and can lead to issues.
7782 this .onComplete ();
83+ } else {
84+ // Needs to read more data, so send the data downstream,
85+ // expecting that downstream will continue to request more data.
86+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
7887 }
7988 }
8089 } else {
@@ -107,17 +116,23 @@ public void onError(Throwable t) {
107116
108117 @ Override
109118 public void onComplete () {
119+ // onComplete can be signalled to CipherSubscriber multiple times,
120+ // but additional calls should be deduped.
110121 if (onCompleteCalled ) {
111122 return ;
112123 }
113124 onCompleteCalled = true ;
125+
126+ // If this isn't the last part, skip doFinal and just send outputBuffer downstream.
114127 if (!isLastPart ) {
115- // If this isn't the last part, skip doFinal, we aren't done
128+ // First, propagate the bytes that were in outputBuffer downstream.
116129 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
130+ // Then, propagate the onComplete signal downstream.
117131 wrappedSubscriber .onComplete ();
118132 return ;
119133 }
120134
135+ // If this is the last part, include the result of doFinal in the value sent downstream.
121136 byte [] finalBytes = null ;
122137 try {
123138 finalBytes = cipher .doFinal ();
@@ -126,14 +141,14 @@ public void onComplete() {
126141 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
127142 // Forward error, else the wrapped subscriber waits indefinitely
128143 wrappedSubscriber .onError (exception );
129- // Even though doFinal failed, downstream still expects to receive onComplete signal
144+ // Even though doFinal failed, propagate the onComplete signal downstream.
130145 wrappedSubscriber .onComplete ();
131146 throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
132147 }
133148
134149 // Combine the bytes from outputBuffer and finalBytes into one onNext call.
135- // Downstream has requested `1` in its request method, so this class can only call onNext once.
136- // This onNext call must contain both the bytes from outputBuffer and the tag.
150+ // Downstream has requested one item in its request method, so this class can only call onNext once.
151+ // This single onNext call must contain both the bytes from outputBuffer and the tag.
137152 byte [] combinedBytes ;
138153 if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
139154 combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
@@ -147,9 +162,7 @@ public void onComplete() {
147162 combinedBytes = new byte [0 ];
148163 }
149164
150- if (combinedBytes .length > 0 ) {
151- wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
152- }
165+ wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
153166 wrappedSubscriber .onComplete ();
154167 }
155168
0 commit comments