Skip to content

Commit 7a3326d

Browse files
authored
Add monitoring to node service (#4607)
## Motivation

Currently, if we start a node service, we get no metrics or memory profiling.

## Proposal

Add support for metrics and memory profiling to the node service. Also fix the fact that it doesn't terminate with Ctrl+C (the faucet had the same problem, so I fixed it there too).

## Test Plan

Start a node service locally, check that memory profiling is enabled and working, and that killing it with Ctrl+C actually terminates it.

## Release Plan

- Nothing to do / These changes follow the usual release cycle.
1 parent bb01155 commit 7a3326d

File tree

6 files changed

+61
-14
lines changed

6 files changed

+61
-14
lines changed

linera-faucet/server/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,9 @@ where
702702
let batch_processor_task = batch_processor.run(cancellation_token.clone());
703703
let tcp_listener =
704704
tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
705-
let server = axum::serve(tcp_listener, app).into_future();
705+
let server = axum::serve(tcp_listener, app)
706+
.with_graceful_shutdown(cancellation_token.cancelled_owned())
707+
.into_future();
706708
futures::select! {
707709
result = Box::pin(chain_listener).fuse() => result?,
708710
_ = Box::pin(batch_processor_task).fuse() => {},

linera-metrics/src/monitoring_server.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ pub fn start_metrics(
1515
address: impl ToSocketAddrs + Debug + Send + 'static,
1616
shutdown_signal: CancellationToken,
1717
) {
18-
info!("Starting to serve metrics on {:?}", address);
19-
2018
#[cfg(feature = "memory-profiling")]
2119
let app = {
2220
// Try to add memory profiling endpoint
@@ -42,7 +40,13 @@ pub fn start_metrics(
4240
let app = Router::new().route("/metrics", get(serve_metrics));
4341

4442
tokio::spawn(async move {
45-
if let Err(e) = axum::serve(tokio::net::TcpListener::bind(address).await.unwrap(), app)
43+
let listener = tokio::net::TcpListener::bind(address)
44+
.await
45+
.expect("Failed to bind to address");
46+
let address = listener.local_addr().expect("Failed to get local address");
47+
48+
info!("Starting to serve metrics on {:?}", address);
49+
if let Err(e) = axum::serve(listener, app)
4650
.with_graceful_shutdown(shutdown_signal.cancelled_owned())
4751
.await
4852
{

linera-service/src/cli/command.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,11 @@ pub enum ClientCommand {
742742
/// The port on which to run the server
743743
#[arg(long)]
744744
port: NonZeroU16,
745+
746+
/// The port to expose metrics on.
747+
#[cfg(with_metrics)]
748+
#[arg(long)]
749+
metrics_port: NonZeroU16,
745750
},
746751

747752
/// Run a GraphQL service that exposes a faucet where users can claim tokens.

linera-service/src/cli/main.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,7 +1307,12 @@ impl Runnable for Job {
13071307
info!("Notification stream ended.");
13081308
}
13091309

1310-
Service { config, port } => {
1310+
Service {
1311+
config,
1312+
port,
1313+
#[cfg(with_metrics)]
1314+
metrics_port,
1315+
} => {
13111316
let context = ClientContext::new(
13121317
storage,
13131318
options.context_options.clone(),
@@ -1316,11 +1321,17 @@ impl Runnable for Job {
13161321
);
13171322

13181323
let default_chain = context.wallet().default_chain();
1319-
let service = NodeService::new(config, port, default_chain, context);
1324+
let service = NodeService::new(
1325+
config,
1326+
port,
1327+
#[cfg(with_metrics)]
1328+
metrics_port,
1329+
default_chain,
1330+
context,
1331+
);
13201332
let cancellation_token = CancellationToken::new();
1321-
let child_token = cancellation_token.child_token();
1322-
tokio::spawn(listen_for_shutdown_signals(cancellation_token));
1323-
service.run(child_token).await?;
1333+
tokio::spawn(listen_for_shutdown_signals(cancellation_token.clone()));
1334+
service.run(cancellation_token).await?;
13241335
}
13251336

13261337
Faucet {

linera-service/src/node_service.rs

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ use linera_execution::{
3636
committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
3737
SystemOperation,
3838
};
39+
#[cfg(with_metrics)]
40+
use linera_metrics::monitoring_server;
3941
use linera_sdk::linera_base_types::BlobContent;
4042
use serde::{Deserialize, Serialize};
4143
use serde_json::json;
@@ -816,6 +818,8 @@ where
816818
{
817819
config: ChainListenerConfig,
818820
port: NonZeroU16,
821+
#[cfg(with_metrics)]
822+
metrics_port: NonZeroU16,
819823
default_chain: Option<ChainId>,
820824
context: Arc<Mutex<C>>,
821825
}
@@ -828,6 +832,8 @@ where
828832
Self {
829833
config: self.config.clone(),
830834
port: self.port,
835+
#[cfg(with_metrics)]
836+
metrics_port: self.metrics_port,
831837
default_chain: self.default_chain,
832838
context: Arc::clone(&self.context),
833839
}
@@ -842,17 +848,25 @@ where
842848
pub fn new(
843849
config: ChainListenerConfig,
844850
port: NonZeroU16,
851+
#[cfg(with_metrics)] metrics_port: NonZeroU16,
845852
default_chain: Option<ChainId>,
846853
context: C,
847854
) -> Self {
848855
Self {
849856
config,
850857
port,
858+
#[cfg(with_metrics)]
859+
metrics_port,
851860
default_chain,
852861
context: Arc::new(Mutex::new(context)),
853862
}
854863
}
855864

865+
#[cfg(with_metrics)]
866+
pub fn metrics_address(&self) -> SocketAddr {
867+
SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
868+
}
869+
856870
pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>> {
857871
Schema::build(
858872
QueryRoot {
@@ -878,6 +892,9 @@ where
878892
let application_handler =
879893
axum::routing::get(util::graphiql).post(Self::application_handler);
880894

895+
#[cfg(with_metrics)]
896+
monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());
897+
881898
let app = Router::new()
882899
.route("/", index_handler)
883900
.route(
@@ -894,14 +911,20 @@ where
894911

895912
let storage = self.context.lock().await.storage().clone();
896913

897-
let chain_listener =
898-
ChainListener::new(self.config, self.context, storage, cancellation_token)
899-
.run()
900-
.await?;
914+
let chain_listener = ChainListener::new(
915+
self.config,
916+
self.context,
917+
storage,
918+
cancellation_token.clone(),
919+
)
920+
.run()
921+
.await?;
901922
let mut chain_listener = Box::pin(chain_listener).fuse();
902923
let tcp_listener =
903924
tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
904-
let server = axum::serve(tcp_listener, app).into_future();
925+
let server = axum::serve(tcp_listener, app)
926+
.with_graceful_shutdown(cancellation_token.cancelled_owned())
927+
.into_future();
905928
futures::select! {
906929
result = chain_listener => result?,
907930
result = Box::pin(server).fuse() => result?,

linera-service/src/schema_export.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ async fn main() -> std::io::Result<()> {
228228
let service = NodeService::new(
229229
ChainListenerConfig::default(),
230230
std::num::NonZeroU16::new(8080).unwrap(),
231+
#[cfg(with_metrics)]
232+
std::num::NonZeroU16::new(8081).unwrap(),
231233
None,
232234
DummyContext,
233235
);

0 commit comments

Comments
 (0)