Skip to content

Commit ff38eb0

Browse files
authored
Revert "fix: fix Storage#readAllBytes to allow reading compressed byt… (#2330)
…es (#2304)" This reverts commit 68b96a9. Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-storage/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
1 parent 57f1819 commit ff38eb0

19 files changed

+347
-435
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
101101
long totalRead = 0;
102102
do {
103103
if (sbc == null) {
104-
try {
105-
sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
106-
} catch (StorageException e) {
107-
result.setException(e);
108-
throw e;
109-
}
104+
sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
110105
}
111106

112107
long totalRemaining = Buffers.totalRemaining(dsts, offset, length);
@@ -129,17 +124,13 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
129124
sbc = null;
130125
} else if (t instanceof IOException) {
131126
IOException ioE = (IOException) t;
132-
StorageException translate = StorageException.translate(ioE);
133-
if (resultRetryAlgorithm.shouldRetry(translate, null)) {
127+
if (resultRetryAlgorithm.shouldRetry(StorageException.translate(ioE), null)) {
134128
sbc = null;
135129
} else {
136-
result.setException(translate);
137130
throw ioE;
138131
}
139132
} else {
140-
BaseServiceException coalesce = StorageException.coalesce(t);
141-
result.setException(coalesce);
142-
throw new IOException(coalesce);
133+
throw new IOException(StorageException.coalesce(t));
143134
}
144135
} finally {
145136
long totalRemainingAfter = Buffers.totalRemaining(dsts, offset, length);
@@ -216,17 +207,20 @@ private ScatteringByteChannel open() {
216207
if (xGoogGeneration != null) {
217208
int statusCode = e.getStatusCode();
218209
if (statusCode == 404) {
219-
StorageException storageException =
220-
new StorageException(404, "Failure while trying to resume download", e);
221-
result.setException(storageException);
222-
throw storageException;
210+
throw new StorageException(404, "Failure while trying to resume download", e);
223211
}
224212
}
225-
throw StorageException.translate(e);
213+
StorageException translate = StorageException.translate(e);
214+
result.setException(translate);
215+
throw translate;
226216
} catch (IOException e) {
227-
throw StorageException.translate(e);
217+
StorageException translate = StorageException.translate(e);
218+
result.setException(translate);
219+
throw translate;
228220
} catch (Throwable t) {
229-
throw StorageException.coalesce(t);
221+
BaseServiceException coalesce = StorageException.coalesce(t);
222+
result.setException(coalesce);
223+
throw coalesce;
230224
}
231225
}
232226

google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,12 @@ public final synchronized int write(ByteBuffer src) throws IOException {
101101
}
102102
int write = tmp.write(src);
103103
return write;
104+
} catch (StorageException e) {
105+
throw new IOException(e);
104106
} catch (IOException e) {
105107
throw e;
106108
} catch (Exception e) {
107-
throw StorageException.coalesce(e);
109+
throw new IOException(StorageException.coalesce(e));
108110
}
109111
}
110112

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ final class BlobReadChannelV2 extends BaseStorageReadChannel<StorageObject> {
4444
this.opts = opts;
4545
this.blobReadChannelContext = blobReadChannelContext;
4646
this.autoGzipDecompression =
47-
Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
47+
// RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
48+
// RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding
49+
// gzip.
50+
Boolean.FALSE.equals(opts.get(StorageRpc.Option.RETURN_RAW_INPUT_STREAM));
4851
}
4952

5053
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 12 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -256,34 +256,11 @@ public Blob create(
256256

257257
@Override
258258
public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
259-
requireNonNull(blobInfo, "blobInfo must be non null");
260-
261-
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
262-
GrpcCallContext grpcCallContext =
263-
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
264-
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
265-
266-
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
267-
ResumableMedia.gapic()
268-
.write()
269-
.byteChannel(
270-
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
271-
.setHasher(Hasher.enabled())
272-
.setByteStringStrategy(ByteStringStrategy.noCopy())
273-
.direct()
274-
.unbuffered()
275-
.setRequest(req)
276-
.build();
277-
278-
// Specifically not in the try-with, so we don't close the provided stream
279-
ReadableByteChannel src =
280-
Channels.newChannel(firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES)));
281-
try (UnbufferedWritableByteChannel dst = session.open()) {
282-
ByteStreams.copy(src, dst);
283-
} catch (Exception e) {
259+
try {
260+
return createFrom(blobInfo, content, options);
261+
} catch (IOException e) {
284262
throw StorageException.coalesce(e);
285263
}
286-
return getBlob(session.getResult());
287264
}
288265

289266
@Override
@@ -338,7 +315,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
338315
}
339316
return codecs.blobInfo().decode(object).asBlob(this);
340317
} catch (InterruptedException | ExecutionException e) {
341-
throw StorageException.coalesce(e.getCause());
318+
throw StorageException.coalesce(e);
342319
}
343320
}
344321

@@ -388,14 +365,7 @@ public Blob createFrom(
388365
@Override
389366
public Bucket get(String bucket, BucketGetOption... options) {
390367
Opts<BucketSourceOpt> unwrap = Opts.unwrap(options);
391-
try {
392-
return internalBucketGet(bucket, unwrap);
393-
} catch (StorageException e) {
394-
if (e.getCode() == 404) {
395-
return null;
396-
}
397-
throw e;
398-
}
368+
return internalBucketGet(bucket, unwrap);
399369
}
400370

401371
@Override
@@ -689,8 +659,7 @@ public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... optio
689659

690660
@Override
691661
public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
692-
UnbufferedReadableByteChannelSession<Object> session =
693-
unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);
662+
UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
694663

695664
ByteArrayOutputStream baos = new ByteArrayOutputStream();
696665
try (UnbufferedReadableByteChannel r = session.open();
@@ -718,19 +687,16 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
718687
ReadObjectRequest request = getReadObjectRequest(blob, opts);
719688
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
720689
GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);
721-
boolean autoGzipDecompression =
722-
Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
723690
return new GrpcBlobReadChannel(
724691
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
725692
request,
726-
autoGzipDecompression);
693+
!opts.autoGzipDecompression());
727694
}
728695

729696
@Override
730697
public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
731698

732-
UnbufferedReadableByteChannelSession<Object> session =
733-
unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);
699+
UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
734700

735701
try (UnbufferedReadableByteChannel r = session.open();
736702
WritableByteChannel w = Files.newByteChannel(path, WRITE_OPS)) {
@@ -743,8 +709,7 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
743709
@Override
744710
public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {
745711

746-
UnbufferedReadableByteChannelSession<Object> session =
747-
unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);
712+
UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
748713

749714
try (UnbufferedReadableByteChannel r = session.open();
750715
WritableByteChannel w = Channels.newChannel(outputStream)) {
@@ -1842,20 +1807,18 @@ WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts<ObjectTargetOpt> op
18421807
return opts.writeObjectRequest().apply(requestBuilder).build();
18431808
}
18441809

1845-
private UnbufferedReadableByteChannelSession<Object>
1846-
unbufferedDefaultAutoGzipDecompressingReadSession(BlobId blob, BlobSourceOption[] options) {
1810+
private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
1811+
BlobId blob, BlobSourceOption[] options) {
18471812
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
18481813
ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts);
18491814
Set<StatusCode.Code> codes =
18501815
resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest));
18511816
GrpcCallContext grpcCallContext =
18521817
opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
1853-
boolean autoGzipDecompression =
1854-
Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ true);
18551818
return ResumableMedia.gapic()
18561819
.read()
18571820
.byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext))
1858-
.setAutoGzipDecompression(autoGzipDecompression)
1821+
.setAutoGzipDecompression(!opts.autoGzipDecompression())
18591822
.unbuffered()
18601823
.setReadObjectRequest(readObjectRequest)
18611824
.build();

google-cloud-storage/src/main/java/com/google/cloud/storage/GzipReadableByteChannel.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.core.ApiFuture;
20-
import com.google.api.gax.rpc.ApiExceptions;
2120
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
2221
import java.io.ByteArrayInputStream;
2322
import java.io.FilterInputStream;
@@ -28,6 +27,7 @@
2827
import java.nio.channels.Channels;
2928
import java.nio.channels.ReadableByteChannel;
3029
import java.nio.channels.ScatteringByteChannel;
30+
import java.util.concurrent.ExecutionException;
3131
import java.util.zip.GZIPInputStream;
3232

3333
final class GzipReadableByteChannel implements UnbufferedReadableByteChannel {
@@ -60,7 +60,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
6060
source.read(wrap);
6161
try {
6262
// Step 2: wait for the object metadata, this is populated in the first message from GCS
63-
String contentEncoding = ApiExceptions.callAndTranslateApiException(this.contentEncoding);
63+
String contentEncoding = this.contentEncoding.get();
6464
// if the Content-Encoding is gzip, Step 3: wire gzip decompression into the byte path
6565
// this will have a copy impact as we are no longer controlling all the buffers
6666
if ("gzip".equals(contentEncoding) || "x-gzip".equals(contentEncoding)) {
@@ -86,9 +86,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
8686
bytesRead += Buffers.copy(wrap, dsts, offset, length);
8787
delegate = source;
8888
}
89-
} catch (StorageException se) {
90-
throw se;
91-
} catch (Exception e) {
89+
} catch (InterruptedException | ExecutionException e) {
9290
throw new IOException(e);
9391
}
9492
}

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java

Lines changed: 17 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import static com.google.common.base.Preconditions.checkState;
2323
import static java.nio.charset.StandardCharsets.UTF_8;
24+
import static java.util.concurrent.Executors.callable;
2425

2526
import com.google.api.core.ApiFuture;
2627
import com.google.api.gax.paging.Page;
@@ -36,14 +37,12 @@
3637
import com.google.cloud.Policy;
3738
import com.google.cloud.WriteChannel;
3839
import com.google.cloud.storage.Acl.Entity;
39-
import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest;
4040
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
4141
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
4242
import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
4343
import com.google.cloud.storage.PostPolicyV4.PostConditionsV4;
4444
import com.google.cloud.storage.PostPolicyV4.PostFieldsV4;
4545
import com.google.cloud.storage.PostPolicyV4.PostPolicyV4Document;
46-
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
4746
import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt;
4847
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
4948
import com.google.cloud.storage.UnifiedOpts.Opts;
@@ -60,10 +59,9 @@
6059
import com.google.common.collect.Maps;
6160
import com.google.common.hash.Hashing;
6261
import com.google.common.io.BaseEncoding;
63-
import com.google.common.io.ByteStreams;
62+
import com.google.common.io.CountingOutputStream;
6463
import com.google.common.primitives.Ints;
6564
import java.io.ByteArrayInputStream;
66-
import java.io.ByteArrayOutputStream;
6765
import java.io.IOException;
6866
import java.io.InputStream;
6967
import java.io.OutputStream;
@@ -75,7 +73,6 @@
7573
import java.nio.ByteBuffer;
7674
import java.nio.channels.Channels;
7775
import java.nio.channels.ReadableByteChannel;
78-
import java.nio.channels.WritableByteChannel;
7976
import java.nio.file.Files;
8077
import java.nio.file.Path;
8178
import java.text.SimpleDateFormat;
@@ -617,25 +614,9 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
617614
Opts<ObjectSourceOpt> unwrap = Opts.unwrap(options);
618615
Opts<ObjectSourceOpt> resolve = unwrap.resolveFrom(blob);
619616
ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
620-
boolean autoGzipDecompression =
621-
Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
622-
UnbufferedReadableByteChannelSession<StorageObject> session =
623-
ResumableMedia.http()
624-
.read()
625-
.byteChannel(BlobReadChannelContext.from(this))
626-
.setAutoGzipDecompression(autoGzipDecompression)
627-
.unbuffered()
628-
.setApiaryReadRequest(
629-
new ApiaryReadRequest(storageObject, optionsMap, ByteRangeSpec.nullRange()))
630-
.build();
631-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
632-
try (UnbufferedReadableByteChannel r = session.open();
633-
WritableByteChannel w = Channels.newChannel(baos)) {
634-
ByteStreams.copy(r, w);
635-
} catch (IOException e) {
636-
throw StorageException.translate(e);
637-
}
638-
return baos.toByteArray();
617+
ResultRetryAlgorithm<?> algorithm =
618+
retryAlgorithmManager.getForObjectsGet(storageObject, optionsMap);
619+
return run(algorithm, () -> storageRpc.load(storageObject, optionsMap), Function.identity());
639620
}
640621

641622
@Override
@@ -667,26 +648,19 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
667648

668649
@Override
669650
public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {
651+
final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream);
670652
final StorageObject pb = codecs.blobId().encode(blob);
671-
Opts<ObjectSourceOpt> resolve = Opts.unwrap(options).resolveFrom(blob);
672-
ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
673-
boolean autoGzipDecompression =
674-
Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
675-
UnbufferedReadableByteChannelSession<StorageObject> session =
676-
ResumableMedia.http()
677-
.read()
678-
.byteChannel(BlobReadChannelContext.from(this))
679-
.setAutoGzipDecompression(autoGzipDecompression)
680-
.unbuffered()
681-
.setApiaryReadRequest(new ApiaryReadRequest(pb, optionsMap, ByteRangeSpec.nullRange()))
682-
.build();
683-
// don't close the provided stream
684-
WritableByteChannel w = Channels.newChannel(outputStream);
685-
try (UnbufferedReadableByteChannel r = session.open()) {
686-
ByteStreams.copy(r, w);
687-
} catch (IOException e) {
688-
throw StorageException.translate(e);
689-
}
653+
ImmutableMap<StorageRpc.Option, ?> optionsMap =
654+
Opts.unwrap(options).resolveFrom(blob).getRpcOptions();
655+
ResultRetryAlgorithm<?> algorithm = retryAlgorithmManager.getForObjectsGet(pb, optionsMap);
656+
run(
657+
algorithm,
658+
callable(
659+
() -> {
660+
storageRpc.read(
661+
pb, optionsMap, countingOutputStream.getCount(), countingOutputStream);
662+
}),
663+
Function.identity());
690664
}
691665

692666
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2339,6 +2339,18 @@ Mapper<BlobInfo.Builder> blobInfoMapper() {
23392339
return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::blobInfo);
23402340
}
23412341

2342+
/**
2343+
* Here for compatibility. This should NOT be an "Opt" instead an attribute of the channel
2344+
* builder. When {@link ReturnRawInputStream} is removed, this method should be removed as well.
2345+
*
2346+
* @see
2347+
* GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder#setAutoGzipDecompression(boolean)
2348+
*/
2349+
@Deprecated
2350+
boolean autoGzipDecompression() {
2351+
return filterTo(ReturnRawInputStream.class).findFirst().map(r -> r.val).orElse(true);
2352+
}
2353+
23422354
Decoder<BlobInfo, BlobInfo> clearBlobFields() {
23432355
return filterTo(Fields.class)
23442356
.findFirst()

0 commit comments

Comments
 (0)