@@ -30,7 +30,8 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
3030 cipher = materials .getCipher (iv );
3131 this .isLastPart = isLastPart ;
3232
33- // Determine the tag length based on the cipher algorithm
33+ // Determine the tag length based on the cipher algorithm.
34+ // This class uses the tag length to identify the end of the stream before the onComplete signal is sent.
3435 if (cipher .getAlgorithm ().contains ("GCM" )) {
3536 tagLength = 16 ;
3637 } else if (cipher .getAlgorithm ().contains ("CBC" ) || cipher .getAlgorithm ().contains ("CTR" )) {
@@ -71,14 +72,25 @@ public void onNext(ByteBuffer byteBuffer) {
7172 // send an empty buffer to the wrapped subscriber.
7273 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
7374 } else {
75+ // Check if stream has read all expected content.
7476 // Once all content has been read, call onComplete.
75- // This class can identify when all content has been read because the amount of data read so far
76- // plus the tag length exceeds the content length.
77+ //
78+ // This class determines that all content has been read by checking if
79+ // the amount of data read so far plus the tag length is at least the content length.
80+ // Once this is true, downstream will never call `request` again
81+ // (beyond the current request that is being responded to in this onNext invocation.)
82+ // As a result, this class can only call `wrappedSubscriber.onNext` one more time.
83+ // (Reactive streams require that downstream sends a `request(n)`
84+ // to indicate it is ready for more data, and upstream responds to that request by calling `onNext`.
85+ // The `n` in request is the maximum number of `onNext` calls that downstream
86+ // will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.)
87+ // Since this class can only call `wrappedSubscriber.onNext` once,
88+ // it must send all remaining data in the next onNext call,
89+ // including the result of cipher.doFinal(), if applicable.
90+ // Calling `wrappedSubscriber.onNext` more than once violates the Reactive Streams specification
91+ // and can cause exceptions downstream.
7792 if (contentRead .get () + tagLength >= contentLength ) {
78- // All content has been read, so complete the stream.
79- // The next onNext call MUST include all bytes, including the result of cipher.doFinal().
80- // Sending any additional onNext calls violates the Reactive Streams specification
81- // and can lead to issues.
93+ // All content has been read; complete the stream.
8294 this .onComplete ();
8395 } else {
8496 // Needs to read more data, so send the data downstream,
@@ -116,14 +128,17 @@ public void onError(Throwable t) {
116128
117129 @ Override
118130 public void onComplete () {
119- // onComplete can be signalled to CipherSubscriber multiple times,
120- // but additional calls should be deduped.
131+ // onComplete can be signalled to CipherSubscriber multiple times
132+ // but additional calls should be deduped to avoid calling onNext multiple times
133+ // and raising exceptions.
121134 if (onCompleteCalled ) {
122135 return ;
123136 }
124137 onCompleteCalled = true ;
125138
126139 // If this isn't the last part, skip doFinal and just send outputBuffer downstream.
140+ // doFinal requires that all parts have been processed to compute the tag,
141+ // so the tag will only be computed when the last part is processed.
127142 if (!isLastPart ) {
128143 // First, propagate the bytes that were in outputBuffer downstream.
129144 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
@@ -132,7 +147,9 @@ public void onComplete() {
132147 return ;
133148 }
134149
135- // If this is the last part, include the result of doFinal in the value sent downstream.
150+ // If this is the last part, compute doFinal and include its result in the value sent downstream.
151+ // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
152+ // When this class calculates it has read all content
136153 byte [] finalBytes = null ;
137154 try {
138155 finalBytes = cipher .doFinal ();
@@ -141,7 +158,7 @@ public void onComplete() {
141158 wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
142159 // Forward error, else the wrapped subscriber waits indefinitely
143160 wrappedSubscriber .onError (exception );
144- // Even though doFinal failed, propagate the onComplete signal downstream.
161+ // Even though doFinal failed, propagate the onComplete signal downstream
145162 wrappedSubscriber .onComplete ();
146163 throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
147164 }
0 commit comments