Skip to content

Commit b30a380

Browse files
author
Lucas McDonald
committed
m
1 parent 53d62d6 commit b30a380

File tree

1 file changed

+54
-9
lines changed

1 file changed

+54
-9
lines changed

src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,29 +48,49 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
4848

4949
@Override
5050
public void onSubscribe(Subscription s) {
51-
wrappedSubscriber.onSubscribe(s);
51+
System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s);
52+
wrappedSubscriber.onSubscribe(new Subscription() {
53+
@Override
54+
public void request(long n) {
55+
System.out.println("[CipherSubscriber] Request received for " + n + " items");
56+
s.request(n);
57+
}
58+
59+
@Override
60+
public void cancel() {
61+
System.out.println("[CipherSubscriber] Subscription cancelled");
62+
s.cancel();
63+
}
64+
});
5265
}
5366

5467
@Override
5568
public void onNext(ByteBuffer byteBuffer) {
69+
System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining() + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength);
5670
int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer);
71+
System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer);
5772

5873
if (amountToReadFromByteBuffer > 0) {
5974
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer);
75+
System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer");
6076
outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer);
77+
System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes");
6178

6279
if (outputBuffer == null || outputBuffer.length == 0) {
6380
// The underlying data is too short to fill in the block cipher.
6481
// Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
6582
// in practice SunJCE and ACCP return an empty buffer instead, hence checks for
6683
// null OR length == 0.
67-
if (contentRead.get() == contentLength) {
84+
if (contentRead.get() + tagLength >= contentLength) {
6885
// All content has been read, so complete to get the final bytes
69-
this.onComplete();
86+
System.out.println("[CipherSubscriber] All content read (" + contentRead.get() + " bytes), proceeding to finalBytes");
87+
finalBytes();
88+
return;
7089
}
7190
// Otherwise, wait for more bytes. To avoid blocking,
7291
// send an empty buffer to the wrapped subscriber.
73-
wrappedSubscriber.onNext(ByteBuffer.allocate(0));
92+
System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber");
93+
// wrappedSubscriber.onNext(ByteBuffer.allocate(0));
7494
} else {
7595
// Check if stream has read all expected content.
7696
// Once all content has been read, call onComplete.
@@ -80,7 +100,7 @@ public void onNext(ByteBuffer byteBuffer) {
80100
// Once this is true, downstream will never call `request` again
81101
// (beyond the current request that is being responded to in this onNext invocation.)
82102
// As a result, this class can only call `wrappedSubscriber.onNext` one more time.
83-
// (Reactive streams require that downstream sends a `request(n)`
103+
// (Reactive streams require that downstream sends a `request(n)`
84104
// to indicate it is ready for more data, and upstream responds to that request by calling `onNext`.
85105
// The `n` in request is the maximum number of `onNext` calls that downstream
86106
// will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.)
@@ -89,19 +109,23 @@ public void onNext(ByteBuffer byteBuffer) {
89109
// including the result of cipher.doFinal(), if applicable.
90110
// Calling `wrappedSubscriber.onNext` more than once for `request(1)`
91111
// violates the Reactive Streams specification and can cause exceptions downstream.
112+
System.out.println("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead.get() + ", tagLength=" + tagLength + ", contentLength=" + contentLength);
92113
if (contentRead.get() + tagLength >= contentLength) {
93114
// All content has been read; complete the stream.
94115
// (Signalling onComplete from here is Reactive Streams-spec compliant;
95116
// this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.)
96-
this.onComplete();
117+
System.out.println("[CipherSubscriber] Content read threshold reached, proceeding to finalBytes");
118+
finalBytes();
97119
} else {
98120
// Needs to read more data, so send the data downstream,
99121
// expecting that downstream will continue to request more data.
122+
System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber");
100123
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
101124
}
102125
}
103126
} else {
104127
// Do nothing
128+
System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly");
105129
wrappedSubscriber.onNext(byteBuffer);
106130
}
107131
}
@@ -110,30 +134,42 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
110134
// If content length is null, we should include everything in the cipher because the stream is essentially
111135
// unbounded.
112136
if (contentLength == null) {
137+
System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining());
113138
return byteBuffer.remaining();
114139
}
115140

116141
long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining());
117142
long amountRemaining = Math.max(0, contentLength - amountReadSoFar);
143+
System.out.println("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer.remaining());
118144

119145
if (amountRemaining > byteBuffer.remaining()) {
146+
System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining());
120147
return byteBuffer.remaining();
121148
} else {
149+
System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining);
122150
return Math.toIntExact(amountRemaining);
123151
}
124152
}
125153

126154
@Override
127155
public void onError(Throwable t) {
156+
System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage());
128157
wrappedSubscriber.onError(t);
129158
}
130159

131160
@Override
132161
public void onComplete() {
162+
System.out.println("[CipherSubscriber] onComplete called");
163+
wrappedSubscriber.onComplete();
164+
}
165+
166+
public void finalBytes() {
167+
System.out.println("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart + ", onCompleteCalled: " + onCompleteCalled);
133168
// onComplete can be signalled to CipherSubscriber multiple times,
134169
// but additional calls should be deduped to avoid calling onNext multiple times
135170
// and raising exceptions.
136171
if (onCompleteCalled) {
172+
System.out.println("[CipherSubscriber] finalBytes already called, returning");
137173
return;
138174
}
139175
onCompleteCalled = true;
@@ -142,25 +178,29 @@ public void onComplete() {
142178
// doFinal requires that all parts have been processed to compute the tag,
143179
// so the tag will only be computed when the last part is processed.
144180
if (!isLastPart) {
181+
System.out.println("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer.length : 0));
145182
// First, propagate the bytes that were in outputBuffer downstream.
146183
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
147184
// Then, propagate the onComplete signal downstream.
148-
wrappedSubscriber.onComplete();
185+
// wrappedSubscriber.onComplete();
149186
return;
150187
}
151188

152189
// If this is the last part, compute doFinal and include its result in the value sent downstream.
153190
// The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
154191
byte[] finalBytes = null;
155192
try {
193+
System.out.println("[CipherSubscriber] Calling cipher.doFinal()");
156194
finalBytes = cipher.doFinal();
195+
System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes");
157196
} catch (final GeneralSecurityException exception) {
158197
// Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
198+
System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage());
159199
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
160200
// Forward error, else the wrapped subscriber waits indefinitely
161201
wrappedSubscriber.onError(exception);
162202
// Even though doFinal failed, propagate the onComplete signal downstream
163-
wrappedSubscriber.onComplete();
203+
// wrappedSubscriber.onComplete();
164204
throw new S3EncryptionClientSecurityException(exception.getMessage(), exception);
165205
}
166206

@@ -169,19 +209,24 @@ public void onComplete() {
169209
// This single onNext call must contain both the bytes from outputBuffer and the tag.
170210
byte[] combinedBytes;
171211
if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) {
212+
System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)");
172213
combinedBytes = new byte[outputBuffer.length + finalBytes.length];
173214
System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length);
174215
System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length);
175216
} else if (outputBuffer != null && outputBuffer.length > 0) {
217+
System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)");
176218
combinedBytes = outputBuffer;
177219
} else if (finalBytes != null && finalBytes.length > 0) {
220+
System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)");
178221
combinedBytes = finalBytes;
179222
} else {
223+
System.out.println("[CipherSubscriber] No bytes to send");
180224
combinedBytes = new byte[0];
181225
}
182226

227+
System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length);
183228
wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes));
184-
wrappedSubscriber.onComplete();
229+
// wrappedSubscriber.onComplete();
185230
}
186231

187232
}

0 commit comments

Comments
 (0)