Skip to content

Commit aeafc9f

Browse files
committed
update
1 parent 9dea3cc commit aeafc9f

File tree

3 files changed

+53
-33
lines changed

3 files changed

+53
-33
lines changed

crates/metrics/src/server.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ use tracing::info;
1111
use crate::exporters::Exporter;
1212
use crate::{Error, Report};
1313

14+
/// A helper trait for defining the type for hooks that are called when the metrics are being
15+
/// collected by the server.
16+
trait Hook: Fn() + Send + Sync {}
17+
impl<T: Fn() + Send + Sync> Hook for T {}
18+
19+
/// A shared hook that can be cloned.
20+
type SharedHook = Arc<dyn Hook<Output = ()>>;
21+
/// A list of shared hooks.
22+
type Hooks = Vec<SharedHook>;
23+
1424
/// A handle to the metrics server.
1525
#[derive(Debug)]
1626
pub struct MetricsServerHandle {
@@ -45,40 +55,27 @@ impl MetricsServerHandle {
4555
}
4656
}
4757

48-
/// A helper trait for defining the type for hooks that are called when the metrics are being
49-
/// collected by the server.
50-
trait Hook: Fn() + Send + Sync {}
51-
impl<T: Fn() + Send + Sync> Hook for T {}
52-
53-
/// A shared hook that can be cloned.
54-
type SharedHook = Arc<dyn Hook<Output = ()>>;
55-
/// A list of shared hooks.
56-
type Hooks = Vec<SharedHook>;
57-
5858
/// Server for serving metrics.
5959
// TODO: allow configuring the server executor to allow cancelling on invidiual connection tasks.
6060
// See, [hyper::server::server::Builder::executor]
61-
pub struct Server<MetricsExporter> {
61+
pub struct MetricsServer<E> {
6262
/// Hooks or callable functions for collecting metrics in the cases where
6363
/// the metrics are not being collected in the main program flow.
6464
///
6565
/// These are called when metrics are being served through the server.
6666
hooks: Hooks,
6767
/// The exporter that is used to export the collected metrics.
68-
exporter: MetricsExporter,
68+
exporter: E,
6969
}
7070

71-
impl<MetricsExporter> Server<MetricsExporter>
72-
where
73-
MetricsExporter: Exporter + 'static,
74-
{
71+
impl<E: Exporter + 'static> MetricsServer<E> {
7572
/// Creates a new metrics server using the given exporter.
76-
pub fn new(exporter: MetricsExporter) -> Self {
73+
pub fn new(exporter: E) -> Self {
7774
Self { exporter, hooks: Vec::new() }
7875
}
7976

8077
/// Add new metrics reporter to the server.
81-
pub fn with_reports<I>(mut self, reports: I) -> Self
78+
pub fn reports<I>(mut self, reports: I) -> Self
8279
where
8380
I: IntoIterator<Item = Box<dyn Report>>,
8481
{
@@ -105,7 +102,7 @@ where
105102
/// Starts an endpoint at the given address to serve Prometheus metrics.
106103
///
107104
/// Returns a handle that can be used to stop the server and wait for it to finish.
108-
pub async fn start(&self, addr: SocketAddr) -> Result<MetricsServerHandle, Error> {
105+
pub fn start(&self, addr: SocketAddr) -> Result<MetricsServerHandle, Error> {
109106
// Clone the hooks (clones the Arc references, not the closures themselves)
110107
let hooks = self.hooks.clone();
111108
let exporter = self.exporter.clone();
@@ -137,6 +134,7 @@ where
137134
let actual_addr = server.local_addr();
138135

139136
// Spawn the server with graceful shutdown
137+
// TODO: spawn on a task manager
140138
let task_handle = tokio::spawn(async move {
141139
server
142140
.with_graceful_shutdown(async {
@@ -152,7 +150,7 @@ where
152150
}
153151
}
154152

155-
impl<MetricsExporter> fmt::Debug for Server<MetricsExporter> {
153+
impl<E> fmt::Debug for MetricsServer<E> {
156154
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157155
f.debug_struct("Server")
158156
.field("hooks", &format_args!("{} hook(s)", self.hooks.len()))

crates/node/src/full/mod.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use katana_chain_spec::ChainSpec;
1111
use katana_executor::ExecutionFlags;
1212
use katana_gas_price_oracle::GasPriceOracle;
1313
use katana_gateway_client::Client as SequencerGateway;
14-
use katana_metrics::exporters::prometheus::PrometheusRecorder;
15-
use katana_metrics::{Report, Server as MetricsServer};
14+
use katana_metrics::exporters::prometheus::{Prometheus, PrometheusRecorder};
15+
use katana_metrics::sys::DiskReporter;
16+
use katana_metrics::{MetricsServer, MetricsServerHandle, Report};
1617
use katana_pipeline::{Pipeline, PipelineHandle};
1718
use katana_pool::ordering::TipOrdering;
1819
use katana_provider::providers::db::DbProvider;
@@ -76,6 +77,7 @@ pub struct Node {
7677
pub pipeline: Pipeline<DbProvider>,
7778
pub rpc_server: RpcServer,
7879
pub gateway_client: SequencerGateway,
80+
pub metrics_server: Option<MetricsServer<Prometheus>>,
7981
pub chain_tip_watcher: ChainTipWatcher<SequencerGateway>,
8082
}
8183

@@ -213,29 +215,45 @@ impl Node {
213215
rpc_server = rpc_server.max_response_body_size(max_response_body_size);
214216
}
215217

218+
// --- build metrics server (optional)
219+
220+
let metrics_server = if config.metrics.is_some() {
221+
let db_metrics = Box::new(db.clone()) as Box<dyn Report>;
222+
let disk_metrics = Box::new(DiskReporter::new(db.path())?) as Box<dyn Report>;
223+
let reports: Vec<Box<dyn Report>> = vec![db_metrics, disk_metrics];
224+
225+
let exporter = PrometheusRecorder::current().expect("qed; should exist at this point");
226+
let server = MetricsServer::new(exporter).with_process_metrics().reports(reports);
227+
228+
Some(server)
229+
} else {
230+
None
231+
};
232+
216233
Ok(Node {
217234
db,
218235
pool,
219236
pipeline,
220237
rpc_server,
221238
task_manager,
222239
gateway_client,
240+
metrics_server,
223241
chain_tip_watcher,
224242
config: Arc::new(config),
225243
})
226244
}
227245

228246
pub async fn launch(self) -> Result<LaunchedNode> {
229-
if let Some(ref cfg) = self.config.metrics {
230-
let reports: Vec<Box<dyn Report>> = vec![Box::new(self.db.clone()) as Box<dyn Report>];
231-
let exporter = PrometheusRecorder::current().expect("qed; should exist at this point");
247+
// --- start the metrics server (if configured)
232248

249+
let metrics_handle = if let Some(ref server) = self.metrics_server {
250+
// safe to unwrap here because metrics_server can only be Some if the metrics config exists
251+
let cfg = self.config.metrics.as_ref().expect("qed; must exist");
233252
let addr = cfg.socket_addr();
234-
let server = MetricsServer::new(exporter).with_process_metrics().with_reports(reports);
235-
236-
// Start the metrics server and discard the handle since full node doesn't track it
237-
let _ = server.start(addr).await?;
238-
}
253+
Some(server.start(addr)?)
254+
} else {
255+
None
256+
};
239257

240258
let pipeline_handle = self.pipeline.handle();
241259

@@ -277,6 +295,7 @@ impl Node {
277295
config: self.config,
278296
task_manager: self.task_manager,
279297
pipeline: pipeline_handle,
298+
metrics: metrics_handle,
280299
rpc,
281300
})
282301
}
@@ -289,6 +308,8 @@ pub struct LaunchedNode {
289308
pub config: Arc<Config>,
290309
pub rpc: RpcServerHandle,
291310
pub pipeline: PipelineHandle,
311+
/// Handle to the metrics server (if enabled).
312+
pub metrics: Option<MetricsServerHandle>,
292313
}
293314

294315
impl LaunchedNode {

crates/node/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use katana_gas_price_oracle::{FixedPriceOracle, GasPriceOracle};
2727
use katana_gateway_server::{GatewayServer, GatewayServerHandle};
2828
use katana_metrics::exporters::prometheus::{Prometheus, PrometheusRecorder};
2929
use katana_metrics::sys::DiskReporter;
30-
use katana_metrics::{MetricsServerHandle, Report, Server as MetricsServer};
30+
use katana_metrics::{MetricsServer, MetricsServerHandle, Report};
3131
use katana_pool::ordering::FiFo;
3232
use katana_pool::TxPool;
3333
use katana_primitives::env::VersionedConstantsOverrides;
@@ -329,7 +329,7 @@ impl Node {
329329
let reports: Vec<Box<dyn Report>> = vec![db_metrics, disk_metrics];
330330

331331
let exporter = PrometheusRecorder::current().expect("qed; should exist at this point");
332-
let server = MetricsServer::new(exporter).with_process_metrics().with_reports(reports);
332+
let server = MetricsServer::new(exporter).with_process_metrics().reports(reports);
333333

334334
Some(server)
335335
} else {
@@ -359,9 +359,10 @@ impl Node {
359359
// --- start the metrics server (if configured)
360360

361361
let metrics_handle = if let Some(ref server) = self.metrics_server {
362+
// safe to unwrap here because metrics_server can only be Some if the metrics config exists
362363
let cfg = self.config.metrics.as_ref().expect("qed; must exist");
363364
let addr = cfg.socket_addr();
364-
Some(server.start(addr).await?)
365+
Some(server.start(addr)?)
365366
} else {
366367
None
367368
};

0 commit comments

Comments
 (0)