3030import com .google .cloud .storage .Conversions .Decoder ;
3131import com .google .cloud .storage .Crc32cValue .Crc32cLengthKnown ;
3232import com .google .cloud .storage .GrpcUtils .ZeroCopyServerStreamingCallable ;
33+ import com .google .cloud .storage .Hasher .UncheckedChecksumMismatchException ;
3334import com .google .cloud .storage .ResponseContentLifecycleHandle .ChildRef ;
3435import com .google .cloud .storage .Retrying .Retrier ;
3536import com .google .cloud .storage .UnbufferedReadableByteChannelSession .UnbufferedReadableByteChannel ;
@@ -104,7 +105,11 @@ public boolean shouldRetry(
104105 boolean isWatchdogTimeout =
105106 previousThrowable instanceof StorageException
106107 && previousThrowable .getCause () instanceof WatchdogTimeoutException ;
107- boolean shouldRetry = isWatchdogTimeout || alg .shouldRetry (previousThrowable , null );
108+ boolean isChecksumMismatch =
109+ previousThrowable instanceof StorageException
110+ && previousThrowable .getCause () instanceof UncheckedChecksumMismatchException ;
111+ boolean shouldRetry =
112+ isWatchdogTimeout || isChecksumMismatch || alg .shouldRetry (previousThrowable , null );
108113 if (previousThrowable != null && !shouldRetry ) {
109114 result .setException (previousThrowable );
110115 }
@@ -146,59 +151,38 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
146151 Thread .currentThread ().interrupt ();
147152 throw new InterruptedIOException ();
148153 }
154+ if (take instanceof IOException ) {
155+ IOException ioe = (IOException ) take ;
156+ if (alg .shouldRetry (ioe , null )) {
157+ readObjectObserver = null ;
158+ continue ;
159+ } else {
160+ ioe .addSuppressed (new AsyncStorageTaskException ());
161+ throw ioe ;
162+ }
163+ }
149164 if (take instanceof Throwable ) {
150165 Throwable throwable = (Throwable ) take ;
151166 BaseServiceException coalesce = StorageException .coalesce (throwable );
152167 if (alg .shouldRetry (coalesce , null )) {
153168 readObjectObserver = null ;
154169 continue ;
155170 } else {
171+ close ();
156172 throw new IOException (coalesce );
157173 }
158174 }
159175 if (take == EOF_MARKER ) {
160176 complete = true ;
161177 break ;
162178 }
163- readObjectObserver .request ();
164179
165- ReadObjectResponse resp = (ReadObjectResponse ) take ;
166- try (ResponseContentLifecycleHandle <ReadObjectResponse > handle =
167- read .getResponseContentLifecycleManager ().get (resp )) {
168- ReadObjectResponseChildRef ref = ReadObjectResponseChildRef .from (handle );
169- if (resp .hasMetadata ()) {
170- Object respMetadata = resp .getMetadata ();
171- if (metadata == null ) {
172- metadata = respMetadata ;
173- } else if (metadata .getGeneration () != respMetadata .getGeneration ()) {
174- throw closeWithError (
175- String .format (
176- Locale .US ,
177- "Mismatch Generation between subsequent reads. Expected %d but received %d" ,
178- metadata .getGeneration (),
179- respMetadata .getGeneration ()));
180- }
181- }
182- ChecksummedData checksummedData = resp .getChecksummedData ();
183- ByteString content = checksummedData .getContent ();
184- int contentSize = content .size ();
185- // Very important to know whether a crc32c value is set. Without checking, protobuf will
186- // happily return 0, which is a valid crc32c value.
187- if (checksummedData .hasCrc32C ()) {
188- Crc32cLengthKnown expected = Crc32cValue .of (checksummedData .getCrc32C (), contentSize );
189- try {
190- hasher .validate (expected , content );
191- } catch (IOException e ) {
192- close ();
193- throw e ;
194- }
195- }
196- ref .copy (c , dsts , offset , length );
197- if (ref .hasRemaining ()) {
198- leftovers = ref ;
199- } else {
200- ref .close ();
201- }
180+ ReadObjectResponseChildRef ref = (ReadObjectResponseChildRef ) take ;
181+ ref .copy (c , dsts , offset , length );
182+ if (ref .hasRemaining ()) {
183+ leftovers = ref ;
184+ } else {
185+ ref .close ();
202186 }
203187 }
204188 long read = c .read ();
@@ -321,11 +305,10 @@ private void ensureStreamOpen() {
321305 }
322306 }
323307
324- private IOException closeWithError (String message ) throws IOException {
325- close ();
308+ private IOException createError (String message ) throws IOException {
326309 StorageException cause =
327310 new StorageException (HttpStatusCodes .STATUS_CODE_PRECONDITION_FAILED , message );
328- throw new IOException (message , cause );
311+ return new IOException (message , cause );
329312 }
330313
331314 private final class ReadObjectObserver extends StateCheckingResponseObserver <ReadObjectResponse > {
@@ -335,10 +318,6 @@ private final class ReadObjectObserver extends StateCheckingResponseObserver<Rea
335318
336319 private volatile StreamController controller ;
337320
338- void request () {
339- controller .request (1 );
340- }
341-
342321 void cancel () {
343322 controller .cancel ();
344323 }
@@ -352,16 +331,50 @@ protected void onStartImpl(StreamController controller) {
352331
353332 @ Override
354333 protected void onResponseImpl (ReadObjectResponse response ) {
355- try {
356- open .set (null );
357- queue .offer (response );
358- fetchOffset .addAndGet (response .getChecksummedData ().getContent ().size ());
334+ controller .request (1 );
335+ open .set (null );
336+ try (ResponseContentLifecycleHandle <ReadObjectResponse > handle =
337+ read .getResponseContentLifecycleManager ().get (response )) {
338+ ChecksummedData checksummedData = response .getChecksummedData ();
339+ ByteString content = checksummedData .getContent ();
340+ int contentSize = content .size ();
341+ // Very important to know whether a crc32c value is set. Without checking, protobuf will
342+ // happily return 0, which is a valid crc32c value.
343+ if (checksummedData .hasCrc32C ()) {
344+ Crc32cLengthKnown expected = Crc32cValue .of (checksummedData .getCrc32C (), contentSize );
345+ try {
346+ hasher .validateUnchecked (expected , content );
347+ } catch (UncheckedChecksumMismatchException e ) {
348+ queue .offer (e );
349+ return ;
350+ }
351+ }
352+ if (response .hasMetadata ()) {
353+ Object respMetadata = response .getMetadata ();
354+ if (metadata == null ) {
355+ metadata = respMetadata ;
356+ } else if (metadata .getGeneration () != respMetadata .getGeneration ()) {
357+ IOException exception =
358+ createError (
359+ String .format (
360+ Locale .US ,
361+ "Mismatch Generation between subsequent reads. Expected %d but received %d" ,
362+ metadata .getGeneration (),
363+ respMetadata .getGeneration ()));
364+ queue .offer (exception );
365+ return ;
366+ }
367+ }
368+ queue .offer (ReadObjectResponseChildRef .from (handle ));
369+ fetchOffset .addAndGet (contentSize );
359370 if (response .hasMetadata () && !result .isDone ()) {
360371 result .set (response .getMetadata ());
361372 }
362373 } catch (InterruptedException e ) {
363374 Thread .currentThread ().interrupt ();
364375 throw Code .ABORTED .toStatus ().withCause (e ).asRuntimeException ();
376+ } catch (IOException e ) {
377+ throw new RuntimeException (e );
365378 }
366379 }
367380
0 commit comments