Skip to content

Commit 1bbf52e

Browse files
committed
doe: Centralize shutdown tx/rx creation
1 parent e1125d3 commit 1bbf52e

File tree

1 file changed

+74
-24
lines changed

1 file changed

+74
-24
lines changed

node/src/init.rs

Lines changed: 74 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ pub const DEFAULT_CHANNEL_SIZE: usize = 256;
5858
pub struct LexeNode {
5959
// --- General --- //
6060
args: RunArgs,
61-
shutdown_tx: broadcast::Sender<()>,
6261
pub peer_port: Port,
6362

6463
// --- Actors --- //
@@ -74,17 +73,17 @@ pub struct LexeNode {
7473
fee_estimator: Arc<FeeEstimatorType>,
7574
broadcaster: Arc<BroadcasterType>,
7675
logger: LexeTracingLogger,
76+
inactivity_timer: InactivityTimer,
7777

7878
// --- Sync --- //
7979
restarting_node: bool,
8080
channel_monitors: Vec<(BlockHash, ChannelMonitorType)>,
8181
channel_manager_blockhash: BlockHash,
82-
activity_rx: mpsc::Receiver<()>,
8382

8483
// --- Run --- //
8584
inbound_payments: PaymentInfoStorageType,
8685
outbound_payments: PaymentInfoStorageType,
87-
shutdown_rx: broadcast::Receiver<()>,
86+
main_thread_shutdown_rx: broadcast::Receiver<()>,
8887
stop_listen_connect: Arc<AtomicBool>,
8988
}
9089

@@ -143,8 +142,7 @@ impl LexeNode {
143142

144143
// Init Tokio channels
145144
let (activity_tx, activity_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
146-
let (shutdown_tx, shutdown_rx) =
147-
broadcast::channel(DEFAULT_CHANNEL_SIZE);
145+
let shutdown = ShutdownChannel::new();
148146
let (channel_monitor_updated_tx, channel_monitor_updated_rx) =
149147
mpsc::channel(DEFAULT_CHANNEL_SIZE);
150148

@@ -167,12 +165,11 @@ impl LexeNode {
167165

168166
// Set up the persister -> chain monitor channel
169167
// TODO(max): Handle the handle
170-
let channel_monitor_updated_shutdown_rx = shutdown_tx.subscribe();
171168
let _channel_monitor_updated_handle =
172169
spawn_channel_monitor_updated_task(
173170
chain_monitor.clone(),
174171
channel_monitor_updated_rx,
175-
channel_monitor_updated_shutdown_rx,
172+
shutdown.rx.channel_monitor_updated,
176173
);
177174

178175
// Read the `ChannelMonitor`s and initialize the `P2PGossipSync`
@@ -255,7 +252,7 @@ impl LexeNode {
255252

256253
// TODO(phlip9): authenticate host<->node
257254
// Start warp service for host
258-
let host_routes = command::server::host_routes(shutdown_tx.clone());
255+
let host_routes = command::server::host_routes(shutdown.tx.host_routes);
259256
let (host_addr, host_service_fut) = warp::serve(host_routes)
260257
// A value of 0 indicates that the OS will assign a port for us
261258
.try_bind_ephemeral(([127, 0, 0, 1], args.host_port.unwrap_or(0)))
@@ -309,7 +306,6 @@ impl LexeNode {
309306

310307
// Start Background Processing
311308
// TODO(max): Handle the handle
312-
let bgp_shutdown_rx = shutdown_tx.subscribe();
313309
let _bgp_handle = LexeBackgroundProcessor::start(
314310
channel_manager.clone(),
315311
peer_manager.clone(),
@@ -318,8 +314,17 @@ impl LexeNode {
318314
invoice_payer.clone(),
319315
gossip_sync.clone(),
320316
scorer.clone(),
321-
shutdown_tx.clone(),
322-
bgp_shutdown_rx,
317+
shutdown.tx.background_processor,
318+
shutdown.rx.background_processor,
319+
);
320+
321+
// Construct (but don't start) the inactivity timer
322+
let inactivity_timer = InactivityTimer::new(
323+
args.shutdown_after_sync_if_no_activity,
324+
args.inactivity_timer_sec,
325+
activity_rx,
326+
shutdown.tx.inactivity_timer,
327+
shutdown.rx.inactivity_timer,
323328
);
324329

325330
// Spawn a task to regularly reconnect to channel peers
@@ -332,10 +337,10 @@ impl LexeNode {
332337
);
333338

334339
// Build and return the LexeNode
340+
let main_thread_shutdown_rx = shutdown.rx.main_thread;
335341
let node = LexeNode {
336342
// General
337343
args,
338-
shutdown_tx,
339344
peer_port,
340345

341346
// Actors
@@ -351,17 +356,17 @@ impl LexeNode {
351356
fee_estimator,
352357
broadcaster,
353358
logger,
359+
inactivity_timer,
354360

355361
// Sync
356362
restarting_node,
357363
channel_manager_blockhash,
358364
channel_monitors,
359-
activity_rx,
360365

361366
// Run
362367
inbound_payments,
363368
outbound_payments,
364-
shutdown_rx,
369+
main_thread_shutdown_rx,
365370
stop_listen_connect,
366371
};
367372
Ok(node)
@@ -392,16 +397,8 @@ impl LexeNode {
392397

393398
// Sync is complete; start the inactivity timer.
394399
debug!("Starting inactivity timer");
395-
let timer_shutdown_rx = self.shutdown_tx.subscribe();
396-
let mut inactivity_timer = InactivityTimer::new(
397-
self.args.shutdown_after_sync_if_no_activity,
398-
self.args.inactivity_timer_sec,
399-
self.activity_rx,
400-
self.shutdown_tx.clone(),
401-
timer_shutdown_rx,
402-
);
403400
tokio::spawn(async move {
404-
inactivity_timer.start().await;
401+
self.inactivity_timer.start().await;
405402
});
406403

407404
// --- Run --- //
@@ -426,7 +423,7 @@ impl LexeNode {
426423
}
427424

428425
// Pause here and wait for the shutdown signal
429-
let _ = self.shutdown_rx.recv().await;
426+
let _ = self.main_thread_shutdown_rx.recv().await;
430427

431428
// --- Shutdown --- //
432429
info!("Main thread shutting down");
@@ -530,6 +527,59 @@ async fn fetch_provisioned_secrets(
530527
}
531528
}
532529

530+
struct ShutdownChannel {
531+
tx: ShutdownTx,
532+
rx: ShutdownRx,
533+
}
534+
535+
struct ShutdownTx {
536+
background_processor: broadcast::Sender<()>,
537+
inactivity_timer: broadcast::Sender<()>,
538+
host_routes: broadcast::Sender<()>,
539+
}
540+
541+
struct ShutdownRx {
542+
main_thread: broadcast::Receiver<()>,
543+
inactivity_timer: broadcast::Receiver<()>,
544+
background_processor: broadcast::Receiver<()>,
545+
channel_monitor_updated: broadcast::Receiver<()>,
546+
}
547+
548+
impl ShutdownChannel {
549+
/// Initializes the `shutdown_tx` / `shutdown_rx` channel senders and
550+
/// receivers. These are initialized as structs to ensure that *all* calls
551+
/// to [`subscribe`] are complete before any values are sent.
552+
///
553+
/// [`subscribe`]: broadcast::Sender::subscribe
554+
fn new() -> Self {
555+
let (shutdown_tx, _) = broadcast::channel(DEFAULT_CHANNEL_SIZE);
556+
557+
// Clone txs
558+
let host_routes = shutdown_tx.clone();
559+
let inactivity_timer = shutdown_tx.clone();
560+
let background_processor = shutdown_tx.clone();
561+
let tx = ShutdownTx {
562+
host_routes,
563+
inactivity_timer,
564+
background_processor,
565+
};
566+
567+
// Subscribe rxs
568+
let main_thread = shutdown_tx.subscribe();
569+
let inactivity_timer = shutdown_tx.subscribe();
570+
let background_processor = shutdown_tx.subscribe();
571+
let channel_monitor_updated = shutdown_tx.subscribe();
572+
let rx = ShutdownRx {
573+
main_thread,
574+
inactivity_timer,
575+
background_processor,
576+
channel_monitor_updated,
577+
};
578+
579+
Self { tx, rx }
580+
}
581+
}
582+
533583
/// Initializes the ChannelMonitors
534584
async fn channel_monitors(
535585
persister: &LexePersister,

0 commit comments

Comments
 (0)