Skip to content

Commit 56fd235

Browse files
author
Lucas McDonald
committed
m
1 parent 033ba23 commit 56fd235

File tree

1 file changed

+5
-45
lines changed

1 file changed

+5
-45
lines changed

src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java

Lines changed: 5 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -48,33 +48,16 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
4848

4949
@Override
5050
public void onSubscribe(Subscription s) {
51-
System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s);
52-
wrappedSubscriber.onSubscribe(new Subscription() {
53-
@Override
54-
public void request(long n) {
55-
System.out.println("[CipherSubscriber] Request received for " + n + " items");
56-
s.request(n);
57-
}
58-
59-
@Override
60-
public void cancel() {
61-
System.out.println("[CipherSubscriber] Subscription cancelled");
62-
s.cancel();
63-
}
64-
});
51+
wrappedSubscriber.onSubscribe(s);
6552
}
6653

6754
@Override
6855
public void onNext(ByteBuffer byteBuffer) {
69-
System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining() + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength);
7056
int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer);
71-
System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer);
7257

7358
if (amountToReadFromByteBuffer > 0) {
7459
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer);
75-
System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer");
7660
outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer);
77-
System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes");
7861

7962
if (outputBuffer == null || outputBuffer.length == 0) {
8063
// The underlying data is too short to fill in the block cipher.
@@ -83,14 +66,12 @@ public void onNext(ByteBuffer byteBuffer) {
8366
// null OR length == 0.
8467
if (contentRead.get() + tagLength >= contentLength) {
8568
// All content has been read, so complete to get the final bytes
86-
System.out.println("[CipherSubscriber] All content read (" + contentRead.get() + " bytes), proceeding to finalBytes");
8769
finalBytes();
8870
return;
8971
}
9072
// Otherwise, wait for more bytes. To avoid blocking,
9173
// send an empty buffer to the wrapped subscriber.
92-
System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber");
93-
// wrappedSubscriber.onNext(ByteBuffer.allocate(0));
74+
wrappedSubscriber.onNext(ByteBuffer.allocate(0));
9475
} else {
9576
// Check if stream has read all expected content.
9677
// Once all content has been read, call onComplete.
@@ -109,23 +90,19 @@ public void onNext(ByteBuffer byteBuffer) {
10990
// including the result of cipher.doFinal(), if applicable.
11091
// Calling `wrappedSubscriber.onNext` more than once for `request(1)`
11192
// violates the Reactive Streams specification and can cause exceptions downstream.
112-
System.out.println("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead.get() + ", tagLength=" + tagLength + ", contentLength=" + contentLength);
11393
if (contentRead.get() + tagLength >= contentLength) {
11494
// All content has been read; complete the stream.
11595
// (Signalling onComplete from here is Reactive Streams-spec compliant;
11696
// this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.)
117-
System.out.println("[CipherSubscriber] Content read threshold reached, proceeding to finalBytes");
11897
finalBytes();
11998
} else {
12099
// Needs to read more data, so send the data downstream,
121100
// expecting that downstream will continue to request more data.
122-
System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber");
123101
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
124102
}
125103
}
126104
} else {
127105
// Do nothing
128-
System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly");
129106
wrappedSubscriber.onNext(byteBuffer);
130107
}
131108
}
@@ -134,42 +111,34 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
134111
// If content length is null, we should include everything in the cipher because the stream is essentially
135112
// unbounded.
136113
if (contentLength == null) {
137-
System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining());
138114
return byteBuffer.remaining();
139115
}
140116

141117
long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining());
142118
long amountRemaining = Math.max(0, contentLength - amountReadSoFar);
143-
System.out.println("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer.remaining());
144119

145120
if (amountRemaining > byteBuffer.remaining()) {
146-
System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining());
147121
return byteBuffer.remaining();
148122
} else {
149-
System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining);
150123
return Math.toIntExact(amountRemaining);
151124
}
152125
}
153126

154127
@Override
155128
public void onError(Throwable t) {
156-
System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage());
157129
wrappedSubscriber.onError(t);
158130
}
159131

160132
@Override
161133
public void onComplete() {
162-
System.out.println("[CipherSubscriber] onComplete called");
163134
wrappedSubscriber.onComplete();
164135
}
165136

166137
public void finalBytes() {
167-
System.out.println("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart + ", onCompleteCalled: " + onCompleteCalled);
168138
// onComplete can be signalled to CipherSubscriber multiple times,
169139
// but additional calls should be deduped to avoid calling onNext multiple times
170140
// and raising exceptions.
171141
if (onCompleteCalled) {
172-
System.out.println("[CipherSubscriber] finalBytes already called, returning");
173142
return;
174143
}
175144
onCompleteCalled = true;
@@ -178,29 +147,25 @@ public void finalBytes() {
178147
// doFinal requires that all parts have been processed to compute the tag,
179148
// so the tag will only be computed when the last part is processed.
180149
if (!isLastPart) {
181-
System.out.println("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer.length : 0));
182150
// First, propagate the bytes that were in outputBuffer downstream.
183151
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
184152
// Then, propagate the onComplete signal downstream.
185-
// wrappedSubscriber.onComplete();
153+
wrappedSubscriber.onComplete();
186154
return;
187155
}
188156

189157
// If this is the last part, compute doFinal and include its result in the value sent downstream.
190158
// The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
191159
byte[] finalBytes = null;
192160
try {
193-
System.out.println("[CipherSubscriber] Calling cipher.doFinal()");
194161
finalBytes = cipher.doFinal();
195-
System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes");
196162
} catch (final GeneralSecurityException exception) {
197163
// Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
198-
System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage());
199164
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
200165
// Forward error, else the wrapped subscriber waits indefinitely
201166
wrappedSubscriber.onError(exception);
202167
// Even though doFinal failed, propagate the onComplete signal downstream
203-
// wrappedSubscriber.onComplete();
168+
wrappedSubscriber.onComplete();
204169
throw new S3EncryptionClientSecurityException(exception.getMessage(), exception);
205170
}
206171

@@ -209,24 +174,19 @@ public void finalBytes() {
209174
// This single onNext call must contain both the bytes from outputBuffer and the tag.
210175
byte[] combinedBytes;
211176
if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) {
212-
System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)");
213177
combinedBytes = new byte[outputBuffer.length + finalBytes.length];
214178
System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length);
215179
System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length);
216180
} else if (outputBuffer != null && outputBuffer.length > 0) {
217-
System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)");
218181
combinedBytes = outputBuffer;
219182
} else if (finalBytes != null && finalBytes.length > 0) {
220-
System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)");
221183
combinedBytes = finalBytes;
222184
} else {
223-
System.out.println("[CipherSubscriber] No bytes to send");
224185
combinedBytes = new byte[0];
225186
}
226187

227-
System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length);
228188
wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes));
229-
// wrappedSubscriber.onComplete();
189+
wrappedSubscriber.onComplete();
230190
}
231191

232192
}

0 commit comments

Comments
 (0)