Skip to content

Commit 9f0a93e

Browse files
authored
fix: update BlobReadSession channels to not implicitly close once EOF is observed (#3344)
1 parent e64565a commit 9f0a93e

File tree

4 files changed

+121
-32
lines changed

4 files changed

+121
-32
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -343,12 +343,8 @@ public boolean canShareStreamWith(ObjectReadSessionStreamRead<?> other) {
343343
@Override
344344
public void internalClose() throws IOException {
345345
if (!closed) {
346-
retryContext.reset();
347346
closed = true;
348-
if (leftovers != null) {
349-
leftovers.ref.close();
350-
}
351-
GrpcUtils.closeAll(queue);
347+
internalCleanup();
352348
}
353349
}
354350

@@ -378,7 +374,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
378374
throw new ClosedChannelException();
379375
}
380376
if (complete) {
381-
close();
377+
internalCleanup();
382378
return -1;
383379
}
384380

@@ -406,7 +402,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
406402
} else if (poll == EofMarker.INSTANCE) {
407403
complete = true;
408404
if (read == 0) {
409-
close();
405+
internalCleanup();
410406
return -1;
411407
}
412408
break;
@@ -442,6 +438,14 @@ private void offer(Closeable offer) throws InterruptedIOException {
442438
}
443439
}
444440

441+
private void internalCleanup() throws IOException {
442+
retryContext.reset();
443+
if (leftovers != null) {
444+
leftovers.ref.close();
445+
}
446+
GrpcUtils.closeAll(queue);
447+
}
448+
445449
/**
446450
* The queue items are added to is a queue of {@link Closeable}. This class smuggles a Throwable
447451
* in a no-op Closable, such that the throwable can be in the queue.

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ static GrpcCallContext contextWithBucketName(String bucketName, GrpcCallContext
7070
* them all as suppressed exceptions on the first occurrence.
7171
*/
7272
static <C extends Closeable> void closeAll(Collection<C> closeables) throws IOException {
73+
if (closeables.isEmpty()) {
74+
return;
75+
}
7376
IOException ioException =
7477
closeables.stream()
7578
.filter(Objects::nonNull)

google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamReadTest.java

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -356,31 +356,9 @@ public void streamingRead_eofShouldBeReturnedIfNoOtherBytesRead() throws Excepti
356356
1, RangeSpec.of(0, 137), Hasher.enabled(), RetryContext.neverRetry())) {
357357
read.eof();
358358
assertThat(read.read(ByteBuffer.allocate(1))).isEqualTo(-1);
359-
360-
assertAll(
361-
() -> assertThrows(ClosedChannelException.class, () -> read.read((ByteBuffer) null)),
362-
() -> assertThat(read.isOpen()).isFalse());
363-
}
364-
}
365-
366-
@Test
367-
public void streamingRead_closedOnceEofIsRead() throws Exception {
368-
try (StreamingRead read =
369-
ObjectReadSessionStreamRead.streamingRead(
370-
1, RangeSpec.of(0, 137), Hasher.enabled(), RetryContext.neverRetry())) {
371-
ByteString bytes1 = ByteString.copyFrom(DataGenerator.base64Characters().genBytes(62));
372-
try (ResponseContentLifecycleHandle<ByteString> handle = noopContentHandle(bytes1)) {
373-
read.accept(handle.borrow(Function.identity()));
374-
}
375-
376-
ByteBuffer buf = ByteBuffer.allocate(512);
377-
read.read(buf);
378-
read.eof();
379-
assertThat(read.read(buf)).isEqualTo(-1);
380-
381-
assertAll(
382-
() -> assertThrows(ClosedChannelException.class, () -> read.read(buf)),
383-
() -> assertThat(read.isOpen()).isFalse());
359+
assertThat(read.isOpen()).isTrue();
360+
read.close();
361+
assertThat(read.isOpen()).isFalse();
384362
}
385363
}
386364

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage.it;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
import static com.google.common.truth.Truth.assertWithMessage;
21+
22+
import com.google.cloud.ReadChannel;
23+
import com.google.cloud.storage.BlobId;
24+
import com.google.cloud.storage.BlobReadSession;
25+
import com.google.cloud.storage.BucketInfo;
26+
import com.google.cloud.storage.ReadProjectionConfig;
27+
import com.google.cloud.storage.ReadProjectionConfigs;
28+
import com.google.cloud.storage.Storage;
29+
import com.google.cloud.storage.TransportCompatibility.Transport;
30+
import com.google.cloud.storage.it.runner.StorageITRunner;
31+
import com.google.cloud.storage.it.runner.annotations.Backend;
32+
import com.google.cloud.storage.it.runner.annotations.CrossRun;
33+
import com.google.cloud.storage.it.runner.annotations.Inject;
34+
import com.google.cloud.storage.it.runner.registry.Generator;
35+
import com.google.cloud.storage.it.runner.registry.ObjectsFixture;
36+
import com.google.common.io.ByteStreams;
37+
import java.io.IOException;
38+
import java.nio.ByteBuffer;
39+
import java.nio.channels.Channels;
40+
import java.nio.channels.ReadableByteChannel;
41+
import java.util.concurrent.ExecutionException;
42+
import java.util.concurrent.ThreadLocalRandom;
43+
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.TimeoutException;
45+
import org.junit.Test;
46+
import org.junit.runner.RunWith;
47+
48+
@RunWith(StorageITRunner.class)
49+
@CrossRun(
50+
backends = {Backend.PROD},
51+
transports = {Transport.HTTP, Transport.GRPC})
52+
public final class ITReadableByteChannelBehaviorTest {
53+
54+
@Inject public Storage storage;
55+
@Inject public BucketInfo bucket;
56+
@Inject public Generator generator;
57+
@Inject public ObjectsFixture objectsFixture;
58+
59+
@Test
60+
public void eofReturnedMultipleTimes_reader() throws IOException {
61+
BlobId id = objectsFixture.getObj512KiB().getInfo().getBlobId();
62+
63+
try (ReadChannel reader = storage.reader(id)) {
64+
eofReturnedMultipleTimes_doTest(reader);
65+
}
66+
}
67+
68+
@Test
69+
@CrossRun.Exclude(transports = Transport.HTTP)
70+
public void eofReturnedMultipleTimes_blobReadSession_channel()
71+
throws ExecutionException, InterruptedException, TimeoutException, IOException {
72+
eofReturnedMultipleTimes_doTestBlobReadSession(ReadProjectionConfigs.asChannel());
73+
}
74+
75+
@Test
76+
@CrossRun.Exclude(transports = Transport.HTTP)
77+
public void eofReturnedMultipleTimes_blobReadSession_seekableChannel()
78+
throws ExecutionException, InterruptedException, TimeoutException, IOException {
79+
eofReturnedMultipleTimes_doTestBlobReadSession(ReadProjectionConfigs.asSeekableChannel());
80+
}
81+
82+
private void eofReturnedMultipleTimes_doTestBlobReadSession(
83+
ReadProjectionConfig<? extends ReadableByteChannel> config)
84+
throws IOException, ExecutionException, InterruptedException, TimeoutException {
85+
BlobId id = objectsFixture.getObj512KiB().getInfo().getBlobId();
86+
87+
try (BlobReadSession session = storage.blobReadSession(id).get(3, TimeUnit.SECONDS)) {
88+
try (ReadableByteChannel c = session.readAs(config)) {
89+
eofReturnedMultipleTimes_doTest(c);
90+
}
91+
}
92+
}
93+
94+
private void eofReturnedMultipleTimes_doTest(ReadableByteChannel c) throws IOException {
95+
long copy = ByteStreams.copy(c, Channels.newChannel(ByteStreams.nullOutputStream()));
96+
assertThat(copy).isEqualTo(objectsFixture.getObj512KiB().getInfo().getSize());
97+
98+
ByteBuffer buf = ByteBuffer.allocate(8);
99+
int i = ThreadLocalRandom.current().nextInt(3, 10);
100+
for (int j = 0; j < i; j++) {
101+
assertWithMessage("expected EOF " + j).that(c.read(buf)).isEqualTo(-1);
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)