Skip to content

Commit 6b8b874

Browse files
committed
ln: om: add option to internally queue forwards to offline peers
Adds an option to `OnionMessenger` to have it queue all onion message forwards to offline peers. Context: when someone requests or pays a BOLT12 invoice to one of our user nodes, our LSP may need to first spin up their node before it can finally forward the onion message. Why not use `intercept_messages_for_offline_peers`? We'd have to rebuild almost the exact same `message_recipients` queue outside LDK. With this change, the logic turned out super simple on our end since we just have to handle the `ConnectionNeeded` event. All the pending messages then automatically forward once the peer connects. If the connection fails or it's not one of our peers, the pending messages will just naturally churn out in a few timer ticks.
1 parent 8d44e80 commit 6b8b874

File tree

1 file changed

+69
-3
lines changed

1 file changed

+69
-3
lines changed

lightning/src/onion_message/messenger.rs

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ pub struct OnionMessenger<
318318
dns_resolver_handler: DRH,
319319
custom_handler: CMH,
320320
intercept_messages_for_offline_peers: bool,
321+
queue_messages_for_offline_peers: bool,
321322
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
322323
pending_peer_connected_events: Mutex<Vec<Event>>,
323324
pending_events_processor: AtomicBool,
@@ -1233,6 +1234,8 @@ where
12331234
entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
12341235
offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
12351236
) -> Self {
1237+
let intercept_messages_for_offline_peers = false;
1238+
let queue_messages_for_offline_peers = false;
12361239
Self::new_inner(
12371240
entropy_source,
12381241
node_signer,
@@ -1243,7 +1246,8 @@ where
12431246
async_payments_handler,
12441247
dns_resolver,
12451248
custom_handler,
1246-
false,
1249+
intercept_messages_for_offline_peers,
1250+
queue_messages_for_offline_peers,
12471251
)
12481252
}
12491253

@@ -1272,6 +1276,8 @@ where
12721276
entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
12731277
offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
12741278
) -> Self {
1279+
let intercept_messages_for_offline_peers = true;
1280+
let queue_messages_for_offline_peers = false;
12751281
Self::new_inner(
12761282
entropy_source,
12771283
node_signer,
@@ -1282,15 +1288,48 @@ where
12821288
async_payments_handler,
12831289
dns_resolver,
12841290
custom_handler,
1285-
true,
1291+
intercept_messages_for_offline_peers,
1292+
queue_messages_for_offline_peers,
1293+
)
1294+
}
1295+
1296+
/// Similar to [`Self::new`], but rather than dropping onion messages that
1297+
/// are intended to be forwarded to offline peers, we'll queue them until
1298+
/// the peer connects or two timer ticks pass.
1299+
///
1300+
/// A single [`Event::ConnectionNeeded`] event with just the NodeId and no
1301+
/// addresses will be generated once a message queues for an offline peer.
1302+
pub fn new_with_offline_peer_queueing(
1303+
entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
1304+
offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
1305+
) -> Self {
1306+
let intercept_messages_for_offline_peers = false;
1307+
let queue_messages_for_offline_peers = true;
1308+
Self::new_inner(
1309+
entropy_source,
1310+
node_signer,
1311+
logger,
1312+
node_id_lookup,
1313+
message_router,
1314+
offers_handler,
1315+
async_payments_handler,
1316+
dns_resolver,
1317+
custom_handler,
1318+
intercept_messages_for_offline_peers,
1319+
queue_messages_for_offline_peers,
12861320
)
12871321
}
12881322

12891323
fn new_inner(
12901324
entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
12911325
offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
1292-
intercept_messages_for_offline_peers: bool,
1326+
intercept_messages_for_offline_peers: bool, queue_messages_for_offline_peers: bool,
12931327
) -> Self {
1328+
debug_assert!(
1329+
!(intercept_messages_for_offline_peers && queue_messages_for_offline_peers),
1330+
"Can't generate intercept events and queue messages for offline peers at the same time",
1331+
);
1332+
12941333
let mut secp_ctx = Secp256k1::new();
12951334
secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
12961335
OnionMessenger {
@@ -1306,6 +1345,7 @@ where
13061345
dns_resolver_handler: dns_resolver,
13071346
custom_handler,
13081347
intercept_messages_for_offline_peers,
1348+
queue_messages_for_offline_peers,
13091349
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
13101350
pending_peer_connected_events: Mutex::new(Vec::new()),
13111351
pending_events_processor: AtomicBool::new(false),
@@ -2023,6 +2063,32 @@ where
20232063
.entry(next_node_id)
20242064
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()));
20252065

2066+
// When enabled, we'll queue all messages, even for offline peers
2067+
// and peers pending connection.
2068+
if self.queue_messages_for_offline_peers {
2069+
match message_recipients.entry(next_node_id) {
2070+
hash_map::Entry::Occupied(mut e) => {
2071+
log_trace!(logger, "Forwarding onion message peer {next_node_id}");
2072+
e.get_mut().enqueue_message(onion_message);
2073+
},
2074+
hash_map::Entry::Vacant(e) => {
2075+
log_trace!(
2076+
logger,
2077+
"Forwarding onion message to disconnected peer {next_node_id}: \
2078+
awaiting connection"
2079+
);
2080+
let addrs = Vec::new();
2081+
e.insert(OnionMessageRecipient::pending_connection(addrs))
2082+
.enqueue_message(onion_message);
2083+
// Notify the background processor that we need to
2084+
// connect to this peer.
2085+
self.event_notifier.notify();
2086+
},
2087+
};
2088+
return;
2089+
}
2090+
2091+
// Otherwise, only forward to connected peers.
20262092
match message_recipients.entry(next_node_id) {
20272093
hash_map::Entry::Occupied(mut e)
20282094
if matches!(e.get(), OnionMessageRecipient::ConnectedPeer(..)) =>

0 commit comments

Comments
 (0)