Skip to content

Commit 9dea3cc

Browse files
committed
refactor(node): include metrics server in Node
1 parent ec1224e commit 9dea3cc

File tree

6 files changed

+140
-41
lines changed

6 files changed

+140
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/metrics/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ version.workspace = true
88
[dependencies]
99
hyper = { workspace = true, features = [ "http1", "http2", "server", "tcp" ] }
1010
thiserror.workspace = true
11+
tokio.workspace = true
1112
tracing.workspace = true
1213

1314
# Metrics

crates/metrics/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ pub enum Error {
2929
#[error("could not bind to address: {addr}")]
3030
FailedToBindAddress { addr: SocketAddr },
3131

32+
#[error("metrics server has already been stopped")]
33+
AlreadyStopped,
34+
35+
#[error("failed to join server task: {0}")]
36+
JoinError(String),
37+
3238
#[error(transparent)]
3339
Server(#[from] hyper::Error),
3440
}

crates/metrics/src/server.rs

Lines changed: 86 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,55 @@ use std::sync::Arc;
55

66
use hyper::service::{make_service_fn, service_fn};
77
use hyper::{Body, Request, Response};
8+
use tokio::sync::oneshot;
9+
use tracing::info;
810

911
use crate::exporters::Exporter;
1012
use crate::{Error, Report};
1113

14+
/// A handle to the metrics server.
15+
#[derive(Debug)]
16+
pub struct MetricsServerHandle {
17+
/// The actual address that the server is bound to.
18+
addr: SocketAddr,
19+
/// The shutdown sender to stop the server.
20+
shutdown_tx: Option<oneshot::Sender<()>>,
21+
/// The task handle to wait for server completion.
22+
task_handle: tokio::task::JoinHandle<Result<(), Error>>,
23+
}
24+
25+
impl MetricsServerHandle {
26+
/// Tell the server to stop without waiting for the server to stop.
27+
pub fn stop(&mut self) -> Result<(), Error> {
28+
if let Some(tx) = self.shutdown_tx.take() {
29+
// Ignore error if receiver already dropped
30+
let _ = tx.send(());
31+
Ok(())
32+
} else {
33+
Err(Error::AlreadyStopped)
34+
}
35+
}
36+
37+
/// Wait until the server has stopped.
38+
pub async fn stopped(self) -> Result<(), Error> {
39+
self.task_handle.await.map_err(|e| Error::JoinError(e.to_string()))?
40+
}
41+
42+
/// Returns the socket address the server is listening on.
43+
pub fn addr(&self) -> &SocketAddr {
44+
&self.addr
45+
}
46+
}
47+
1248
/// A helper trait for defining the type for hooks that are called when the metrics are being
1349
/// collected by the server.
1450
trait Hook: Fn() + Send + Sync {}
1551
impl<T: Fn() + Send + Sync> Hook for T {}
1652

17-
/// A boxed [`Hook`].
18-
type BoxedHook = Box<dyn Hook<Output = ()>>;
19-
/// A list of [BoxedHook].
20-
type Hooks = Vec<BoxedHook>;
53+
/// A shared hook that can be cloned.
54+
type SharedHook = Arc<dyn Hook<Output = ()>>;
55+
/// A list of shared hooks.
56+
type Hooks = Vec<SharedHook>;
2157

2258
/// Server for serving metrics.
2359
// TODO: allow configuring the server executor to allow cancelling individual connection tasks.
@@ -46,8 +82,8 @@ where
4682
where
4783
I: IntoIterator<Item = Box<dyn Report>>,
4884
{
49-
// convert the report types into callable hooks
50-
let hooks = reports.into_iter().map(|r| Box::new(move || r.report()) as BoxedHook);
85+
// convert the report types into callable hooks, wrapping in Arc for sharing
86+
let hooks = reports.into_iter().map(|r| Arc::new(move || r.report()) as SharedHook);
5187
self.hooks.extend(hooks);
5288
self
5389
}
@@ -60,42 +96,67 @@ where
6096
describe_memory_stats();
6197

6298
let hooks: Hooks =
63-
vec![Box::new(collect_memory_stats), Box::new(move || process.collect())];
99+
vec![Arc::new(collect_memory_stats), Arc::new(move || process.collect())];
64100

65101
self.hooks.extend(hooks);
66102
self
67103
}
68104

69105
/// Starts an endpoint at the given address to serve Prometheus metrics.
70-
pub async fn start(self, addr: SocketAddr) -> Result<(), Error> {
71-
let hooks = Arc::new(move || self.hooks.iter().for_each(|hook| hook()));
106+
///
107+
/// Returns a handle that can be used to stop the server and wait for it to finish.
108+
pub async fn start(&self, addr: SocketAddr) -> Result<MetricsServerHandle, Error> {
109+
// Clone the hooks (clones the Arc references, not the closures themselves)
110+
let hooks = self.hooks.clone();
111+
let exporter = self.exporter.clone();
72112

73-
hyper::Server::try_bind(&addr)
113+
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
114+
115+
let server = hyper::Server::try_bind(&addr)
74116
.map_err(|_| Error::FailedToBindAddress { addr })?
75117
.serve(make_service_fn(move |_| {
76-
let hook = Arc::clone(&hooks);
77-
let exporter = self.exporter.clone();
118+
let hooks = hooks.clone();
119+
let exporter = exporter.clone();
78120
async move {
79121
Ok::<_, Infallible>(service_fn(move |_: Request<Body>| {
80-
// call the hooks to collect metrics before exporting them
81-
(hook)();
82-
// export the metrics from the installed exporter and send as response
83-
let metrics = Body::from(exporter.export());
84-
async move { Ok::<_, Infallible>(Response::new(metrics)) }
122+
let hooks = hooks.clone();
123+
let exporter = exporter.clone();
124+
async move {
125+
// need to call the hooks to collect metrics before exporting them
126+
for hook in &hooks {
127+
hook();
128+
}
129+
// export the metrics from the installed exporter and send as response
130+
let metrics = Body::from(exporter.export());
131+
Ok::<_, Infallible>(Response::new(metrics))
132+
}
85133
}))
86134
}
87-
}))
88-
.await?;
135+
}));
136+
137+
let actual_addr = server.local_addr();
89138

90-
Ok(())
139+
// Spawn the server with graceful shutdown
140+
let task_handle = tokio::spawn(async move {
141+
server
142+
.with_graceful_shutdown(async {
143+
shutdown_rx.await.ok();
144+
})
145+
.await
146+
.map_err(Error::Server)
147+
});
148+
149+
info!(target: "metrics", addr = %actual_addr, "Metrics server started.");
150+
151+
Ok(MetricsServerHandle { addr: actual_addr, shutdown_tx: Some(shutdown_tx), task_handle })
91152
}
92153
}
93154

94-
impl<MetricsExporter> fmt::Debug for Server<MetricsExporter>
95-
where
96-
MetricsExporter: fmt::Debug,
97-
{
155+
impl<MetricsExporter> fmt::Debug for Server<MetricsExporter> {
98156
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99-
f.debug_struct("Server").field("hooks", &"...").field("exporter", &self.exporter).finish()
157+
f.debug_struct("Server")
158+
.field("hooks", &format_args!("{} hook(s)", self.hooks.len()))
159+
.field("exporter", &"<exporter>")
160+
.finish()
100161
}
101162
}

crates/node/src/full/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,9 @@ impl Node {
232232

233233
let addr = cfg.socket_addr();
234234
let server = MetricsServer::new(exporter).with_process_metrics().with_reports(reports);
235-
self.task_manager.task_spawner().build_task().spawn(server.start(addr));
236235

237-
info!(%addr, "Metrics server started.");
236+
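// Start the metrics server; the full node doesn't track the handle. NOTE(review): discarding the handle drops its shutdown sender, which completes the server's graceful-shutdown future - confirm the server is meant to keep running here.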
237+
let _ = server.start(addr).await?;
238238
}
239239

240240
let pipeline_handle = self.pipeline.handle();

crates/node/src/lib.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ use katana_executor::implementation::blockifier::BlockifierFactory;
2525
use katana_executor::{ExecutionFlags, ExecutorFactory};
2626
use katana_gas_price_oracle::{FixedPriceOracle, GasPriceOracle};
2727
use katana_gateway_server::{GatewayServer, GatewayServerHandle};
28-
use katana_metrics::exporters::prometheus::PrometheusRecorder;
28+
use katana_metrics::exporters::prometheus::{Prometheus, PrometheusRecorder};
2929
use katana_metrics::sys::DiskReporter;
30-
use katana_metrics::{Report, Server as MetricsServer};
30+
use katana_metrics::{MetricsServerHandle, Report, Server as MetricsServer};
3131
use katana_pool::ordering::FiFo;
3232
use katana_pool::TxPool;
3333
use katana_primitives::env::VersionedConstantsOverrides;
@@ -65,6 +65,7 @@ pub struct Node {
6565
backend: Arc<Backend<BlockifierFactory>>,
6666
block_producer: BlockProducer<BlockifierFactory>,
6767
gateway_server: Option<GatewayServer<TxPool>>,
68+
metrics_server: Option<MetricsServer<Prometheus>>,
6869
}
6970

7071
impl Node {
@@ -320,13 +321,29 @@ impl Node {
320321
None
321322
};
322323

324+
// --- build metrics server (optional)
325+
326+
let metrics_server = if config.metrics.is_some() {
327+
let db_metrics = Box::new(db.clone()) as Box<dyn Report>;
328+
let disk_metrics = Box::new(DiskReporter::new(db.path())?) as Box<dyn Report>;
329+
let reports: Vec<Box<dyn Report>> = vec![db_metrics, disk_metrics];
330+
331+
let exporter = PrometheusRecorder::current().expect("qed; should exist at this point");
332+
let server = MetricsServer::new(exporter).with_process_metrics().with_reports(reports);
333+
334+
Some(server)
335+
} else {
336+
None
337+
};
338+
323339
Ok(Node {
324340
db,
325341
pool,
326342
backend,
327343
rpc_server,
328344
gateway_server,
329345
block_producer,
346+
metrics_server,
330347
config: Arc::new(config),
331348
task_manager,
332349
})
@@ -339,19 +356,15 @@ impl Node {
339356
let chain = self.backend.chain_spec.id();
340357
info!(%chain, "Starting node.");
341358

342-
// TODO: maybe move this to the build stage
343-
if let Some(ref cfg) = self.config.metrics {
344-
let db_metrics = Box::new(self.db.clone()) as Box<dyn Report>;
345-
let disk_metrics = Box::new(DiskReporter::new(self.db.path())?) as Box<dyn Report>;
346-
let reports: Vec<Box<dyn Report>> = vec![db_metrics, disk_metrics];
347-
348-
let exporter = PrometheusRecorder::current().expect("qed; should exist at this point");
349-
let server = MetricsServer::new(exporter).with_process_metrics().with_reports(reports);
359+
// --- start the metrics server (if configured)
350360

361+
let metrics_handle = if let Some(ref server) = self.metrics_server {
362+
let cfg = self.config.metrics.as_ref().expect("qed; must exist");
351363
let addr = cfg.socket_addr();
352-
self.task_manager.task_spawner().build_task().spawn(server.start(addr));
353-
info!(%addr, "Metrics server started.");
354-
}
364+
Some(server.start(addr).await?)
365+
} else {
366+
None
367+
};
355368

356369
let pool = self.pool.clone();
357370
let backend = self.backend.clone();
@@ -401,7 +414,12 @@ impl Node {
401414

402415
info!(target: "node", "Gas price oracle worker started.");
403416

404-
Ok(LaunchedNode { node: self, rpc: rpc_handle, gateway: gateway_handle })
417+
Ok(LaunchedNode {
418+
node: self,
419+
rpc: rpc_handle,
420+
gateway: gateway_handle,
421+
metrics: metrics_handle,
422+
})
405423
}
406424

407425
/// Returns a reference to the node's database environment (if any).
@@ -437,6 +455,8 @@ pub struct LaunchedNode {
437455
rpc: RpcServerHandle,
438456
/// Handle to the gateway server (if enabled).
439457
gateway: Option<GatewayServerHandle>,
458+
/// Handle to the metrics server (if enabled).
459+
metrics: Option<MetricsServerHandle>,
440460
}
441461

442462
impl LaunchedNode {
@@ -455,6 +475,11 @@ impl LaunchedNode {
455475
self.gateway.as_ref()
456476
}
457477

478+
/// Returns a reference to the metrics server handle (if enabled).
479+
pub fn metrics(&self) -> Option<&MetricsServerHandle> {
480+
self.metrics.as_ref()
481+
}
482+
458483
/// Stops the node.
459484
///
460485
/// This will instruct the node to stop and wait until it has actually stopped.
@@ -467,6 +492,11 @@ impl LaunchedNode {
467492
handle.stop()?;
468493
}
469494

495+
// Stop metrics server if it's running
496+
if let Some(mut handle) = self.metrics {
497+
handle.stop()?;
498+
}
499+
470500
self.node.task_manager.shutdown().await;
471501
Ok(())
472502
}

0 commit comments

Comments
 (0)