@@ -7,6 +7,7 @@ use std::{
7
7
} ;
8
8
9
9
use anyhow:: { anyhow, Context , Error } ;
10
+ use arc_swap:: ArcSwap ;
10
11
use async_trait:: async_trait;
11
12
use axum:: {
12
13
body:: Body ,
@@ -29,7 +30,9 @@ use opentelemetry::{
29
30
KeyValue ,
30
31
} ;
31
32
use opentelemetry_prometheus:: { ExporterBuilder , PrometheusExporter } ;
33
+ use prometheus:: proto:: MetricFamily ;
32
34
use prometheus:: { Encoder as PrometheusEncoder , TextEncoder } ;
35
+
33
36
use regex:: Regex ;
34
37
use tokio:: { sync:: Semaphore , task} ;
35
38
use tracing:: info;
@@ -59,6 +62,7 @@ use crate::{
59
62
} ;
60
63
61
64
const SERVICE_NAME : & str = "control-plane" ;
65
+ const CHECKER_METRIC_PREFIX : & str = "control_plane_check_" ;
62
66
63
67
const SECOND : Duration = Duration :: from_secs ( 1 ) ;
64
68
const MINUTE : Duration = Duration :: from_secs ( 60 ) ;
@@ -151,18 +155,20 @@ async fn main() -> Result<(), Error> {
151
155
)
152
156
. expect ( "failed to set global subscriber" ) ;
153
157
154
- let exporter = ExporterBuilder :: new (
155
- controllers:: basic (
156
- processors:: factory (
157
- selectors:: simple:: histogram ( [ ] ) ,
158
- aggregation:: cumulative_temporality_selector ( ) ,
158
+ let exporter = Arc :: new (
159
+ ExporterBuilder :: new (
160
+ controllers:: basic (
161
+ processors:: factory (
162
+ selectors:: simple:: histogram ( [ ] ) ,
163
+ aggregation:: cumulative_temporality_selector ( ) ,
164
+ )
165
+ . with_memory ( true ) ,
159
166
)
160
- . with_memory ( true ) ,
167
+ . with_resource ( Resource :: new ( vec ! [ KeyValue :: new( "service" , SERVICE_NAME ) ] ) )
168
+ . build ( ) ,
161
169
)
162
- . with_resource ( Resource :: new ( vec ! [ KeyValue :: new( "service" , SERVICE_NAME ) ] ) )
163
- . build ( ) ,
164
- )
165
- . init ( ) ;
170
+ . init ( ) ,
171
+ ) ;
166
172
167
173
// Metrics
168
174
let meter = global:: meter ( SERVICE_NAME ) ;
@@ -211,6 +217,9 @@ async fn main() -> Result<(), Error> {
211
217
let snapshot_runner = WithThrottle ( snapshot_runner, ThrottleParams :: new ( 1 * MINUTE ) ) ;
212
218
let mut snapshot_runner = snapshot_runner;
213
219
220
+ let checker_metrics: Vec < MetricFamily > = Vec :: new ( ) ;
221
+ let checker_metrics = Arc :: new ( ArcSwap :: from_pointee ( checker_metrics) ) ;
222
+
214
223
let checker = Checker :: new ( http_client) ;
215
224
let checker = CheckWithMetrics (
216
225
checker,
@@ -299,8 +308,9 @@ async fn main() -> Result<(), Error> {
299
308
let metrics_router = Router :: new ( )
300
309
. route ( "/metrics" , get ( metrics_handler) )
301
310
. with_state ( MetricsHandlerArgs {
302
- exporter,
311
+ exporter : Arc :: clone ( & exporter ) ,
303
312
active_replicas,
313
+ checker_metrics : Arc :: clone ( & checker_metrics) ,
304
314
} ) ;
305
315
306
316
info ! (
@@ -319,8 +329,12 @@ async fn main() -> Result<(), Error> {
319
329
}
320
330
} ) ,
321
331
task:: spawn( async move {
332
+ let exporter = Arc :: clone( & exporter) ;
333
+ let checker_metrics = Arc :: clone( & checker_metrics) ;
334
+
322
335
loop {
323
336
let _ = check_persist_runner. run( ) . await ;
337
+ update_checker_metrics( & exporter, & checker_metrics) ;
324
338
}
325
339
} ) ,
326
340
task:: spawn(
@@ -336,18 +350,49 @@ async fn main() -> Result<(), Error> {
336
350
337
351
#[ derive( Clone ) ]
338
352
struct MetricsHandlerArgs < A > {
339
- exporter : PrometheusExporter ,
353
+ exporter : Arc < PrometheusExporter > ,
340
354
active_replicas : A ,
355
+ checker_metrics : Arc < ArcSwap < Vec < MetricFamily > > > ,
356
+ }
357
+
358
+ // Gathers metrics relevant to node checking and stores them in the ArcSwap
359
+ fn update_checker_metrics (
360
+ exporter : & Arc < PrometheusExporter > ,
361
+ checker_metrics : & Arc < ArcSwap < Vec < MetricFamily > > > ,
362
+ ) {
363
+ // Gather node checker metrics
364
+ let metric_families = Arc :: new (
365
+ exporter
366
+ . registry ( )
367
+ . gather ( )
368
+ . into_iter ( )
369
+ . filter ( |x| x. get_name ( ) . starts_with ( CHECKER_METRIC_PREFIX ) )
370
+ . collect :: < Vec < _ > > ( ) ,
371
+ ) ;
372
+
373
+ checker_metrics. store ( metric_families) ;
341
374
}
342
375
343
376
async fn metrics_handler < A : ActiveChecker > (
344
377
State ( MetricsHandlerArgs {
345
378
exporter,
346
379
active_replicas,
380
+ checker_metrics,
347
381
} ) : State < MetricsHandlerArgs < A > > ,
348
382
_: Request < Body > ,
349
383
) -> Response < Body > {
350
- let metric_families = exporter. registry ( ) . gather ( ) ;
384
+ // Read out all metrics that are not related to node checking
385
+ let mut metric_families = exporter
386
+ . registry ( )
387
+ . gather ( )
388
+ . into_iter ( )
389
+ . filter ( |x| !x. get_name ( ) . starts_with ( CHECKER_METRIC_PREFIX ) )
390
+ . collect :: < Vec < _ > > ( ) ;
391
+
392
+ // Concatenate node checking metrics with all others & sort the result to be consistent with gather() output
393
+ let mut _checker_metrics = { Vec :: clone ( & checker_metrics. load ( ) ) } ;
394
+ metric_families. append ( & mut _checker_metrics) ;
395
+ metric_families. sort_by ( |a, b| a. get_name ( ) . cmp ( b. get_name ( ) ) ) ;
351
396
352
397
let encoder = TextEncoder :: new ( ) ;
353
398
0 commit comments