@@ -12,12 +12,13 @@ use governor::{
1212} ;
1313use lazy_static:: lazy_static;
1414use log:: { debug, error, info, trace, warn} ;
15- use std:: path:: PathBuf ;
15+ use std:: path:: { Path , PathBuf } ;
1616use std:: sync:: atomic:: AtomicBool ;
1717use std:: sync:: {
1818 atomic:: { AtomicUsize , Ordering } ,
1919 Arc ,
2020} ;
21+ use std:: time:: Duration ;
2122use tokio:: sync:: { broadcast, mpsc, Mutex , OnceCell } ;
2223use tokio_stream:: wrappers:: ReceiverStream ;
2324use tonic:: { transport:: ServerTlsConfig , Code , Request , Response , Status } ;
@@ -29,6 +30,31 @@ pub use wrapper::WrappedNodeServer;
2930static LIMITER : OnceCell < RateLimiter < NotKeyed , InMemoryState , MonotonicClock > > =
3031 OnceCell :: const_new ( ) ;
3132
33+ static RPC_CLIENT : OnceCell < Arc < Mutex < cln_rpc:: ClnRpc > > > = OnceCell :: const_new ( ) ;
34+
35+ pub async fn get_rpc < P : AsRef < Path > > ( path : P ) -> Arc < Mutex < cln_rpc:: ClnRpc > > {
36+ RPC_CLIENT
37+ . get_or_init ( || async {
38+ loop {
39+ match cln_rpc:: ClnRpc :: new ( path. as_ref ( ) ) . await {
40+ Ok ( client) => {
41+ debug ! ( "Connected to lightning_rpc." ) ;
42+ return Arc :: new ( Mutex :: new ( client) ) ;
43+ }
44+ Err ( e) => {
45+ debug ! (
46+ "Failed to connect to ligthning_rpc: {:?}. Retrying in 50m..." ,
47+ e
48+ ) ;
49+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
50+ }
51+ }
52+ }
53+ } )
54+ . await
55+ . clone ( )
56+ }
57+
3258lazy_static ! {
3359 static ref HSM_ID_COUNT : AtomicUsize = AtomicUsize :: new( 0 ) ;
3460
@@ -102,30 +128,8 @@ impl PluginNodeServer {
102128 } ;
103129
104130 tokio:: spawn ( async move {
105- debug ! ( "Locking grpc interface until the JSON-RPC interface becomes available." ) ;
106- use tokio:: time:: { sleep, Duration } ;
107-
108- // Move the lock into the closure so we can release it later.
109- let mut rpc = cln_rpc:: ClnRpc :: new ( rpc_path. clone ( ) ) . await . unwrap ( ) ;
110- loop {
111- let res = rpc
112- . call_typed ( & cln_rpc:: model:: requests:: GetinfoRequest { } )
113- . await ;
114- match res {
115- Ok ( _) => break ,
116- Err ( e) => {
117- warn ! (
118- "JSON-RPC interface not yet available. Delaying 50ms. {:?}" ,
119- e
120- ) ;
121- sleep ( Duration :: from_millis ( 50 ) ) . await ;
122- }
123- }
124- }
125-
126- // Signal that the RPC is ready now.
127- RPC_READY . store ( true , Ordering :: SeqCst ) ;
128-
131+ let rpc_arc = get_rpc ( & rpc_path) . await . clone ( ) ;
132+ let mut rpc = rpc_arc. lock ( ) . await ;
129133 let list_datastore_req = cln_rpc:: model:: requests:: ListdatastoreRequest {
130134 key : Some ( vec ! [ "glconf" . to_string( ) , "request" . to_string( ) ] ) ,
131135 } ;
@@ -150,8 +154,6 @@ impl PluginNodeServer {
150154 }
151155 Err ( _) => { }
152156 }
153-
154- drop ( rpc) ;
155157 } ) ;
156158
157159 Ok ( s)
@@ -168,10 +170,6 @@ impl PluginNodeServer {
168170
169171 limiter. until_ready ( ) . await
170172 }
171-
172- pub async fn get_rpc ( & self ) -> Result < cln_rpc:: ClnRpc > {
173- cln_rpc:: ClnRpc :: new ( self . rpc_path . clone ( ) ) . await
174- }
175173}
176174
177175#[ tonic:: async_trait]
@@ -464,12 +462,8 @@ impl Node for PluginNodeServer {
464462 ) -> Result < Response < pb:: Empty > , Status > {
465463 self . limit ( ) . await ;
466464 let gl_config = req. into_inner ( ) ;
467- let mut rpc = self . get_rpc ( ) . await . map_err ( |e| {
468- tonic:: Status :: new (
469- tonic:: Code :: Internal ,
470- format ! ( "could not connect rpc client: {e}" ) ,
471- )
472- } ) ?;
465+ let rpc_arc = get_rpc ( & self . rpc_path ) . await ;
466+ let mut rpc = rpc_arc. lock ( ) . await ;
473467
474468 let res = rpc
475469 . call_typed ( & cln_rpc:: model:: requests:: GetinfoRequest { } )
@@ -622,7 +616,8 @@ impl PluginNodeServer {
622616 }
623617
624618 log:: info!( "Reconnecting all peers (plugin)" ) ;
625- let mut rpc = cln_rpc:: ClnRpc :: new ( self . rpc_path . clone ( ) ) . await ?;
619+ let rpc_arc = get_rpc ( & self . rpc_path ) . await ;
620+ let mut rpc = rpc_arc. lock ( ) . await ;
626621 let peers = self . get_reconnect_peers ( ) . await ?;
627622 log:: info!(
628623 "Found {} peers to reconnect: {:?} (plugin)" ,
@@ -646,8 +641,8 @@ impl PluginNodeServer {
646641 async fn get_reconnect_peers (
647642 & self ,
648643 ) -> Result < Vec < cln_rpc:: model:: requests:: ConnectRequest > , Error > {
649- let rpc_path = self . rpc_path . clone ( ) ;
650- let mut rpc = cln_rpc :: ClnRpc :: new ( rpc_path ) . await ? ;
644+ let rpc_arc = get_rpc ( & self . rpc_path ) . await ;
645+ let mut rpc = rpc_arc . lock ( ) . await ;
651646 let peers = rpc
652647 . call_typed ( & cln_rpc:: model:: requests:: ListpeersRequest {
653648 id : None ,
0 commit comments