11package org .lognet .springboot .grpc .autoconfigure .metrics ;
22
33import io .grpc .ForwardingServerCall ;
4+ import io .grpc .ForwardingServerCallListener ;
45import io .grpc .Grpc ;
56import io .grpc .Metadata ;
67import io .grpc .ServerCall ;
78import io .grpc .ServerCallHandler ;
89import io .grpc .ServerInterceptor ;
910import io .grpc .Status ;
1011import io .micrometer .core .instrument .MeterRegistry ;
12+ import io .micrometer .core .instrument .Tag ;
1113import io .micrometer .core .instrument .Timer ;
1214import lombok .Setter ;
1315import lombok .experimental .Accessors ;
16+ import lombok .extern .slf4j .Slf4j ;
1417import org .lognet .springboot .grpc .GRpcGlobalInterceptor ;
1518import org .lognet .springboot .grpc .GRpcService ;
1619import org .lognet .springboot .grpc .autoconfigure .GRpcServerProperties ;
20+ import org .springframework .beans .factory .annotation .Autowired ;
1721import org .springframework .boot .actuate .autoconfigure .metrics .CompositeMeterRegistryAutoConfiguration ;
1822import org .springframework .boot .actuate .autoconfigure .metrics .MetricsAutoConfiguration ;
1923import org .springframework .boot .autoconfigure .AutoConfigureAfter ;
2731import org .springframework .core .Ordered ;
2832
2933import java .net .SocketAddress ;
34+ import java .util .ArrayList ;
35+ import java .util .Collection ;
36+ import java .util .List ;
3037import java .util .Optional ;
38+ import java .util .stream .Collectors ;
39+ import java .util .stream .Stream ;
40+ import java .util .stream .StreamSupport ;
3141
3242@ Configuration
3343@ AutoConfigureAfter ({MetricsAutoConfiguration .class , CompositeMeterRegistryAutoConfiguration .class })
@@ -52,73 +62,136 @@ static class GrpcServiceCondition {
5262
5363 }
5464
55- static class MonitoringServerCall <ReqT , RespT > extends ForwardingServerCall .SimpleForwardingServerCall <ReqT , RespT > {
65+ static class MonitoringServerCall <ReqT , RespT > extends ForwardingServerCall .SimpleForwardingServerCall <ReqT , RespT > {
66+
5667
57- private final boolean addAddressTag ;
5868 private MeterRegistry registry ;
69+
5970 final Timer .Sample start ;
60- protected MonitoringServerCall (ServerCall <ReqT , RespT > delegate , MeterRegistry registry , boolean addAddressTag ) {
71+
72+ private Collection <GRpcMetricsTagsContributor > tagsContributors ;
73+ private List <Tag > additionalTags ;
74+
75+
76+
77+ protected MonitoringServerCall (ServerCall <ReqT , RespT > delegate , MeterRegistry registry , Collection <GRpcMetricsTagsContributor > tagsContributors ) {
6178 super (delegate );
6279 this .start = Timer .start (registry );
6380 this .registry = registry ;
64- this .addAddressTag = addAddressTag ;
65-
66-
81+ this .tagsContributors = tagsContributors ;
6782 }
83+
6884 @ Override
6985 public void close (Status status , Metadata trailers ) {
70- final Timer .Builder timerBuilder = Timer .builder ("grpc.server.calls" )
71- .tag ("method" , getMethodDescriptor ().getFullMethodName ())
72- .tag ("result" , status .getCode ().name ());
73- if (addAddressTag ){
74- Optional .ofNullable (delegate ().getAttributes ())
75- .map (a ->a .get (Grpc .TRANSPORT_ATTR_LOCAL_ADDR ))
76- .map (SocketAddress ::toString )
77- .ifPresent (a ->timerBuilder .tag ("address" ,a ));
78- }
86+
87+ final Timer .Builder timerBuilder = Timer .builder ("grpc.server.calls" );
88+ tagsContributors .forEach (c ->
89+ timerBuilder .tags (c .getTags (status ,getMethodDescriptor (),getAttributes ()))
90+ );
91+ Optional .ofNullable (additionalTags )
92+ .ifPresent (timerBuilder ::tags );
93+
7994 start .stop (timerBuilder .register (registry ));
8095
8196 super .close (status , trailers );
8297 }
98+
99+ public void addTags (List <Tag > tags ) {
100+ additionalTags = tags ;
101+ }
83102 }
84103
104+ @ Slf4j
85105 static class MonitoringServerInterceptor implements ServerInterceptor , Ordered {
86106
87-
88107 private MeterRegistry registry ;
89- private boolean addAddressTag ;
108+
109+
110+
111+ private Collection <GRpcMetricsTagsContributor > tagsContributors ;
112+
113+ @ Autowired
114+ public void setTagsContributors (Collection <GRpcMetricsTagsContributor > tagsContributors ) {
115+ this .tagsContributors = tagsContributors ;
116+ }
117+
90118 @ Setter
91119 @ Accessors (fluent = true )
92120 private Integer order ;
93121
94-
95- public MonitoringServerInterceptor (MeterRegistry registry ,boolean addAddressTag ) {
122+ public MonitoringServerInterceptor (MeterRegistry registry ) {
96123 this .registry = registry ;
97- this .addAddressTag = addAddressTag ;
98124 }
99125
100126 @ Override
101127 public <ReqT , RespT > ServerCall .Listener <ReqT > interceptCall (ServerCall <ReqT , RespT > call , Metadata headers , ServerCallHandler <ReqT , RespT > next ) {
102- return next .startCall (new MonitoringServerCall <>(call ,registry ,addAddressTag ), headers );
103128
129+ final MonitoringServerCall <ReqT , RespT > monitoringServerCall = new MonitoringServerCall <>(call , registry ,tagsContributors );
130+ final ServerCall .Listener <ReqT > measuredCall = next .startCall (monitoringServerCall , headers );
131+ if (call .getMethodDescriptor ().getType ().clientSendsOneMessage ()) {
132+ return new ForwardingServerCallListener .SimpleForwardingServerCallListener <ReqT >(measuredCall ) {
133+ @ Override
134+ public void onMessage (ReqT message ) {
135+
136+ final Stream <Tag > fd = tagsContributors
137+ .stream ()
138+ .filter (RequestAwareGRpcMetricsTagsContributor .class ::isInstance )
139+ .map (RequestAwareGRpcMetricsTagsContributor .class ::cast )
140+ .filter (c -> c .accepts (message ))
141+ .flatMap (c -> {
142+ try {
143+ return StreamSupport .stream (c .getTags (message ,monitoringServerCall .getMethodDescriptor (),monitoringServerCall .getAttributes ()).spliterator (), false );
144+ }catch (Throwable t ){
145+ log .error ("Failed to execute tag contributor" ,t );
146+ return Stream .empty ();
147+ }
148+ });
149+
150+ monitoringServerCall .addTags (fd .collect (Collectors .toList ()));
151+
152+ super .onMessage (message );
153+
154+ }
155+ };
156+ } else {
157+ return measuredCall ;
158+ }
104159 }
105160
106-
107-
108161 @ Override
109162 public int getOrder () {
110- return Optional .ofNullable (order ).orElse (HIGHEST_PRECEDENCE + 20 );
163+ return Optional .ofNullable (order ).orElse (HIGHEST_PRECEDENCE + 20 );
111164 }
112165 }
113166
114167 @ Bean
115168 @ GRpcGlobalInterceptor
116- public ServerInterceptor measure (MeterRegistry registry , GRpcServerProperties properties ,GRpcMetricsProperties metricsProperties ){
169+ public ServerInterceptor measure (MeterRegistry registry , GRpcMetricsProperties metricsProperties ) {
170+
171+ return new MonitoringServerInterceptor (registry )
172+ .order (metricsProperties .getInterceptorOrder ());
173+ }
174+
175+ @ Bean
176+ public GRpcMetricsTagsContributor defaultTagsContributor (GRpcServerProperties properties ) {
117177 final Boolean hasMultipleAddresses = Optional .ofNullable (properties .getNettyServer ())
118178 .map (GRpcServerProperties .NettyServerProperties ::getAdditionalListenAddresses )
119179 .map (l -> !l .isEmpty ())
120180 .orElse (false );
121- return new MonitoringServerInterceptor (registry ,hasMultipleAddresses )
122- .order (metricsProperties .getInterceptorOrder ());
181+
182+ return (status , methodDescriptor , attributes ) -> {
183+
184+ final ArrayList <Tag > tags = new ArrayList <>();
185+ tags .add (Tag .of ("result" , status .getCode ().name ()));
186+ tags .add (Tag .of ("method" , methodDescriptor .getFullMethodName ()));
187+ if (hasMultipleAddresses ) {
188+ Optional .ofNullable (attributes )
189+ .map (a -> a .get (Grpc .TRANSPORT_ATTR_LOCAL_ADDR ))
190+ .map (SocketAddress ::toString )
191+ .map (a -> Tag .of ("address" , a ))
192+ .ifPresent (tags ::add );
193+ }
194+ return tags ;
195+ };
123196 }
124197}
0 commit comments