Skip to content

Commit 0b6423c

Browse files
nepetcdecker
authored andcommitted
plugin: Move rpc initialization into OnceCell
We want to ensure that we acutally can connect to the lightning-rpc socket before we send any calls to core lightning. Using a OnceCell ensures that we initialize it (only once) but independent of the actuall call. We still need to keep it in a Mutex as `cln_rpc`'s `call_raw_request` does not seem to be async safe. Signed-off-by: Peter Neuroth <pet.v.ne@gmail.com>
1 parent 8557cdf commit 0b6423c

File tree

1 file changed

+37
-40
lines changed

1 file changed

+37
-40
lines changed

libs/gl-plugin/src/node/mod.rs

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ use governor::{
1212
};
1313
use lazy_static::lazy_static;
1414
use log::{debug, error, info, trace, warn};
15-
use std::path::PathBuf;
15+
use std::path::{Path, PathBuf};
1616
use std::sync::atomic::AtomicBool;
1717
use std::sync::{
1818
atomic::{AtomicUsize, Ordering},
1919
Arc,
2020
};
21+
use std::time::Duration;
2122
use tokio::sync::{broadcast, mpsc, Mutex, OnceCell};
2223
use tokio_stream::wrappers::ReceiverStream;
2324
use tonic::{transport::ServerTlsConfig, Code, Request, Response, Status};
@@ -29,6 +30,32 @@ pub use wrapper::WrappedNodeServer;
2930
static LIMITER: OnceCell<RateLimiter<NotKeyed, InMemoryState, MonotonicClock>> =
3031
OnceCell::const_new();
3132

33+
static RPC_CLIENT: OnceCell<Arc<Mutex<cln_rpc::ClnRpc>>> = OnceCell::const_new();
34+
35+
pub async fn get_rpc<P: AsRef<Path>>(path: P) -> Arc<Mutex<cln_rpc::ClnRpc>> {
36+
RPC_CLIENT
37+
.get_or_init(|| async {
38+
loop {
39+
match cln_rpc::ClnRpc::new(path.as_ref()).await {
40+
Ok(client) => {
41+
debug!("Connected to lightning-rpc.");
42+
return Arc::new(Mutex::new(client));
43+
}
44+
Err(e) => {
45+
debug!(
46+
"Failed to connect to lightning-rpc: {:?}. Retrying in 50m...",
47+
e
48+
);
49+
tokio::time::sleep(Duration::from_millis(50)).await;
50+
continue;
51+
}
52+
}
53+
}
54+
})
55+
.await
56+
.clone()
57+
}
58+
3259
lazy_static! {
3360
static ref HSM_ID_COUNT: AtomicUsize = AtomicUsize::new(0);
3461

@@ -102,30 +129,8 @@ impl PluginNodeServer {
102129
};
103130

104131
tokio::spawn(async move {
105-
debug!("Locking grpc interface until the JSON-RPC interface becomes available.");
106-
use tokio::time::{sleep, Duration};
107-
108-
// Move the lock into the closure so we can release it later.
109-
let mut rpc = cln_rpc::ClnRpc::new(rpc_path.clone()).await.unwrap();
110-
loop {
111-
let res = rpc
112-
.call_typed(&cln_rpc::model::requests::GetinfoRequest {})
113-
.await;
114-
match res {
115-
Ok(_) => break,
116-
Err(e) => {
117-
warn!(
118-
"JSON-RPC interface not yet available. Delaying 50ms. {:?}",
119-
e
120-
);
121-
sleep(Duration::from_millis(50)).await;
122-
}
123-
}
124-
}
125-
126-
// Signal that the RPC is ready now.
127-
RPC_READY.store(true, Ordering::SeqCst);
128-
132+
let rpc_arc = get_rpc(&rpc_path).await.clone();
133+
let mut rpc = rpc_arc.lock().await;
129134
let list_datastore_req = cln_rpc::model::requests::ListdatastoreRequest {
130135
key: Some(vec!["glconf".to_string(), "request".to_string()]),
131136
};
@@ -150,8 +155,6 @@ impl PluginNodeServer {
150155
}
151156
Err(_) => {}
152157
}
153-
154-
drop(rpc);
155158
});
156159

157160
Ok(s)
@@ -168,10 +171,6 @@ impl PluginNodeServer {
168171

169172
limiter.until_ready().await
170173
}
171-
172-
pub async fn get_rpc(&self) -> Result<cln_rpc::ClnRpc> {
173-
cln_rpc::ClnRpc::new(self.rpc_path.clone()).await
174-
}
175174
}
176175

177176
#[tonic::async_trait]
@@ -464,12 +463,8 @@ impl Node for PluginNodeServer {
464463
) -> Result<Response<pb::Empty>, Status> {
465464
self.limit().await;
466465
let gl_config = req.into_inner();
467-
let mut rpc = self.get_rpc().await.map_err(|e| {
468-
tonic::Status::new(
469-
tonic::Code::Internal,
470-
format!("could not connect rpc client: {e}"),
471-
)
472-
})?;
466+
let rpc_arc = get_rpc(&self.rpc_path).await;
467+
let mut rpc = rpc_arc.lock().await;
473468

474469
let res = rpc
475470
.call_typed(&cln_rpc::model::requests::GetinfoRequest {})
@@ -622,14 +617,16 @@ impl PluginNodeServer {
622617
}
623618

624619
log::info!("Reconnecting all peers (plugin)");
625-
let mut rpc = cln_rpc::ClnRpc::new(self.rpc_path.clone()).await?;
626620
let peers = self.get_reconnect_peers().await?;
627621
log::info!(
628622
"Found {} peers to reconnect: {:?} (plugin)",
629623
peers.len(),
630624
peers.iter().map(|p| p.id.clone())
631625
);
632626

627+
let rpc_arc = get_rpc(&self.rpc_path).await;
628+
let mut rpc = rpc_arc.lock().await;
629+
633630
for r in peers {
634631
trace!("Calling connect: {:?} (plugin)", &r.id);
635632
let res = rpc.call_typed(&r).await;
@@ -646,8 +643,8 @@ impl PluginNodeServer {
646643
async fn get_reconnect_peers(
647644
&self,
648645
) -> Result<Vec<cln_rpc::model::requests::ConnectRequest>, Error> {
649-
let rpc_path = self.rpc_path.clone();
650-
let mut rpc = cln_rpc::ClnRpc::new(rpc_path).await?;
646+
let rpc_arc = get_rpc(&self.rpc_path).await;
647+
let mut rpc = rpc_arc.lock().await;
651648
let peers = rpc
652649
.call_typed(&cln_rpc::model::requests::ListpeersRequest {
653650
id: None,

0 commit comments

Comments
 (0)