Skip to content

Commit 4264a7f

Browse files
authored
Merge pull request #471 from yidongnan/fix/metrics-tests
Fix async processing issues in metric tests
2 parents afdfe8f + dcbd7b7 commit 4264a7f

File tree

2 files changed

+206
-12
lines changed

2 files changed

+206
-12
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright (c) 2016-2020 Michael Zhang <[email protected]>
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
7+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package net.devh.boot.grpc.test.config;
19+
20+
import static net.devh.boot.grpc.common.util.InterceptorOrder.ORDER_LAST;
21+
22+
import java.util.concurrent.CountDownLatch;
23+
24+
import org.springframework.context.annotation.Configuration;
25+
26+
import io.grpc.CallOptions;
27+
import io.grpc.Channel;
28+
import io.grpc.ClientCall;
29+
import io.grpc.ClientInterceptor;
30+
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
31+
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
32+
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
33+
import io.grpc.Metadata;
34+
import io.grpc.MethodDescriptor;
35+
import io.grpc.ServerCall;
36+
import io.grpc.ServerCall.Listener;
37+
import io.grpc.ServerCallHandler;
38+
import io.grpc.ServerInterceptor;
39+
import io.grpc.Status;
40+
import net.devh.boot.grpc.client.interceptor.GrpcGlobalClientInterceptor;
41+
import net.devh.boot.grpc.client.interceptor.OrderedClientInterceptor;
42+
import net.devh.boot.grpc.server.interceptor.GrpcGlobalServerInterceptor;
43+
import net.devh.boot.grpc.server.interceptor.OrderedServerInterceptor;
44+
45+
/**
46+
* Helper configuration that can be used to await the completion/closing of the next calls.
47+
*
48+
* @author Daniel Theuke ([email protected])
49+
*/
50+
@Configuration
51+
public class AwaitableServerClientCallConfiguration {
52+
53+
private static CountDownLatch serverCounter;
54+
private static CountDownLatch clientCounter;
55+
56+
/**
57+
* A testing server interceptor, that allows awaiting the completion of the server call that is otherwise closed
58+
* asynchronously.
59+
*
60+
* @return A testing server interceptor bean.
61+
*/
62+
@GrpcGlobalServerInterceptor
63+
ServerInterceptor awaitableServerInterceptor() {
64+
return new OrderedServerInterceptor(new ServerInterceptor() {
65+
66+
@Override
67+
public <ReqT, RespT> Listener<ReqT> interceptCall(
68+
final ServerCall<ReqT, RespT> call,
69+
final Metadata headers,
70+
final ServerCallHandler<ReqT, RespT> next) {
71+
72+
if (serverCounter == null || serverCounter.getCount() == 0) {
73+
return next.startCall(call, headers);
74+
} else {
75+
final CountDownLatch thatCounter = serverCounter;
76+
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
77+
78+
@Override
79+
public void close(final Status status, final Metadata trailers) {
80+
super.close(status, trailers);
81+
thatCounter.countDown();
82+
}
83+
84+
}, headers);
85+
}
86+
}
87+
88+
}, ORDER_LAST);
89+
}
90+
91+
/**
92+
* A testing client interceptor, that allows awaiting the completion of the client call that is otherwise closed
93+
* asynchronously.
94+
*
95+
* @return A testing client interceptor bean.
96+
*/
97+
@GrpcGlobalClientInterceptor
98+
ClientInterceptor awaitableClientInterceptor() {
99+
return new OrderedClientInterceptor(new ClientInterceptor() {
100+
101+
@Override
102+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
103+
final MethodDescriptor<ReqT, RespT> method,
104+
final CallOptions callOptions,
105+
final Channel next) {
106+
107+
if (clientCounter == null || clientCounter.getCount() == 0) {
108+
return next.newCall(method, callOptions);
109+
} else {
110+
final CountDownLatch thatCounter = clientCounter;
111+
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
112+
113+
@Override
114+
public void start(final Listener<RespT> responseListener, final Metadata headers) {
115+
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
116+
117+
@Override
118+
public void onClose(final Status status, final Metadata trailers) {
119+
super.onClose(status, trailers);
120+
thatCounter.countDown();
121+
}
122+
123+
}, headers);
124+
}
125+
126+
};
127+
}
128+
}
129+
130+
}, ORDER_LAST);
131+
}
132+
133+
/**
134+
* Returns a {@link CountDownLatch} that will be used in the next server calls and can be used to await the
135+
* {@link ServerCall#close(Status, Metadata) ServerCall close}. This method must be called before the call is
136+
* started.
137+
*
138+
* @param count The number of call closes to await.
139+
* @return The counter used to await the server call close.
140+
*/
141+
public static CountDownLatch awaitNextServerCallCloses(final int count) {
142+
final CountDownLatch newCounter = new CountDownLatch(count);
143+
serverCounter = newCounter;
144+
return newCounter;
145+
}
146+
147+
/**
148+
* Returns a {@link CountDownLatch} that will be used in the next client calls and can be used to await the
149+
* {@link io.grpc.ClientCall.Listener#onClose(Status, Metadata) ClientCall close}. This method must be called before
150+
* the call is started.
151+
*
152+
* @param count The number of call closes to await.
153+
* @return The counter used to await the client call close.
154+
*/
155+
public static CountDownLatch awaitNextClientCallCloses(final int count) {
156+
final CountDownLatch newCounter = new CountDownLatch(count);
157+
clientCounter = newCounter;
158+
return newCounter;
159+
}
160+
161+
/**
162+
* Returns a {@link CountDownLatch} that will be used in the next server and client calls and can be used to await
163+
* the respective closes. This method must be called before the call is started.
164+
*
165+
* @param count The number of call closes to await.
166+
* @return The counter used to await the client call close.
167+
* @see #awaitNextClientCallCloses(int)
168+
* @see #awaitNextServerCallCloses(int)
169+
*/
170+
public static CountDownLatch awaitNextServerAndClientCallCloses(final int count) {
171+
final CountDownLatch newCounter = new CountDownLatch(2 * count);
172+
serverCounter = newCounter;
173+
clientCounter = newCounter;
174+
return newCounter;
175+
}
176+
177+
}

tests/src/test/java/net/devh/boot/grpc/test/metric/MetricCollectingInterceptorTest.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static net.devh.boot.grpc.common.metric.MetricConstants.METRIC_NAME_SERVER_RESPONSES_SENT;
2828
import static net.devh.boot.grpc.common.metric.MetricConstants.TAG_METHOD_NAME;
2929
import static net.devh.boot.grpc.common.metric.MetricConstants.TAG_STATUS_CODE;
30+
import static net.devh.boot.grpc.test.config.AwaitableServerClientCallConfiguration.awaitNextServerAndClientCallCloses;
3031
import static org.assertj.core.api.Assertions.assertThat;
3132
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3233
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -63,6 +64,7 @@
6364
import net.devh.boot.grpc.client.inject.GrpcClient;
6465
import net.devh.boot.grpc.common.metric.MetricConstants;
6566
import net.devh.boot.grpc.server.autoconfigure.GrpcServerMetricAutoConfiguration;
67+
import net.devh.boot.grpc.test.config.AwaitableServerClientCallConfiguration;
6668
import net.devh.boot.grpc.test.config.BaseAutoConfiguration;
6769
import net.devh.boot.grpc.test.config.MetricConfiguration;
6870
import net.devh.boot.grpc.test.config.ServiceConfiguration;
@@ -78,10 +80,18 @@
7880
@Slf4j
7981
@SpringBootTest(properties = {
8082
"grpc.client.GLOBAL.address=localhost:9090",
81-
"grpc.client.GLOBAL.negotiationType=PLAINTEXT"
83+
"grpc.client.GLOBAL.negotiationType=PLAINTEXT",
84+
})
85+
@SpringJUnitConfig(classes = {
86+
MetricConfiguration.class,
87+
ServiceConfiguration.class,
88+
BaseAutoConfiguration.class,
89+
AwaitableServerClientCallConfiguration.class,
90+
})
91+
@ImportAutoConfiguration({
92+
GrpcClientMetricAutoConfiguration.class,
93+
GrpcServerMetricAutoConfiguration.class,
8294
})
83-
@SpringJUnitConfig(classes = {MetricConfiguration.class, ServiceConfiguration.class, BaseAutoConfiguration.class})
84-
@ImportAutoConfiguration({GrpcClientMetricAutoConfiguration.class, GrpcServerMetricAutoConfiguration.class})
8595
@DirtiesContext
8696
class MetricCollectingInterceptorTest {
8797

@@ -103,9 +113,13 @@ class MetricCollectingInterceptorTest {
103113
@DirtiesContext
104114
void testMetricsSuccessfulCall() {
105115
log.info("--- Starting tests with successful call ---");
116+
CountDownLatch counter = awaitNextServerAndClientCallCloses(1);
117+
106118
// Invoke 1
107119
assertEquals("1.2.3", this.testService.normal(EMPTY).getVersion());
108120

121+
assertTimeoutPreemptively(Duration.ofSeconds(1), (Executable) counter::await);
122+
109123
// Test-Client 1
110124
final Counter requestSentCounter =
111125
this.meterRegistry.find(METRIC_NAME_CLIENT_REQUESTS_SENT).counter();
@@ -154,9 +168,13 @@ void testMetricsSuccessfulCall() {
154168

155169
// --------------------------------------------------------------------
156170

171+
counter = awaitNextServerAndClientCallCloses(1);
172+
157173
// Invoke 2
158174
assertEquals("1.2.3", this.testService.normal(EMPTY).getVersion());
159175

176+
assertTimeoutPreemptively(Duration.ofSeconds(1), (Executable) counter::await);
177+
160178
// Test-Client 2
161179
assertEquals(2, requestSentCounter.count());
162180
assertEquals(2, responseReceivedCounter.count());
@@ -182,7 +200,7 @@ void testMetricsSuccessfulCall() {
182200
void testMetricsEarlyCancelledCall() {
183201
log.info("--- Starting tests with early cancelled call ---");
184202
final AtomicReference<Throwable> exception = new AtomicReference<>();
185-
final CountDownLatch counter = new CountDownLatch(1);
203+
final CountDownLatch counter = awaitNextServerAndClientCallCloses(1);
186204

187205
// Invoke
188206
final ClientCallStreamObserver<SomeType> observer =
@@ -201,7 +219,6 @@ public void onNext(final SomeType value) {
201219
@Override
202220
public void onError(final Throwable t) {
203221
setError(t);
204-
counter.countDown();
205222
}
206223

207224
@Override
@@ -210,7 +227,6 @@ public void onCompleted() {
210227
fail("Should never be here");
211228
} catch (final RuntimeException t) {
212229
setError(t);
213-
counter.countDown();
214230
throw t;
215231
}
216232
}
@@ -296,21 +312,18 @@ private synchronized void setError(final Throwable t) {
296312
void testMetricsCancelledCall() {
297313
log.info("--- Starting tests with cancelled call ---");
298314
final AtomicReference<Throwable> exception = new AtomicReference<>();
299-
final CountDownLatch counter = new CountDownLatch(2);
315+
final CountDownLatch counter = awaitNextServerAndClientCallCloses(1);
300316

301317
// Invoke
302318
final ClientCallStreamObserver<SomeType> observer =
303319
(ClientCallStreamObserver<SomeType>) this.testStreamService.echo(new StreamObserver<SomeType>() {
304320

305321
@Override
306-
public void onNext(final SomeType value) {
307-
counter.countDown();
308-
}
322+
public void onNext(final SomeType value) {}
309323

310324
@Override
311325
public void onError(final Throwable t) {
312326
setError(t);
313-
counter.countDown();
314327
}
315328

316329
@Override
@@ -319,7 +332,6 @@ public void onCompleted() {
319332
fail("Should never be here");
320333
} catch (final RuntimeException t) {
321334
setError(t);
322-
counter.countDown();
323335
throw t;
324336
}
325337
}
@@ -405,10 +417,15 @@ private synchronized void setError(final Throwable t) {
405417
@DirtiesContext
406418
void testMetricsFailingCall() {
407419
log.info("--- Starting tests with failing call ---");
420+
421+
final CountDownLatch counter = awaitNextServerAndClientCallCloses(1);
422+
408423
// Invoke
409424
assertThrows(StatusRuntimeException.class,
410425
() -> this.testService.unimplemented(EMPTY));
411426

427+
assertTimeoutPreemptively(Duration.ofSeconds(1), (Executable) counter::await);
428+
412429
// Test-Client
413430
final Counter requestSentCounter = this.meterRegistry
414431
.find(METRIC_NAME_CLIENT_REQUESTS_SENT)

0 commit comments

Comments
 (0)