Skip to content

Commit 28d304d

Browse files
authored
Merge pull request #447 from yidongnan/metrics/cancellations
Use ServerCall.Listener#onComplete for metric collection instead of ServerCall#close
2 parents aa286ab + 5e23eed commit 28d304d

File tree

13 files changed

+370
-65
lines changed

13 files changed

+370
-65
lines changed

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/metric/MetricCollectingClientCall.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717

1818
package net.devh.boot.grpc.client.metric;
1919

20-
import java.util.function.Function;
20+
import java.util.function.Consumer;
2121

2222
import io.grpc.ClientCall;
2323
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
2424
import io.grpc.Metadata;
25-
import io.grpc.Status.Code;
25+
import io.grpc.Status;
2626
import io.micrometer.core.instrument.Counter;
27-
import io.micrometer.core.instrument.MeterRegistry;
28-
import io.micrometer.core.instrument.Timer;
2927

3028
/**
3129
* A simple forwarding client call that collects metrics for micrometer.
@@ -36,35 +34,37 @@
3634
*/
3735
class MetricCollectingClientCall<Q, A> extends SimpleForwardingClientCall<Q, A> {
3836

39-
private final MeterRegistry registry;
4037
private final Counter requestCounter;
4138
private final Counter responseCounter;
42-
private final Function<Code, Timer> timerFunction;
39+
private final Consumer<Status.Code> processingDurationTiming;
4340

4441
/**
4542
* Creates a new delegating ClientCall that will wrap the given client call to collect metrics.
4643
*
4744
* @param delegate The original call to wrap.
48-
* @param registry The registry to save the metrics to.
4945
* @param requestCounter The counter for outgoing requests.
5046
* @param responseCounter The counter for incoming responses.
51-
* @param timerFunction A function that will return a timer for a given status code.
47+
* @param processingDurationTiming The consumer used to time the processing duration along with a response status.
5248
*/
53-
public MetricCollectingClientCall(final ClientCall<Q, A> delegate, final MeterRegistry registry,
54-
final Counter requestCounter, final Counter responseCounter,
55-
final Function<Code, Timer> timerFunction) {
49+
public MetricCollectingClientCall(
50+
final ClientCall<Q, A> delegate,
51+
final Counter requestCounter,
52+
final Counter responseCounter,
53+
final Consumer<Status.Code> processingDurationTiming) {
54+
5655
super(delegate);
57-
this.registry = registry;
5856
this.requestCounter = requestCounter;
5957
this.responseCounter = responseCounter;
60-
this.timerFunction = timerFunction;
58+
this.processingDurationTiming = processingDurationTiming;
6159
}
6260

6361
@Override
6462
public void start(final ClientCall.Listener<A> responseListener, final Metadata metadata) {
6563
super.start(
66-
new MetricCollectingClientCallListener<>(responseListener, this.registry, this.responseCounter,
67-
this.timerFunction),
64+
new MetricCollectingClientCallListener<>(
65+
responseListener,
66+
this.responseCounter,
67+
this.processingDurationTiming),
6868
metadata);
6969
}
7070

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/metric/MetricCollectingClientCallListener.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@
1717

1818
package net.devh.boot.grpc.client.metric;
1919

20-
import java.util.function.Function;
20+
import java.util.function.Consumer;
2121

2222
import io.grpc.ClientCall;
2323
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
2424
import io.grpc.Metadata;
2525
import io.grpc.Status;
26-
import io.grpc.Status.Code;
2726
import io.micrometer.core.instrument.Counter;
28-
import io.micrometer.core.instrument.MeterRegistry;
29-
import io.micrometer.core.instrument.Timer;
3027

3128
/**
3229
* A simple forwarding client call listener that collects metrics for micrometer.
@@ -36,32 +33,29 @@
3633
*/
3734
class MetricCollectingClientCallListener<A> extends SimpleForwardingClientCallListener<A> {
3835

39-
private final Timer.Sample timerSample;
4036
private final Counter responseCounter;
41-
private final Function<Code, Timer> timerFunction;
37+
private final Consumer<Status.Code> processingDurationTiming;
4238

4339
/**
4440
* Creates a new delegating ClientCallListener that will wrap the given client call listener to collect metrics.
4541
*
4642
* @param delegate The original call to wrap.
47-
* @param registry The registry to save the metrics to.
4843
* @param responseCounter The counter for incoming responses.
49-
* @param timerFunction A function that will return a timer for a given status code.
44+
* @param processingDurationTiming The consumer used to time the processing duration along with a response status.
5045
*/
5146
public MetricCollectingClientCallListener(
5247
final ClientCall.Listener<A> delegate,
53-
final MeterRegistry registry,
5448
final Counter responseCounter,
55-
final Function<Code, Timer> timerFunction) {
49+
final Consumer<Status.Code> processingDurationTiming) {
50+
5651
super(delegate);
5752
this.responseCounter = responseCounter;
58-
this.timerFunction = timerFunction;
59-
this.timerSample = Timer.start(registry);
53+
this.processingDurationTiming = processingDurationTiming;
6054
}
6155

6256
@Override
6357
public void onClose(final Status status, final Metadata metadata) {
64-
this.timerSample.stop(this.timerFunction.apply(status.getCode()));
58+
this.processingDurationTiming.accept(status.getCode());
6559
super.onClose(status, metadata);
6660
}
6761

grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/metric/MetricCollectingClientInterceptor.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static net.devh.boot.grpc.common.metric.MetricUtils.prepareCounterFor;
2424
import static net.devh.boot.grpc.common.metric.MetricUtils.prepareTimerFor;
2525

26+
import java.util.function.Consumer;
2627
import java.util.function.Function;
2728
import java.util.function.UnaryOperator;
2829

@@ -104,15 +105,19 @@ protected Function<Code, Timer> newTimerFunction(final MethodDescriptor<?, ?> me
104105
}
105106

106107
@Override
107-
public <Q, A> ClientCall<Q, A> interceptCall(final MethodDescriptor<Q, A> methodDescriptor,
108-
final CallOptions callOptions, final Channel channel) {
108+
public <Q, A> ClientCall<Q, A> interceptCall(
109+
final MethodDescriptor<Q, A> methodDescriptor,
110+
final CallOptions callOptions,
111+
final Channel channel) {
112+
109113
final MetricSet metrics = metricsFor(methodDescriptor);
114+
final Consumer<Code> processingDurationTiming = metrics.newProcessingDurationTiming(this.registry);
115+
110116
return new MetricCollectingClientCall<>(
111117
channel.newCall(methodDescriptor, callOptions),
112-
this.registry,
113118
metrics.getRequestCounter(),
114119
metrics.getResponseCounter(),
115-
metrics.getTimerFunction());
120+
processingDurationTiming);
116121
}
117122

118123
}

grpc-common-spring-boot/src/main/java/net/devh/boot/grpc/common/metric/AbstractMetricCollectingInterceptor.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,19 @@
2222
import java.util.EnumMap;
2323
import java.util.Map;
2424
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.function.Consumer;
2526
import java.util.function.Function;
2627
import java.util.function.Supplier;
2728
import java.util.function.UnaryOperator;
2829

2930
import io.grpc.MethodDescriptor;
3031
import io.grpc.ServiceDescriptor;
32+
import io.grpc.Status;
3133
import io.grpc.Status.Code;
3234
import io.micrometer.core.instrument.Counter;
3335
import io.micrometer.core.instrument.MeterRegistry;
3436
import io.micrometer.core.instrument.Timer;
37+
import io.micrometer.core.instrument.Timer.Sample;
3538
import lombok.Getter;
3639
import lombok.extern.slf4j.Slf4j;
3740

@@ -185,13 +188,29 @@ protected static class MetricSet {
185188
* @param responseCounter The response counter to use.
186189
* @param timerFunction The timer function to use.
187190
*/
188-
public MetricSet(final Counter requestCounter, final Counter responseCounter,
191+
public MetricSet(
192+
final Counter requestCounter,
193+
final Counter responseCounter,
189194
final Function<Code, Timer> timerFunction) {
195+
190196
this.requestCounter = requestCounter;
191197
this.responseCounter = responseCounter;
192198
this.timerFunction = timerFunction;
193199
}
194200

201+
/**
202+
* Uses the given registry to create a {@link Sample Timer.Sample} that will be reported if the returned
203+
* consumer is invoked.
204+
*
205+
* @param registry The registry used to create the sample.
206+
* @return The newly created consumer that will report the processing duration since calling this method and
207+
* invoking the returned consumer along with the status code.
208+
*/
209+
public Consumer<Status.Code> newProcessingDurationTiming(final MeterRegistry registry) {
210+
final Timer.Sample timerSample = Timer.start(registry);
211+
return code -> timerSample.stop(this.timerFunction.apply(code));
212+
}
213+
195214
}
196215

197216
}

grpc-server-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/server/metric/MetricCollectingServerCall.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@
1717

1818
package net.devh.boot.grpc.server.metric;
1919

20-
import java.util.function.Function;
21-
2220
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
2321
import io.grpc.Metadata;
2422
import io.grpc.ServerCall;
2523
import io.grpc.Status;
2624
import io.grpc.Status.Code;
2725
import io.micrometer.core.instrument.Counter;
28-
import io.micrometer.core.instrument.MeterRegistry;
29-
import io.micrometer.core.instrument.Timer;
3026

3127
/**
3228
* A simple forwarding server call that collects metrics for micrometer.
@@ -38,29 +34,29 @@
3834
class MetricCollectingServerCall<Q, A> extends SimpleForwardingServerCall<Q, A> {
3935

4036
private final Counter responseCounter;
41-
private final Function<Code, Timer> timerFunction;
42-
private final Timer.Sample timerSample;
37+
private Code responseCode = Code.UNKNOWN;
4338

4439
/**
4540
* Creates a new delegating ServerCall that will wrap the given server call to collect metrics.
4641
*
4742
* @param delegate The original call to wrap.
48-
* @param registry The registry to save the metrics to.
4943
* @param responseCounter The counter for incoming responses.
50-
* @param timerFunction A function that will return a timer for a given status code.
5144
*/
52-
public MetricCollectingServerCall(final ServerCall<Q, A> delegate, final MeterRegistry registry,
53-
final Counter responseCounter,
54-
final Function<Code, Timer> timerFunction) {
45+
public MetricCollectingServerCall(
46+
final ServerCall<Q, A> delegate,
47+
final Counter responseCounter) {
48+
5549
super(delegate);
5650
this.responseCounter = responseCounter;
57-
this.timerFunction = timerFunction;
58-
this.timerSample = Timer.start(registry);
51+
}
52+
53+
public Code getResponseCode() {
54+
return this.responseCode;
5955
}
6056

6157
@Override
6258
public void close(final Status status, final Metadata responseHeaders) {
63-
this.timerSample.stop(this.timerFunction.apply(status.getCode()));
59+
this.responseCode = status.getCode();
6460
super.close(status, responseHeaders);
6561
}
6662

grpc-server-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/server/metric/MetricCollectingServerCallListener.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717

1818
package net.devh.boot.grpc.server.metric;
1919

20+
import java.util.function.Consumer;
21+
import java.util.function.Supplier;
22+
2023
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
21-
import io.grpc.ServerCall;
24+
import io.grpc.ServerCall.Listener;
25+
import io.grpc.Status;
2226
import io.micrometer.core.instrument.Counter;
2327

2428
/**
@@ -30,16 +34,28 @@
3034
class MetricCollectingServerCallListener<Q> extends SimpleForwardingServerCallListener<Q> {
3135

3236
private final Counter requestCounter;
37+
private final Supplier<Status.Code> responseCodeSupplier;
38+
private final Consumer<Status.Code> responseStatusTiming;
3339

3440
/**
3541
* Creates a new delegating ServerCallListener that will wrap the given server call listener to collect metrics.
3642
*
3743
* @param delegate The original listener to wrap.
3844
* @param requestCounter The counter for incoming requests.
45+
* @param responseCodeSupplier The supplier of the response code.
46+
* @param responseStatusTiming The consumer used to time the processing duration along with a response status.
3947
*/
40-
public MetricCollectingServerCallListener(final ServerCall.Listener<Q> delegate, final Counter requestCounter) {
48+
49+
public MetricCollectingServerCallListener(
50+
final Listener<Q> delegate,
51+
final Counter requestCounter,
52+
final Supplier<Status.Code> responseCodeSupplier,
53+
final Consumer<Status.Code> responseStatusTiming) {
54+
4155
super(delegate);
4256
this.requestCounter = requestCounter;
57+
this.responseCodeSupplier = responseCodeSupplier;
58+
this.responseStatusTiming = responseStatusTiming;
4359
}
4460

4561
@Override
@@ -48,4 +64,20 @@ public void onMessage(final Q requestMessage) {
4864
super.onMessage(requestMessage);
4965
}
5066

67+
@Override
68+
public void onComplete() {
69+
report(this.responseCodeSupplier.get());
70+
super.onComplete();
71+
}
72+
73+
@Override
74+
public void onCancel() {
75+
report(Status.Code.CANCELLED);
76+
super.onCancel();
77+
}
78+
79+
private void report(final Status.Code code) {
80+
this.responseStatusTiming.accept(code);
81+
}
82+
5183
}

grpc-server-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/server/metric/MetricCollectingServerInterceptor.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static net.devh.boot.grpc.common.metric.MetricUtils.prepareCounterFor;
2424
import static net.devh.boot.grpc.common.metric.MetricUtils.prepareTimerFor;
2525

26+
import java.util.function.Consumer;
2627
import java.util.function.Function;
2728
import java.util.function.UnaryOperator;
2829

@@ -36,6 +37,7 @@
3637
import io.grpc.ServerInterceptor;
3738
import io.grpc.ServerServiceDefinition;
3839
import io.grpc.ServiceDescriptor;
40+
import io.grpc.Status;
3941
import io.grpc.Status.Code;
4042
import io.micrometer.core.instrument.Counter;
4143
import io.micrometer.core.instrument.MeterRegistry;
@@ -131,11 +133,18 @@ public <Q, A> ServerCall.Listener<Q> interceptCall(
131133
final ServerCall<Q, A> call,
132134
final Metadata requestHeaders,
133135
final ServerCallHandler<Q, A> next) {
136+
134137
final MetricSet metrics = metricsFor(call.getMethodDescriptor());
135-
final ServerCall<Q, A> monitoringCall = new MetricCollectingServerCall<>(call, this.registry,
136-
metrics.getResponseCounter(), metrics.getTimerFunction());
138+
final Consumer<Status.Code> responseStatusTiming = metrics.newProcessingDurationTiming(this.registry);
139+
140+
final MetricCollectingServerCall<Q, A> monitoringCall =
141+
new MetricCollectingServerCall<>(call, metrics.getResponseCounter());
142+
137143
return new MetricCollectingServerCallListener<>(
138-
next.startCall(monitoringCall, requestHeaders), metrics.getRequestCounter());
144+
next.startCall(monitoringCall, requestHeaders),
145+
metrics.getRequestCounter(),
146+
monitoringCall::getResponseCode,
147+
responseStatusTiming);
139148
}
140149

141150
}

tests/src/test/java/net/devh/boot/grpc/test/metric/MetricCollectingClientInterceptorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
class MetricCollectingClientInterceptorTest {
4040

4141
@Test
42-
public void testClientPreRegistration() {
42+
void testClientPreRegistration() {
4343
log.info("--- Starting tests with client pre-registration ---");
4444
final MeterRegistry meterRegistry = new SimpleMeterRegistry();
4545
assertEquals(0, meterRegistry.getMeters().size());
@@ -52,7 +52,7 @@ public void testClientPreRegistration() {
5252
}
5353

5454
@Test
55-
public void testClientCustomization() {
55+
void testClientCustomization() {
5656
log.info("--- Starting tests with client customization ---");
5757
final MeterRegistry meterRegistry = new SimpleMeterRegistry();
5858
assertEquals(0, meterRegistry.getMeters().size());

0 commit comments

Comments
 (0)