Skip to content

Commit b2fd263

Browse files
author
Lucas McDonald
committed
m
1 parent e11731a commit b2fd263

File tree

1 file changed

+73
-32
lines changed

1 file changed

+73
-32
lines changed

src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java

Lines changed: 73 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import javax.crypto.Cipher;
1212
import java.nio.ByteBuffer;
1313
import java.security.GeneralSecurityException;
14+
import java.util.Arrays;
1415
import java.util.concurrent.atomic.AtomicLong;
1516

1617
public class CipherSubscriber implements Subscriber<ByteBuffer> {
@@ -23,6 +24,7 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
2324
private byte[] outputBuffer;
2425

2526
CipherSubscriber(Subscriber<? super ByteBuffer> wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv, boolean isLastPart) {
27+
System.out.println("[CipherSubscriber] Constructor called with contentLength: " + contentLength + ", isLastPart: " + isLastPart);
2628
this.wrappedSubscriber = wrappedSubscriber;
2729
this.contentLength = contentLength;
2830
cipher = materials.getCipher(iv);
@@ -36,78 +38,117 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
3638

3739
@Override
3840
public void onSubscribe(Subscription s) {
39-
wrappedSubscriber.onSubscribe(s);
41+
System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s);
42+
43+
wrappedSubscriber.onSubscribe(new Subscription() {
44+
@Override
45+
public void request(long n) {
46+
System.out.println("[CipherSubscriber] Request called for " + n + " items");
47+
s.request(n);
48+
}
49+
50+
@Override
51+
public void cancel() {
52+
System.out.println("[CipherSubscriber] Cancel called");
53+
s.cancel();
54+
}
55+
});
4056
}
4157

4258
@Override
4359
public void onNext(ByteBuffer byteBuffer) {
60+
System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining());
4461
int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer);
62+
System.out.println("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer);
4563

4664
if (amountToReadFromByteBuffer > 0) {
65+
System.out.println("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer);
4766
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer);
67+
System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer");
68+
4869
outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer);
49-
if (outputBuffer == null || outputBuffer.length == 0) {
50-
// No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber.
51-
wrappedSubscriber.onNext(ByteBuffer.allocate(0));
52-
} else {
53-
boolean atEnd = isLastPart && contentRead.get() + amountToReadFromByteBuffer >= contentLength;
54-
55-
if (atEnd) {
56-
// If all content has been read, send the final bytes in this onNext call.
57-
// The final bytes must be sent with the final onNext call, not during the onComplete call.
58-
byte[] finalBytes;
59-
try {
60-
finalBytes = cipher.doFinal();
61-
} catch (final GeneralSecurityException exception) {
62-
wrappedSubscriber.onError(exception);
63-
throw new S3EncryptionClientSecurityException(exception.getMessage(), exception);
64-
}
65-
66-
// Combine outputBuffer and finalBytes if both exist
67-
byte[] combinedBuffer;
68-
if (outputBuffer != null && outputBuffer.length > 0) {
69-
combinedBuffer = new byte[outputBuffer.length + finalBytes.length];
70-
System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length);
71-
System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length);
72-
} else {
73-
combinedBuffer = finalBytes;
74-
}
75-
wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer));
70+
System.out.println("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer.length : 0));
71+
72+
boolean atEnd = isLastPart && contentRead.get() >= contentLength - 16;
73+
System.out.println("[CipherSubscriber] atEnd: " + atEnd + " (isLastPart: " + isLastPart + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength + ")");
74+
75+
if (atEnd) {
76+
System.out.println("[CipherSubscriber] Processing final bytes");
77+
// The final bytes must be sent with the final onNext call, not during the onComplete call.
78+
byte[] finalBytes;
79+
try {
80+
finalBytes = cipher.doFinal();
81+
System.out.println("[CipherSubscriber] Cipher doFinal produced " + finalBytes.length + " bytes");
82+
} catch (final GeneralSecurityException exception) {
83+
System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage());
84+
wrappedSubscriber.onError(exception);
85+
throw new S3EncryptionClientSecurityException(exception.getMessage(), exception);
86+
}
87+
88+
// Combine outputBuffer and finalBytes if both exist
89+
byte[] combinedBuffer;
90+
if (outputBuffer != null && outputBuffer.length > 0) {
91+
System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)");
92+
combinedBuffer = new byte[outputBuffer.length + finalBytes.length];
93+
System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length);
94+
System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length);
95+
System.out.println("[CipherSubscriber] Combined buffer total length: " + combinedBuffer.length);
7696
} else {
77-
// Not at end; send content so far
78-
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
97+
System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)");
98+
combinedBuffer = finalBytes;
7999
}
100+
101+
System.out.println("[CipherSubscriber] Sending combined buffer to wrapped subscriber");
102+
wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer));
103+
return;
104+
} else if (outputBuffer == null || outputBuffer.length == 0) {
105+
System.out.println("[CipherSubscriber] No bytes from cipher update, sending empty buffer");
106+
// No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber.
107+
return;
108+
} else {
109+
System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber");
110+
// Not at end; send content so far
111+
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
80112
}
81113
} else {
82-
// Do nothing
83-
wrappedSubscriber.onNext(byteBuffer);
114+
System.out.println("[CipherSubscriber] No bytes to read from input buffer");
115+
return;
84116
}
85117
}
86118

87119
private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
120+
System.out.println("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer.remaining());
121+
System.out.println("[CipherSubscriber] Current contentRead: " + contentRead.get() + ", contentLength: " + contentLength);
122+
88123
// If content length is null, we should include everything in the cipher because the stream is essentially
89124
// unbounded.
90125
if (contentLength == null) {
126+
System.out.println("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer.remaining());
91127
return byteBuffer.remaining();
92128
}
93129

94130
long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining());
95131
long amountRemaining = Math.max(0, contentLength - amountReadSoFar);
132+
System.out.println("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining);
96133

97134
if (amountRemaining > byteBuffer.remaining()) {
135+
System.out.println("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer.remaining());
98136
return byteBuffer.remaining();
99137
} else {
138+
System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining);
100139
return Math.toIntExact(amountRemaining);
101140
}
102141
}
103142

104143
@Override
105144
public void onError(Throwable t) {
145+
System.out.println("[CipherSubscriber] onError called: " + t.getMessage());
106146
wrappedSubscriber.onError(t);
107147
}
108148

109149
@Override
110150
public void onComplete() {
151+
System.out.println("[CipherSubscriber] onComplete called");
111152
wrappedSubscriber.onComplete();
112153
}
113154

0 commit comments

Comments
 (0)