2525
2626import com .google .api .client .util .Lists ;
2727import com .google .api .core .SettableApiFuture ;
28+ import com .google .api .gax .batching .Batcher ;
29+ import com .google .api .gax .batching .BatchingSettings ;
30+ import com .google .api .gax .batching .FlowControlSettings ;
2831import com .google .api .gax .rpc .ClientContext ;
2932import com .google .api .gax .rpc .ResponseObserver ;
3033import com .google .api .gax .rpc .StreamController ;
3134import com .google .api .gax .tracing .SpanName ;
3235import com .google .bigtable .v2 .BigtableGrpc ;
3336import com .google .bigtable .v2 .MutateRowRequest ;
3437import com .google .bigtable .v2 .MutateRowResponse ;
38+ import com .google .bigtable .v2 .MutateRowsRequest ;
39+ import com .google .bigtable .v2 .MutateRowsResponse ;
3540import com .google .bigtable .v2 .ReadRowsRequest ;
3641import com .google .bigtable .v2 .ReadRowsResponse ;
3742import com .google .bigtable .v2 .ResponseParams ;
4045import com .google .cloud .bigtable .data .v2 .models .Query ;
4146import com .google .cloud .bigtable .data .v2 .models .Row ;
4247import com .google .cloud .bigtable .data .v2 .models .RowMutation ;
48+ import com .google .cloud .bigtable .data .v2 .models .RowMutationEntry ;
4349import com .google .cloud .bigtable .data .v2 .stub .EnhancedBigtableStub ;
4450import com .google .cloud .bigtable .data .v2 .stub .EnhancedBigtableStubSettings ;
4551import com .google .cloud .bigtable .stats .StatsRecorderWrapper ;
@@ -106,6 +112,8 @@ public class BuiltinMetricsTracerTest {
106112 @ Captor private ArgumentCaptor <String > zone ;
107113 @ Captor private ArgumentCaptor <String > cluster ;
108114
115+ private int batchElementCount = 2 ;
116+
109117 @ Before
110118 public void setUp () throws Exception {
111119 // Add an interceptor to add server-timing in headers
@@ -150,6 +158,22 @@ public void sendHeaders(Metadata headers) {
150158 .mutateRowSettings ()
151159 .retrySettings ()
152160 .setInitialRetryDelay (Duration .ofMillis (200 ));
161+
162+ stubSettingsBuilder
163+ .bulkMutateRowsSettings ()
164+ .setBatchingSettings (
165+ // Each batch has 2 mutations, batch has 1 in-flight request, disable auto flush by
166+ // setting the delay to 1 hour.
167+ BatchingSettings .newBuilder ()
168+ .setElementCountThreshold ((long ) batchElementCount )
169+ .setRequestByteThreshold (1000L )
170+ .setDelayThreshold (Duration .ofHours (1 ))
171+ .setFlowControlSettings (
172+ FlowControlSettings .newBuilder ()
173+ .setMaxOutstandingElementCount ((long ) batchElementCount )
174+ .setMaxOutstandingRequestBytes (1000L )
175+ .build ())
176+ .build ());
153177 stubSettingsBuilder .setTracerFactory (mockFactory );
154178
155179 EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder .build ();
@@ -163,7 +187,7 @@ public void tearDown() {
163187 }
164188
165189 @ Test
166- public void testOperationLatencies () {
190+ public void testReadRowsOperationLatencies () {
167191 when (mockFactory .newTracer (any (), any (), any ()))
168192 .thenAnswer (
169193 (Answer <BuiltinMetricsTracer >)
@@ -179,8 +203,15 @@ public void testOperationLatencies() {
179203 long elapsed = stopwatch .elapsed (TimeUnit .MILLISECONDS );
180204
181205 verify (statsRecorderWrapper ).putOperationLatencies (operationLatency .capture ());
206+ // verify record operation is only called once
207+ verify (statsRecorderWrapper )
208+ .recordOperation (status .capture (), tableId .capture (), zone .capture (), cluster .capture ());
182209
183210 assertThat (operationLatency .getValue ()).isIn (Range .closed (SERVER_LATENCY , elapsed ));
211+ assertThat (status .getAllValues ()).containsExactly ("OK" );
212+ assertThat (tableId .getAllValues ()).containsExactly (TABLE_ID );
213+ assertThat (zone .getAllValues ()).containsExactly (ZONE );
214+ assertThat (cluster .getAllValues ()).containsExactly (CLUSTER );
184215 }
185216
186217 @ Test
@@ -198,6 +229,10 @@ public void testGfeMetrics() {
198229
199230 Lists .newArrayList (stub .readRowsCallable ().call (Query .create (TABLE_ID )));
200231
232+ // Verify record attempt are called multiple times
233+ verify (statsRecorderWrapper , times (fakeService .getAttemptCounter ().get ()))
234+ .recordAttempt (status .capture (), tableId .capture (), zone .capture (), cluster .capture ());
235+
201236 // The request was retried and gfe latency is only recorded in the retry attempt
202237 verify (statsRecorderWrapper ).putGfeLatencies (gfeLatency .capture ());
203238 assertThat (gfeLatency .getValue ()).isEqualTo (FAKE_SERVER_TIMING );
@@ -206,6 +241,11 @@ public void testGfeMetrics() {
206241 verify (statsRecorderWrapper , times (fakeService .getAttemptCounter ().get ()))
207242 .putGfeMissingHeaders (gfeMissingHeaders .capture ());
208243 assertThat (gfeMissingHeaders .getValue ()).isEqualTo (1 );
244+
245+ assertThat (status .getAllValues ()).containsExactly ("UNAVAILABLE" , "OK" );
246+ assertThat (tableId .getAllValues ()).containsExactly (TABLE_ID , TABLE_ID );
247+ assertThat (zone .getAllValues ()).containsExactly ("global" , ZONE );
248+ assertThat (cluster .getAllValues ()).containsExactly ("unspecified" , CLUSTER );
209249 }
210250
211251 @ Test
@@ -255,6 +295,8 @@ public void onComplete() {
255295
256296 verify (statsRecorderWrapper ).putApplicationLatencies (applicationLatency .capture ());
257297 verify (statsRecorderWrapper ).putOperationLatencies (operationLatency .capture ());
298+ verify (statsRecorderWrapper )
299+ .recordOperation (status .capture (), tableId .capture (), zone .capture (), cluster .capture ());
258300
259301 assertThat (counter .get ()).isEqualTo (fakeService .getResponseCounter ().get ());
260302 assertThat (applicationLatency .getValue ()).isAtLeast (APPLICATION_LATENCY * counter .get ());
@@ -287,6 +329,8 @@ public void testReadRowsApplicationLatencyWithManualFlowControl() throws Excepti
287329
288330 verify (statsRecorderWrapper ).putApplicationLatencies (applicationLatency .capture ());
289331 verify (statsRecorderWrapper ).putOperationLatencies (operationLatency .capture ());
332+ verify (statsRecorderWrapper )
333+ .recordOperation (status .capture (), tableId .capture (), zone .capture (), cluster .capture ());
290334
291335 // For manual flow control, the last application latency shouldn't count, because at that point
292336 // the server already sent back all the responses.
@@ -324,7 +368,7 @@ public void testRetryCount() {
324368 }
325369
326370 @ Test
327- public void testMutateRowAttempts () {
371+ public void testMutateRowAttemptsTagValues () {
328372 when (mockFactory .newTracer (any (), any (), any ()))
329373 .thenReturn (
330374 new BuiltinMetricsTracer (
@@ -343,6 +387,55 @@ public void testMutateRowAttempts() {
343387 assertThat (zone .getAllValues ()).containsExactly ("global" , "global" , ZONE );
344388 assertThat (cluster .getAllValues ()).containsExactly ("unspecified" , "unspecified" , CLUSTER );
345389 assertThat (status .getAllValues ()).containsExactly ("UNAVAILABLE" , "UNAVAILABLE" , "OK" );
390+ assertThat (tableId .getAllValues ()).containsExactly (TABLE_ID , TABLE_ID , TABLE_ID );
391+ }
392+
393+ @ Test
394+ public void testReadRowsAttemptsTagValues () {
395+ when (mockFactory .newTracer (any (), any (), any ()))
396+ .thenReturn (
397+ new BuiltinMetricsTracer (
398+ OperationType .ServerStreaming ,
399+ SpanName .of ("Bigtable" , "ReadRows" ),
400+ statsRecorderWrapper ));
401+
402+ Lists .newArrayList (stub .readRowsCallable ().call (Query .create ("fake-table" )).iterator ());
403+
404+ // Set a timeout to reduce flakiness of this test. BasicRetryingFuture will set
405+ // attempt succeeded and set the response which will call complete() in AbstractFuture which
406+ // calls releaseWaiters(). onOperationComplete() is called in TracerFinisher which will be
407+ // called after the mutateRow call is returned. So there's a race between when the call returns
408+ // and when the record() is called in onOperationCompletion().
409+ verify (statsRecorderWrapper , timeout (50 ).times (fakeService .getAttemptCounter ().get ()))
410+ .recordAttempt (status .capture (), tableId .capture (), zone .capture (), cluster .capture ());
411+ assertThat (zone .getAllValues ()).containsExactly ("global" , ZONE );
412+ assertThat (cluster .getAllValues ()).containsExactly ("unspecified" , CLUSTER );
413+ assertThat (status .getAllValues ()).containsExactly ("UNAVAILABLE" , "OK" );
414+ }
415+
416+ @ Test
417+ public void testClientBlockingLatencies () throws InterruptedException {
418+ when (mockFactory .newTracer (any (), any (), any ()))
419+ .thenReturn (
420+ new BuiltinMetricsTracer (
421+ OperationType .Unary , SpanName .of ("Bigtable" , "MutateRows" ), statsRecorderWrapper ));
422+ try (Batcher <RowMutationEntry , Void > batcher = stub .newMutateRowsBatcher (TABLE_ID , null )) {
423+ for (int i = 0 ; i < 6 ; i ++) {
424+ batcher .add (RowMutationEntry .create ("key" ).setCell ("f" , "q" , "v" ));
425+ }
426+
427+ int expectedNumRequests = 6 / batchElementCount ;
428+ ArgumentCaptor <Long > throttledTime = ArgumentCaptor .forClass (Long .class );
429+ verify (statsRecorderWrapper , times (expectedNumRequests ))
430+ .putBatchRequestThrottled (throttledTime .capture ());
431+
432+ // Adding the first 2 elements should not get throttled since the batch is empty
433+ assertThat (throttledTime .getAllValues ().get (0 )).isEqualTo (0 );
434+ // After the first request is sent, batcher will block on add because of the server latency.
435+ // Blocking latency should be around server latency.
436+ assertThat (throttledTime .getAllValues ().get (1 )).isAtLeast (SERVER_LATENCY - 10 );
437+ assertThat (throttledTime .getAllValues ().get (2 )).isAtLeast (SERVER_LATENCY - 10 );
438+ }
346439 }
347440
348441 private static class FakeService extends BigtableGrpc .BigtableImplBase {
@@ -413,6 +506,17 @@ public void mutateRow(
413506 responseObserver .onCompleted ();
414507 }
415508
509+ @ Override
510+ public void mutateRows (
511+ MutateRowsRequest request , StreamObserver <MutateRowsResponse > responseObserver ) {
512+ try {
513+ Thread .sleep (SERVER_LATENCY );
514+ } catch (InterruptedException e ) {
515+ }
516+ responseObserver .onNext (MutateRowsResponse .getDefaultInstance ());
517+ responseObserver .onCompleted ();
518+ }
519+
416520 public AtomicInteger getAttemptCounter () {
417521 return attemptCounter ;
418522 }
0 commit comments