1818import static com .google .common .truth .Truth .assertThat ;
1919import static org .junit .Assert .assertThrows ;
2020
21- import com .google .api .gax .core .NoCredentialsProvider ;
2221import com .google .api .gax .grpc .GrpcStatusCode ;
23- import com .google .api .gax .grpc .GrpcTransportChannel ;
2422import com .google .api .gax .rpc .ApiException ;
2523import com .google .api .gax .rpc .ErrorDetails ;
26- import com .google .api .gax .rpc .FixedTransportChannelProvider ;
2724import com .google .api .gax .rpc .InternalException ;
2825import com .google .api .gax .rpc .UnavailableException ;
2926import com .google .bigtable .v2 .BigtableGrpc ;
4542import com .google .bigtable .v2 .SampleRowKeysResponse ;
4643import com .google .cloud .bigtable .data .v2 .BigtableDataClient ;
4744import com .google .cloud .bigtable .data .v2 .BigtableDataSettings ;
45+ import com .google .cloud .bigtable .data .v2 .FakeServiceBuilder ;
4846import com .google .cloud .bigtable .data .v2 .models .BulkMutation ;
4947import com .google .cloud .bigtable .data .v2 .models .ConditionalRowMutation ;
5048import com .google .cloud .bigtable .data .v2 .models .Filters ;
5553import com .google .cloud .bigtable .data .v2 .models .ReadModifyWriteRow ;
5654import com .google .cloud .bigtable .data .v2 .models .RowMutation ;
5755import com .google .cloud .bigtable .data .v2 .models .RowMutationEntry ;
56+ import com .google .cloud .bigtable .data .v2 .models .TableId ;
5857import com .google .common .base .Stopwatch ;
5958import com .google .common .collect .ImmutableList ;
6059import com .google .common .collect .Queues ;
6160import com .google .protobuf .Any ;
6261import com .google .rpc .RetryInfo ;
62+ import io .grpc .ForwardingServerCall ;
6363import io .grpc .Metadata ;
64+ import io .grpc .MethodDescriptor ;
65+ import io .grpc .Server ;
66+ import io .grpc .ServerCall ;
67+ import io .grpc .ServerCallHandler ;
68+ import io .grpc .ServerInterceptor ;
6469import io .grpc .Status ;
6570import io .grpc .StatusRuntimeException ;
6671import io .grpc .stub .StreamObserver ;
67- import io .grpc .testing .GrpcServerRule ;
6872import java .io .IOException ;
6973import java .time .Duration ;
74+ import java .util .HashSet ;
7075import java .util .Queue ;
76+ import java .util .Set ;
7177import java .util .concurrent .atomic .AtomicInteger ;
78+ import java .util .stream .Collectors ;
79+ import org .junit .After ;
7280import org .junit .Before ;
73- import org .junit .Rule ;
7481import org .junit .Test ;
7582import org .junit .runner .RunWith ;
7683import org .junit .runners .JUnit4 ;
7784
7885@ RunWith (JUnit4 .class )
7986public class RetryInfoTest {
8087
81- @ Rule public GrpcServerRule serverRule = new GrpcServerRule ();
82-
8388 private static final Metadata .Key <byte []> ERROR_DETAILS_KEY =
8489 Metadata .Key .of ("grpc-status-details-bin" , Metadata .BINARY_BYTE_MARSHALLER );
8590
91+ private final Set <String > methods = new HashSet <>();
92+
8693 private FakeBigtableService service ;
94+ private Server server ;
8795 private BigtableDataClient client ;
8896 private BigtableDataSettings .Builder settings ;
8997
@@ -94,29 +102,111 @@ public class RetryInfoTest {
94102 @ Before
95103 public void setUp () throws IOException {
96104 service = new FakeBigtableService ();
97- serverRule .getServiceRegistry ().addService (service );
105+
106+ ServerInterceptor serverInterceptor =
107+ new ServerInterceptor () {
108+ @ Override
109+ public <ReqT , RespT > ServerCall .Listener <ReqT > interceptCall (
110+ ServerCall <ReqT , RespT > serverCall ,
111+ Metadata metadata ,
112+ ServerCallHandler <ReqT , RespT > serverCallHandler ) {
113+ return serverCallHandler .startCall (
114+ new ForwardingServerCall .SimpleForwardingServerCall <ReqT , RespT >(serverCall ) {
115+ @ Override
116+ public void close (Status status , Metadata trailers ) {
117+ if (trailers .containsKey (ERROR_DETAILS_KEY )) {
118+ methods .add (serverCall .getMethodDescriptor ().getBareMethodName ());
119+ }
120+ super .close (status , trailers );
121+ }
122+ },
123+ metadata );
124+ }
125+ };
126+ server = FakeServiceBuilder .create (service ).intercept (serverInterceptor ).start ();
98127
99128 settings =
100- BigtableDataSettings .newBuilder ( )
129+ BigtableDataSettings .newBuilderForEmulator ( server . getPort () )
101130 .setProjectId ("fake-project" )
102- .setInstanceId ("fake-instance" )
103- .setCredentialsProvider (NoCredentialsProvider .create ());
104-
105- settings
106- .stubSettings ()
107- .setTransportChannelProvider (
108- FixedTransportChannelProvider .create (
109- GrpcTransportChannel .create (serverRule .getChannel ())))
110- // channel priming doesn't work with FixedTransportChannelProvider. Disable it for the test
111- .setRefreshingChannel (false )
112- .build ();
131+ .setInstanceId ("fake-instance" );
113132
114133 this .client = BigtableDataClient .create (settings .build ());
115134 }
116135
136+ @ After
137+ public void tearDown () {
138+ if (client != null ) {
139+ client .close ();
140+ }
141+ if (server != null ) {
142+ server .shutdown ();
143+ }
144+ }
145+
117146 @ Test
118- public void testReadRow () {
119- verifyRetryInfoIsUsed (() -> client .readRow ("table" , "row" ), true );
147+ public void testAllMethods () {
148+ // Verify retry info is handled correctly for all the methods in data API.
149+ verifyRetryInfoIsUsed (() -> client .readRow (TableId .of ("table" ), "row" ), true );
150+
151+ attemptCounter .set (0 );
152+ verifyRetryInfoIsUsed (
153+ () -> client .readRows (Query .create (TableId .of ("table" ))).iterator ().hasNext (), true );
154+
155+ attemptCounter .set (0 );
156+ verifyRetryInfoIsUsed (
157+ () ->
158+ client .bulkMutateRows (
159+ BulkMutation .create (TableId .of ("fake-table" ))
160+ .add (RowMutationEntry .create ("row-key-1" ).setCell ("cf" , "q" , "v" ))),
161+ true );
162+
163+ attemptCounter .set (0 );
164+ verifyRetryInfoIsUsed (
165+ () ->
166+ client .mutateRow (
167+ RowMutation .create (TableId .of ("fake-table" ), "key" ).setCell ("cf" , "q" , "v" )),
168+ true );
169+
170+ attemptCounter .set (0 );
171+ verifyRetryInfoIsUsed (() -> client .sampleRowKeys (TableId .of ("table" )), true );
172+
173+ attemptCounter .set (0 );
174+ verifyRetryInfoIsUsed (
175+ () ->
176+ client .checkAndMutateRow (
177+ ConditionalRowMutation .create ("table" , "key" )
178+ .condition (Filters .FILTERS .value ().regex ("old-value" ))
179+ .then (Mutation .create ().setCell ("cf" , "q" , "v" ))),
180+ true );
181+
182+ attemptCounter .set (0 );
183+ verifyRetryInfoIsUsed (
184+ () ->
185+ client .readModifyWriteRow (
186+ ReadModifyWriteRow .create ("table" , "row" ).append ("cf" , "q" , "v" )),
187+ true );
188+
189+ attemptCounter .set (0 );
190+ verifyRetryInfoIsUsed (
191+ () -> client .readChangeStream (ReadChangeStreamQuery .create ("table" )).iterator ().hasNext (),
192+ true );
193+
194+ attemptCounter .set (0 );
195+ verifyRetryInfoIsUsed (
196+ () -> client .generateInitialChangeStreamPartitions ("table" ).iterator ().hasNext (), true );
197+
198+ // Verify that the new data API methods are tested or excluded. This is enforced by
199+ // introspecting grpc
200+ // method descriptors.
201+ Set <String > expected =
202+ BigtableGrpc .getServiceDescriptor ().getMethods ().stream ()
203+ .map (MethodDescriptor ::getBareMethodName )
204+ .collect (Collectors .toSet ());
205+
206+ // Exclude methods that don't support retry info
207+ methods .add ("PingAndWarm" );
208+
209+ assertThat (methods ).containsExactlyElementsIn (expected );
120210 }
121211
122212 @ Test
@@ -147,11 +237,6 @@ public void testReadRowServerNotReturningRetryInfoClientDisabledHandling() throw
147237 }
148238 }
149239
150- @ Test
151- public void testReadRows () {
152- verifyRetryInfoIsUsed (() -> client .readRows (Query .create ("table" )).iterator ().hasNext (), true );
153- }
154-
155240 @ Test
156241 public void testReadRowsNonRetraybleErrorWithRetryInfo () {
157242 verifyRetryInfoIsUsed (() -> client .readRows (Query .create ("table" )).iterator ().hasNext (), false );
@@ -181,16 +266,6 @@ public void testReadRowsServerNotReturningRetryInfoClientDisabledHandling() thro
181266 }
182267 }
183268
184- @ Test
185- public void testMutateRows () {
186- verifyRetryInfoIsUsed (
187- () ->
188- client .bulkMutateRows (
189- BulkMutation .create ("fake-table" )
190- .add (RowMutationEntry .create ("row-key-1" ).setCell ("cf" , "q" , "v" ))),
191- true );
192- }
193-
194269 @ Test
195270 public void testMutateRowsNonRetryableErrorWithRetryInfo () {
196271 verifyRetryInfoIsUsed (
@@ -238,12 +313,6 @@ public void testMutateRowsServerNotReturningRetryInfoClientDisabledHandling() th
238313 }
239314 }
240315
241- @ Test
242- public void testMutateRow () {
243- verifyRetryInfoIsUsed (
244- () -> client .mutateRow (RowMutation .create ("table" , "key" ).setCell ("cf" , "q" , "v" )), true );
245- }
246-
247316 @ Test
248317 public void testMutateRowNonRetryableErrorWithRetryInfo () {
249318 verifyRetryInfoIsUsed (
@@ -278,11 +347,6 @@ public void testMutateRowServerNotReturningRetryInfoClientDisabledHandling() thr
278347 }
279348 }
280349
281- @ Test
282- public void testSampleRowKeys () {
283- verifyRetryInfoIsUsed (() -> client .sampleRowKeys ("table" ), true );
284- }
285-
286350 @ Test
287351 public void testSampleRowKeysNonRetryableErrorWithRetryInfo () {
288352 verifyRetryInfoIsUsed (() -> client .sampleRowKeys ("table" ), false );
@@ -312,17 +376,6 @@ public void testSampleRowKeysServerNotReturningRetryInfoClientDisabledHandling()
312376 }
313377 }
314378
315- @ Test
316- public void testCheckAndMutateRow () {
317- verifyRetryInfoIsUsed (
318- () ->
319- client .checkAndMutateRow (
320- ConditionalRowMutation .create ("table" , "key" )
321- .condition (Filters .FILTERS .value ().regex ("old-value" ))
322- .then (Mutation .create ().setCell ("cf" , "q" , "v" ))),
323- true );
324- }
325-
326379 @ Test
327380 public void testCheckAndMutateDisableRetryInfo () throws IOException {
328381 settings .stubSettings ().setEnableRetryInfo (false );
@@ -368,15 +421,6 @@ public void testCheckAndMutateServerNotReturningRetryInfoClientDisabledHandling(
368421 }
369422 }
370423
371- @ Test
372- public void testReadModifyWrite () {
373- verifyRetryInfoIsUsed (
374- () ->
375- client .readModifyWriteRow (
376- ReadModifyWriteRow .create ("table" , "row" ).append ("cf" , "q" , "v" )),
377- true );
378- }
379-
380424 @ Test
381425 public void testReadModifyWriteDisableRetryInfo () throws IOException {
382426 settings .stubSettings ().setEnableRetryInfo (false );
@@ -414,13 +458,6 @@ public void testReadModifyWriteNotReturningRetryInfoClientDisabledHandling() thr
414458 }
415459 }
416460
417- @ Test
418- public void testReadChangeStream () {
419- verifyRetryInfoIsUsed (
420- () -> client .readChangeStream (ReadChangeStreamQuery .create ("table" )).iterator ().hasNext (),
421- true );
422- }
423-
424461 @ Test
425462 public void testReadChangeStreamNonRetryableErrorWithRetryInfo () {
426463 verifyRetryInfoIsUsed (
@@ -465,12 +502,6 @@ public void testReadChangeStreamNotReturningRetryInfoClientDisabledHandling() th
465502 }
466503 }
467504
468- @ Test
469- public void testGenerateInitialChangeStreamPartition () {
470- verifyRetryInfoIsUsed (
471- () -> client .generateInitialChangeStreamPartitions ("table" ).iterator ().hasNext (), true );
472- }
473-
474505 @ Test
475506 public void testGenerateInitialChangeStreamPartitionNonRetryableError () {
476507 verifyRetryInfoIsUsed (
0 commit comments