Skip to content

Commit 4a75f3d

Browse files
authored
chore: extend resumption strategy (#2489)
1 parent 47ca299 commit 4a75f3d

File tree

3 files changed

+43
-4
lines changed

3 files changed

+43
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.retrying.StreamResumptionStrategy;
20+
21+
@InternalApi
22+
/** Expand StreamResumptionStrategy to also process the error. */
23+
public abstract class BigtableStreamResumptionStrategy<RequestT, ResponseT>
24+
implements StreamResumptionStrategy<RequestT, ResponseT> {
25+
26+
public abstract Throwable processError(Throwable throwable);
27+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,21 @@
2222
import com.google.bigtable.v2.RowSet;
2323
import com.google.cloud.bigtable.data.v2.internal.RowSetUtil;
2424
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
25+
import com.google.cloud.bigtable.data.v2.stub.BigtableStreamResumptionStrategy;
2526
import com.google.common.base.Preconditions;
2627
import com.google.protobuf.ByteString;
2728

2829
/**
29-
* An implementation of a {@link StreamResumptionStrategy} for merged rows. This class tracks the
30-
* last complete row seen and upon retry can build a request to resume the stream from where it left
31-
* off.
30+
* An implementation of a {@link BigtableStreamResumptionStrategy} for merged rows. This class
31+
* tracks the last complete row seen and upon retry can build a request to resume the stream from
32+
* where it left off.
3233
*
3334
* <p>This class is considered an internal implementation detail and not meant to be used by
3435
* applications.
3536
*/
3637
@InternalApi
3738
public class ReadRowsResumptionStrategy<RowT>
38-
implements StreamResumptionStrategy<ReadRowsRequest, RowT> {
39+
extends BigtableStreamResumptionStrategy<ReadRowsRequest, RowT> {
3940
private final RowAdapter<RowT> rowAdapter;
4041
private ByteString lastKey = ByteString.EMPTY;
4142
// Number of rows processed excluding Marker row.
@@ -69,6 +70,12 @@ public RowT processResponse(RowT response) {
6970
return response;
7071
}
7172

73+
@Override
74+
public Throwable processError(Throwable throwable) {
75+
// Noop
76+
return throwable;
77+
}
78+
7279
/**
7380
* {@inheritDoc}
7481
*

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.api.gax.rpc.ServerStreamingCallable;
2626
import com.google.api.gax.rpc.StateCheckingResponseObserver;
2727
import com.google.api.gax.rpc.StreamController;
28+
import com.google.cloud.bigtable.data.v2.stub.BigtableStreamResumptionStrategy;
2829
import com.google.common.base.Preconditions;
2930
import java.util.concurrent.Callable;
3031
import java.util.concurrent.CancellationException;
@@ -345,6 +346,10 @@ private void onAttemptError(Throwable throwable) {
345346
localCancellationCause = cancellationCause;
346347
}
347348

349+
if (resumptionStrategy instanceof BigtableStreamResumptionStrategy) {
350+
throwable = ((BigtableStreamResumptionStrategy) resumptionStrategy).processError(throwable);
351+
}
352+
348353
if (localCancellationCause != null) {
349354
// Take special care to preserve the cancellation's stack trace.
350355
innerAttemptFuture.setException(localCancellationCause);

0 commit comments

Comments
 (0)