15
15
*/
16
16
package com .google .cloud .bigtable .data .v2 .stub ;
17
17
18
- import static org .mockito .Mockito .never ;
19
- import static org .mockito .Mockito .times ;
20
- import static org .mockito .Mockito .verify ;
18
+ import static com .google .common .truth .Truth .assertThat ;
21
19
import static org .mockito .Mockito .when ;
22
20
23
21
import com .google .api .core .ApiFuture ;
24
22
import com .google .api .gax .core .NoCredentialsProvider ;
25
- import com .google .api .gax .tracing .ApiTracer ;
26
23
import com .google .api .gax .tracing .ApiTracerFactory ;
27
24
import com .google .auto .value .AutoValue ;
28
25
import com .google .bigtable .v2 .BigtableGrpc ;
43
40
import com .google .cloud .bigtable .data .v2 .models .TableId ;
44
41
import com .google .cloud .bigtable .data .v2 .models .TargetId ;
45
42
import com .google .cloud .bigtable .data .v2 .stub .metrics .BigtableTracer ;
43
+ import com .google .cloud .bigtable .data .v2 .stub .metrics .NoopMetricsProvider ;
46
44
import com .google .common .base .Preconditions ;
47
45
import com .google .common .base .Supplier ;
48
46
import com .google .common .collect .ImmutableList ;
56
54
import io .grpc .ServerServiceDefinition ;
57
55
import io .grpc .stub .ServerCalls ;
58
56
import io .grpc .stub .StreamObserver ;
57
+ import java .util .Optional ;
58
+ import java .util .concurrent .ConcurrentHashMap ;
59
59
import java .util .concurrent .ExecutionException ;
60
60
import java .util .concurrent .LinkedBlockingDeque ;
61
61
import java .util .concurrent .TimeUnit ;
62
62
import java .util .concurrent .TimeoutException ;
63
+ import java .util .concurrent .atomic .AtomicInteger ;
63
64
import org .junit .After ;
64
65
import org .junit .Assert ;
65
66
import org .junit .Before ;
69
70
import org .junit .runners .JUnit4 ;
70
71
import org .mockito .Mock ;
71
72
import org .mockito .Mockito ;
72
- import org .mockito .exceptions .verification .WantedButNotInvoked ;
73
73
import org .mockito .junit .MockitoJUnit ;
74
74
import org .mockito .junit .MockitoRule ;
75
75
@@ -85,7 +85,7 @@ public class SkipTrailersTest {
85
85
private Server server ;
86
86
87
87
@ Mock private ApiTracerFactory tracerFactory ;
88
- @ Mock private BigtableTracer tracer ;
88
+ private FakeTracer tracer = new FakeTracer () ;
89
89
90
90
private BigtableDataClient client ;
91
91
@@ -95,12 +95,12 @@ public void setUp() throws Exception {
95
95
server = FakeServiceBuilder .create (hackedService ).start ();
96
96
97
97
when (tracerFactory .newTracer (Mockito .any (), Mockito .any (), Mockito .any ())).thenReturn (tracer );
98
- when (tracer .inScope ()).thenReturn (Mockito .mock (ApiTracer .Scope .class ));
99
98
100
99
BigtableDataSettings .Builder clientBuilder =
101
100
BigtableDataSettings .newBuilderForEmulator (server .getPort ())
102
101
.setProjectId (PROJECT_ID )
103
102
.setInstanceId (INSTANCE_ID )
103
+ .setMetricsProvider (NoopMetricsProvider .INSTANCE )
104
104
.setCredentialsProvider (NoCredentialsProvider .create ());
105
105
clientBuilder .stubSettings ().setEnableSkipTrailers (true ).setTracerFactory (tracerFactory );
106
106
@@ -159,7 +159,7 @@ private <T> void test(Supplier<ApiFuture<?>> invoker, T fakeResponse)
159
159
160
160
// Wait for the call to start on the server
161
161
@ SuppressWarnings ("unchecked" )
162
- ServerRpc <?, T > rpc = (ServerRpc <?, T >) hackedService .rpcs .poll (10 , TimeUnit .SECONDS );
162
+ ServerRpc <?, T > rpc = (ServerRpc <?, T >) hackedService .rpcs .poll (30 , TimeUnit .SECONDS );
163
163
Preconditions .checkNotNull (
164
164
rpc , "Timed out waiting for the call to be received by the mock server" );
165
165
@@ -173,8 +173,21 @@ private <T> void test(Supplier<ApiFuture<?>> invoker, T fakeResponse)
173
173
Assert .fail ("timed out waiting for the trailer optimization future to resolve" );
174
174
}
175
175
176
- verify (tracer , times (1 )).operationFinishEarly ();
177
- verify (tracer , never ()).operationSucceeded ();
176
+ // The tracer will be notified in parallel to the future being resolved
177
+ // This normal and expected, but requires the test to wait a bit
178
+ for (int i = 10 ; i > 0 ; i --) {
179
+ try {
180
+ assertThat (tracer .getCallCount ("operationFinishEarly" )).isEqualTo (1 );
181
+ break ;
182
+ } catch (AssertionError e ) {
183
+ if (i > 1 ) {
184
+ Thread .sleep (100 );
185
+ } else {
186
+ throw e ;
187
+ }
188
+ }
189
+ }
190
+ assertThat (tracer .getCallCount ("operationSucceeded" )).isEqualTo (0 );
178
191
179
192
// clean up
180
193
rpc .getResponseStream ().onCompleted ();
@@ -183,9 +196,9 @@ private <T> void test(Supplier<ApiFuture<?>> invoker, T fakeResponse)
183
196
// Since we dont have a way to know exactly when this happens, we poll
184
197
for (int i = 10 ; i > 0 ; i --) {
185
198
try {
186
- verify (tracer , times ( 1 )).operationSucceeded ( );
199
+ assertThat (tracer . getCallCount ( "operationSucceeded" )).isEqualTo ( 1 );
187
200
break ;
188
- } catch (WantedButNotInvoked e ) {
201
+ } catch (AssertionError e ) {
189
202
if (i > 1 ) {
190
203
Thread .sleep (100 );
191
204
} else {
@@ -195,6 +208,27 @@ private <T> void test(Supplier<ApiFuture<?>> invoker, T fakeResponse)
195
208
}
196
209
}
197
210
211
+ static class FakeTracer extends BigtableTracer {
212
+ ConcurrentHashMap <String , AtomicInteger > callCounts = new ConcurrentHashMap <>();
213
+
214
+ @ Override
215
+ public void operationFinishEarly () {
216
+ record ("operationFinishEarly" );
217
+ }
218
+
219
+ @ Override
220
+ public void operationSucceeded () {
221
+ record ("operationSucceeded" );
222
+ }
223
+
224
+ private void record (String op ) {
225
+ callCounts .computeIfAbsent (op , (ignored ) -> new AtomicInteger ()).getAndIncrement ();
226
+ }
227
+
228
+ private int getCallCount (String op ) {
229
+ return Optional .ofNullable (callCounts .get (op )).map (AtomicInteger ::get ).orElse (0 );
230
+ }
231
+ }
198
232
/**
199
233
* Hack the srvice definition to allow grpc server to simulate delayed trailers. This will augment
200
234
* the bigtable service definition to promote unary rpcs to server streaming
0 commit comments