Skip to content

Commit 23584da

Browse files
authored
chore: update ChunkSegmenter to optionally allow a limit on the number of bytes it should consume (#3279)
* chore: update ChunkSegmenter to optionally allow a limit on the number of bytes it should consume

  Update BidiAppendableUnbufferedWritableByteChannel to attempt to consume only as many bytes as the stream reports are available. This prevents over-packing segments that we would never be able to use.

* chore: fix blockSize reset logic
1 parent 0e348db commit 23584da

File tree

4 files changed

+137
-20
lines changed

4 files changed

+137
-20
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) th
136136
RewindableContent rewindableContent = RewindableContent.of(srcs, srcsOffset, srcsLength);
137137
long totalBufferRemaining = rewindableContent.getLength();
138138

139-
ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength, true);
139+
ChunkSegment[] data =
140+
chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength, true, availableCapacity);
140141
if (data.length == 0) {
141142
return 0;
142143
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ChunkSegmenter.java

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
2020
import com.google.common.annotations.VisibleForTesting;
21+
import com.google.common.base.MoreObjects;
2122
import com.google.common.base.Preconditions;
2223
import com.google.common.math.IntMath;
24+
import com.google.common.primitives.Ints;
2325
import com.google.protobuf.ByteString;
2426
import java.math.RoundingMode;
2527
import java.nio.ByteBuffer;
@@ -97,66 +99,96 @@ ChunkSegment[] segmentBuffers(
9799
// turn this into a single branch, rather than multiple that would need to be checked each
98100
// element of the iteration
99101
if (allowUnalignedBlocks) {
100-
return segmentWithUnaligned(bbs, offset, length);
102+
return segmentWithUnaligned(bbs, offset, length, Long.MAX_VALUE);
101103
} else {
102-
return segmentWithoutUnaligned(bbs, offset, length);
104+
return segmentWithoutUnaligned(bbs, offset, length, Long.MAX_VALUE);
103105
}
104106
}
105107

106-
private ChunkSegment[] segmentWithUnaligned(ByteBuffer[] bbs, int offset, int length) {
108+
ChunkSegment[] segmentBuffers(
109+
ByteBuffer[] bbs,
110+
int offset,
111+
int length,
112+
boolean allowUnalignedBlocks,
113+
long maxBytesToConsume) {
114+
// turn this into a single branch, rather than multiple that would need to be checked each
115+
// element of the iteration
116+
if (allowUnalignedBlocks) {
117+
return segmentWithUnaligned(bbs, offset, length, maxBytesToConsume);
118+
} else {
119+
long misaligned = maxBytesToConsume % blockSize;
120+
long alignedMaxBytesToConsume = maxBytesToConsume - misaligned;
121+
return segmentWithoutUnaligned(bbs, offset, length, alignedMaxBytesToConsume);
122+
}
123+
}
124+
125+
private ChunkSegment[] segmentWithUnaligned(
126+
ByteBuffer[] bbs, int offset, int length, long maxBytesToConsume) {
107127
Deque<ChunkSegment> data = new ArrayDeque<>();
108128

129+
long consumed = 0;
109130
for (int i = offset; i < length; i++) {
110131
ByteBuffer buffer = bbs[i];
111132
int remaining;
112-
while ((remaining = buffer.remaining()) > 0) {
113-
consumeBytes(data, remaining, buffer);
133+
while ((remaining = buffer.remaining()) > 0 && consumed < maxBytesToConsume) {
134+
long remainingConsumable = maxBytesToConsume - consumed;
135+
int toConsume = remaining;
136+
if (remainingConsumable < remaining) {
137+
toConsume = Math.toIntExact(remainingConsumable);
138+
}
139+
long consumeBytes = consumeBytes(data, toConsume, buffer);
140+
consumed += consumeBytes;
114141
}
115142
}
116143

117144
return data.toArray(new ChunkSegment[0]);
118145
}
119146

120-
private ChunkSegment[] segmentWithoutUnaligned(ByteBuffer[] bbs, int offset, int length) {
147+
private ChunkSegment[] segmentWithoutUnaligned(
148+
ByteBuffer[] bbs, int offset, int length, long maxBytesToConsume) {
121149
Deque<ChunkSegment> data = new ArrayDeque<>();
122150

123-
final long totalRemaining = Buffers.totalRemaining(bbs, offset, length);
151+
long buffersTotalRemaining = Buffers.totalRemaining(bbs, offset, length);
152+
final long totalRemaining = Math.min(maxBytesToConsume, buffersTotalRemaining);
124153
long consumedSoFar = 0;
125154

126155
int currentBlockPending = blockSize;
127156

157+
outerloop:
128158
for (int i = offset; i < length; i++) {
129159
ByteBuffer buffer = bbs[i];
130160
int remaining;
131161
while ((remaining = buffer.remaining()) > 0) {
132162
long overallRemaining = totalRemaining - consumedSoFar;
133163
if (overallRemaining < blockSize && currentBlockPending == blockSize) {
134-
break;
164+
break outerloop;
135165
}
136166

137167
int numBytesConsumable;
138-
if (remaining >= blockSize) {
168+
if (remaining >= blockSize && currentBlockPending == blockSize) {
139169
int blockCount = IntMath.divide(remaining, blockSize, RoundingMode.DOWN);
140170
numBytesConsumable = blockCount * blockSize;
141-
} else if (currentBlockPending < blockSize) {
142-
numBytesConsumable = currentBlockPending;
143-
currentBlockPending = blockSize;
144171
} else {
145-
numBytesConsumable = remaining;
146-
currentBlockPending = currentBlockPending - remaining;
172+
numBytesConsumable = Math.min(remaining, currentBlockPending);
147173
}
148174
if (numBytesConsumable <= 0) {
149-
continue;
175+
break outerloop;
150176
}
151177

152-
consumedSoFar += consumeBytes(data, numBytesConsumable, buffer);
178+
int consumed = consumeBytes(data, numBytesConsumable, buffer);
179+
int currentBlockPendingLessConsumed = currentBlockPending - consumed;
180+
currentBlockPending = currentBlockPendingLessConsumed % blockSize;
181+
if (currentBlockPending == 0) {
182+
currentBlockPending = blockSize;
183+
}
184+
consumedSoFar += consumed;
153185
}
154186
}
155187

156188
return data.toArray(new ChunkSegment[0]);
157189
}
158190

159-
private long consumeBytes(Deque<ChunkSegment> data, int numBytesConsumable, ByteBuffer buffer) {
191+
private int consumeBytes(Deque<ChunkSegment> data, int numBytesConsumable, ByteBuffer buffer) {
160192
// either no chunk or most recent chunk is full, start a new one
161193
ChunkSegment peekLast = data.peekLast();
162194
if (peekLast == null || peekLast.b.size() == maxSegmentSize) {
@@ -167,7 +199,8 @@ private long consumeBytes(Deque<ChunkSegment> data, int numBytesConsumable, Byte
167199
} else {
168200
ChunkSegment chunkSoFar = data.pollLast();
169201
//noinspection ConstantConditions -- covered by peekLast check above
170-
int limit = Math.min(numBytesConsumable, maxSegmentSize - chunkSoFar.b.size());
202+
int limit =
203+
Ints.min(buffer.remaining(), numBytesConsumable, maxSegmentSize - chunkSoFar.b.size());
171204
ChunkSegment datum = newSegment(buffer, limit);
172205
ChunkSegment plus = chunkSoFar.concat(datum);
173206
data.addLast(plus);
@@ -218,5 +251,14 @@ public Crc32cLengthKnown getCrc32c() {
218251
public boolean isOnlyFullBlocks() {
219252
return onlyFullBlocks;
220253
}
254+
255+
@Override
256+
public String toString() {
257+
return MoreObjects.toStringHelper(this)
258+
.add("crc32c", crc32c)
259+
.add("onlyFullBlocks", onlyFullBlocks)
260+
.add("b", b)
261+
.toString();
262+
}
221263
}
222264
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ChunkSegmenterTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
2323
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
24+
import com.google.cloud.storage.it.ChecksummedTestContent;
2425
import com.google.common.collect.ImmutableList;
2526
import com.google.common.hash.HashCode;
2627
import com.google.common.hash.Hashing;
@@ -172,6 +173,78 @@ void allowUnalignedBlocks_false_3() throws Exception {
172173
() -> assertThat(actual).isEqualTo(expected));
173174
}
174175

176+
@Example
177+
void maxBytesToConsume_unaligned() throws Exception {
178+
179+
ChecksummedTestContent ctc = ChecksummedTestContent.gen(64);
180+
181+
ChunkSegmenter segmenter = new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.noCopy(), 6, 3);
182+
183+
List<ChecksummedTestContent> chunks = ctc.chunkup(4);
184+
ByteBuffer[] buffers =
185+
chunks.stream().map(ChecksummedTestContent::asByteBuffer).toArray(ByteBuffer[]::new);
186+
buffers[1].position(1);
187+
188+
ChecksummedTestContent slice = ctc.slice(5, 37);
189+
List<ByteString> expected =
190+
slice.chunkup(6).stream()
191+
.map(ChecksummedTestContent::asByteBuffer)
192+
.map(ByteStringStrategy.noCopy())
193+
.collect(Collectors.toList());
194+
195+
ChunkSegment[] segments = segmenter.segmentBuffers(buffers, 1, buffers.length - 2, true, 37);
196+
List<ByteString> actual =
197+
Arrays.stream(segments).map(ChunkSegment::getB).collect(Collectors.toList());
198+
assertThat(actual).isEqualTo(expected);
199+
}
200+
201+
@Example
202+
void maxBytesToConsume_aligned() throws Exception {
203+
204+
ChecksummedTestContent ctc = ChecksummedTestContent.gen(64);
205+
206+
ChunkSegmenter segmenter = new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.noCopy(), 6, 3);
207+
208+
List<ChecksummedTestContent> chunks = ctc.chunkup(4);
209+
ByteBuffer[] buffers =
210+
chunks.stream().map(ChecksummedTestContent::asByteBuffer).toArray(ByteBuffer[]::new);
211+
buffers[1].position(1);
212+
213+
ChecksummedTestContent slice = ctc.slice(5, 36);
214+
List<ByteString> expected =
215+
slice.chunkup(6).stream()
216+
.map(ChecksummedTestContent::asByteBuffer)
217+
.map(ByteStringStrategy.noCopy())
218+
.collect(Collectors.toList());
219+
220+
ChunkSegment[] segments = segmenter.segmentBuffers(buffers, 1, buffers.length - 2, false, 37);
221+
List<ByteString> actual =
222+
Arrays.stream(segments).map(ChunkSegment::getB).collect(Collectors.toList());
223+
assertThat(actual).isEqualTo(expected);
224+
}
225+
226+
@Example
227+
void alignedConsumeForLargeBuffersOnlyConsumesAligned() throws Exception {
228+
229+
ChecksummedTestContent ctc = ChecksummedTestContent.gen(2048 + 13);
230+
231+
ChunkSegmenter segmenter =
232+
new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.noCopy(), 2048, 256);
233+
234+
ChecksummedTestContent slice = ctc.slice(0, 2048);
235+
List<ByteString> expected =
236+
slice.chunkup(2048).stream()
237+
.map(ChecksummedTestContent::asByteBuffer)
238+
.map(ByteStringStrategy.noCopy())
239+
.collect(Collectors.toList());
240+
241+
ByteBuffer buf = ctc.asByteBuffer();
242+
ChunkSegment[] segments = segmenter.segmentBuffers(new ByteBuffer[] {buf}, 0, 1, false);
243+
List<ByteString> actual =
244+
Arrays.stream(segments).map(ChunkSegment::getB).collect(Collectors.toList());
245+
assertThat(actual).isEqualTo(expected);
246+
}
247+
175248
@Provide("TestData")
176249
static Arbitrary<TestData> arbitraryTestData() {
177250
return Arbitraries.lazyOf(

google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,8 @@ public void takeoverJustToFinalizeWorks() throws Exception {
263263
}
264264

265265
private void checkTestbenchIssue733() {
266-
if (p.uploadConfig.getCloseAction() == CloseAction.FINALIZE_WHEN_CLOSING) {
266+
if (backend == Backend.TEST_BENCH
267+
&& p.uploadConfig.getCloseAction() == CloseAction.FINALIZE_WHEN_CLOSING) {
267268
int estimatedMessageCount = 0;
268269
FlushPolicy flushPolicy = p.uploadConfig.getFlushPolicy();
269270
if (flushPolicy instanceof MinFlushSizeFlushPolicy) {

0 commit comments

Comments
 (0)