Skip to content

Commit d462c55

Browse files
Implement manual handling of liquidity requests in LSPS2 service
1 parent af9cd15 commit d462c55

File tree

4 files changed

+253
-15
lines changed

4 files changed

+253
-15
lines changed

src/builder.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ struct LiquiditySourceConfig {
117117
lsps2_client: Option<LSPS2ClientConfig>,
118118
// Act as an LSPS2 service.
119119
lsps2_service: Option<LSPS2ServiceConfig>,
120+
// Act as a liquidity source with manually handled requests.
121+
manually_handle_liquidity_requests: bool,
120122
}
121123

122124
#[derive(Clone)]
@@ -467,6 +469,20 @@ impl NodeBuilder {
467469
self
468470
}
469471

472+
/// Configures whether liquidity events should be handled manually or automatically.
473+
///
474+
/// If set to `true`, liquidity events will not be processed automatically by the node.
475+
/// Instead, they must be retrieved and handled manually using [`Node::liquidity_next_event_async`].
476+
pub fn set_manually_handle_liquidity_requests(
477+
&mut self, manually_handle_liquidity_requests: bool,
478+
) -> &mut Self {
479+
let liquidity_source_config =
480+
self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default());
481+
liquidity_source_config.manually_handle_liquidity_requests =
482+
manually_handle_liquidity_requests;
483+
self
484+
}
485+
470486
/// Sets the used storage directory path.
471487
pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self {
472488
self.config.storage_dir_path = storage_dir_path;
@@ -1591,6 +1607,9 @@ fn build_with_store_internal(
15911607
liquidity_source_builder.lsps2_service(promise_secret, config.clone())
15921608
});
15931609

1610+
liquidity_source_builder
1611+
.set_manually_handle_liquidity_requests(lsc.manually_handle_liquidity_requests);
1612+
15941613
let liquidity_source = runtime
15951614
.block_on(async move { liquidity_source_builder.build().await.map(Arc::new) })?;
15961615
let custom_message_handler =

src/lib.rs

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -590,22 +590,29 @@ impl Node {
590590

591591
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
592592
let mut stop_liquidity_handler = self.stop_sender.subscribe();
593-
let liquidity_handler = Arc::clone(&liquidity_source);
594-
let liquidity_logger = Arc::clone(&self.logger);
595-
self.runtime.spawn_background_task(async move {
596-
loop {
597-
tokio::select! {
598-
_ = stop_liquidity_handler.changed() => {
599-
log_debug!(
600-
liquidity_logger,
601-
"Stopping processing liquidity events.",
602-
);
603-
return;
593+
if !liquidity_source.get_manually_handle_liquidity_requests() {
594+
let liquidity_handler = Arc::clone(&liquidity_source);
595+
let liquidity_logger = Arc::clone(&self.logger);
596+
self.runtime.spawn_background_task(async move {
597+
loop {
598+
tokio::select! {
599+
_ = stop_liquidity_handler.changed() => {
600+
log_debug!(
601+
liquidity_logger,
602+
"Stopping processing liquidity events.",
603+
);
604+
return;
605+
}
606+
_ = liquidity_handler.handle_next_event() => {}
604607
}
605-
_ = liquidity_handler.handle_next_event() => {}
606608
}
607-
}
608-
});
609+
});
610+
} else {
611+
log_debug!(
612+
self.logger,
613+
"Liquidity source is configured to manually handle liquidity requests.",
614+
);
615+
}
609616
}
610617

611618
log_info!(self.logger, "Startup complete.");
@@ -935,6 +942,35 @@ impl Node {
935942
))
936943
}
937944

945+
/// Returns the next liquidity event, if available.
946+
///
947+
/// This method can only be used if the liquidity source is configured with
948+
/// `manually_handle_liquidity_requests` set to `true`. If set to `false`, liquidity events
949+
/// are processed automatically by the node, and this method should not be called.
950+
pub async fn liquidity_next_event_async(
951+
&self,
952+
) -> Result<lightning_liquidity::events::LiquidityEvent, NodeError> {
953+
match self.liquidity_source.as_ref() {
954+
Some(ls) => ls.next_event_async().await,
955+
None => Err(Error::NoLiquiditySourceConfigured),
956+
}
957+
}
958+
959+
/// Processes a liquidity event using the default LDK handling logic.
960+
///
961+
/// This function implements the standard way LDK processes liquidity events automatically.
962+
/// It can be used in conjunction with manual handling (when `manually_handle_liquidity_requests`
963+
/// is enabled), allowing selective manual processing of some events while delegating others
964+
/// to default LDK processing.
965+
pub async fn liquidity_process_event_defaults(
966+
&self, event: lightning_liquidity::events::LiquidityEvent,
967+
) -> Result<(), NodeError> {
968+
match self.liquidity_source.as_ref() {
969+
Some(ls) => Ok(ls.process_event_default(event).await),
970+
None => Err(Error::NoLiquiditySourceConfigured),
971+
}
972+
}
973+
938974
/// Returns a liquidity handler allowing to request channels via the [bLIP-51 / LSPS1] protocol.
939975
///
940976
/// [bLIP-51 / LSPS1]: https://github.com/lightning/blips/blob/master/blip-0051.md

src/liquidity.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ where
198198
kv_store: Arc<DynStore>,
199199
config: Arc<Config>,
200200
logger: L,
201+
manually_handle_liquidity_requests: bool,
201202
}
202203

203204
impl<L: Deref> LiquiditySourceBuilder<L>
@@ -211,6 +212,7 @@ where
211212
let lsps1_client = None;
212213
let lsps2_client = None;
213214
let lsps2_service = None;
215+
let manually_handle_liquidity_requests = false;
214216
Self {
215217
lsps1_client,
216218
lsps2_client,
@@ -222,6 +224,7 @@ where
222224
kv_store,
223225
config,
224226
logger,
227+
manually_handle_liquidity_requests,
225228
}
226229
}
227230

@@ -273,6 +276,13 @@ where
273276
self
274277
}
275278

279+
pub(crate) fn set_manually_handle_liquidity_requests(
280+
&mut self, manually_handle: bool,
281+
) -> &mut Self {
282+
self.manually_handle_liquidity_requests = manually_handle;
283+
self
284+
}
285+
276286
pub(crate) async fn build(self) -> Result<LiquiditySource<L>, BuildError> {
277287
let liquidity_service_config = self.lsps2_service.as_ref().and_then(|s| {
278288
let lsps2_service_config = Some(s.ldk_service_config.clone());
@@ -326,6 +336,7 @@ where
326336
liquidity_manager,
327337
config: self.config,
328338
logger: self.logger,
339+
manually_handle_liquidity_requests: self.manually_handle_liquidity_requests,
329340
})
330341
}
331342
}
@@ -344,6 +355,7 @@ where
344355
liquidity_manager: Arc<LiquidityManager>,
345356
config: Arc<Config>,
346357
logger: L,
358+
manually_handle_liquidity_requests: bool,
347359
}
348360

349361
impl<L: Deref> LiquiditySource<L>
@@ -366,15 +378,34 @@ where
366378
self.lsps2_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone()))
367379
}
368380

381+
pub(crate) fn get_manually_handle_liquidity_requests(&self) -> bool {
382+
self.manually_handle_liquidity_requests.clone()
383+
}
384+
369385
fn get_lsps2_service_config(&self) -> Option<LSPS2ServiceConfig> {
370386
self.lsps2_service
371387
.as_ref()
372388
.and_then(|s| s.service_config.read().ok())
373389
.map(|cfg| cfg.clone()) // Clone to release lock
374390
}
375391

392+
pub(crate) async fn next_event_async(&self) -> Result<LiquidityEvent, Error> {
393+
if !self.get_manually_handle_liquidity_requests() {
394+
log_error!(
395+
self.logger,
396+
"Liquidity source is not configured for manual handling of events!"
397+
);
398+
return Err(Error::LiquiditySourceUnavailable);
399+
}
400+
Ok(self.liquidity_manager.next_event_async().await)
401+
}
402+
376403
pub(crate) async fn handle_next_event(&self) {
377-
match self.liquidity_manager.next_event_async().await {
404+
self.process_event_default(self.liquidity_manager.next_event_async().await).await;
405+
}
406+
407+
pub(crate) async fn process_event_default(&self, event: LiquidityEvent) {
408+
match event {
378409
LiquidityEvent::LSPS1Client(LSPS1ClientEvent::SupportedOptionsReady {
379410
request_id,
380411
counterparty_node_id,

tests/integration_tests_rust.rs

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,6 +1799,158 @@ fn lsps2_client_service_integration() {
17991799
assert_eq!(channel_value_sats, expected_channel_size_sat);
18001800
}
18011801

1802+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1803+
async fn lsps2_client_service_integration_manual_event_request() {
1804+
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
1805+
1806+
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
1807+
1808+
let sync_config = EsploraSyncConfig { background_sync_config: None };
1809+
1810+
// Setup three nodes: service, client, and payer
1811+
let channel_opening_fee_ppm = 10_000;
1812+
let channel_over_provisioning_ppm = 100_000;
1813+
let lsps2_service_config = LSPS2ServiceConfig {
1814+
require_token: None,
1815+
advertise_service: false,
1816+
channel_opening_fee_ppm,
1817+
channel_over_provisioning_ppm,
1818+
max_payment_size_msat: 1_000_000_000,
1819+
min_payment_size_msat: 0,
1820+
min_channel_lifetime: 100,
1821+
min_channel_opening_fee_msat: 0,
1822+
max_client_to_self_delay: 1024,
1823+
};
1824+
1825+
let service_config = random_config(true);
1826+
setup_builder!(service_builder, service_config.node_config);
1827+
service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config));
1828+
service_builder.set_liquidity_provider_lsps2(lsps2_service_config);
1829+
service_builder.set_manually_handle_liquidity_requests(true);
1830+
let service_node = Arc::new(service_builder.build().unwrap());
1831+
service_node.start().unwrap();
1832+
1833+
let service_node_id = service_node.node_id();
1834+
let service_addr = service_node.listening_addresses().unwrap().first().unwrap().clone();
1835+
1836+
let client_config = random_config(true);
1837+
setup_builder!(client_builder, client_config.node_config);
1838+
client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config));
1839+
client_builder.set_liquidity_source_lsps2(service_node_id, service_addr, None);
1840+
client_builder.set_manually_handle_liquidity_requests(true);
1841+
let client_node = Arc::new(client_builder.build().unwrap());
1842+
client_node.start().unwrap();
1843+
1844+
let payer_config = random_config(true);
1845+
setup_builder!(payer_builder, payer_config.node_config);
1846+
payer_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config));
1847+
let payer_node = payer_builder.build().unwrap();
1848+
payer_node.start().unwrap();
1849+
1850+
let service_addr = service_node.onchain_payment().new_address().unwrap();
1851+
let client_addr = client_node.onchain_payment().new_address().unwrap();
1852+
let payer_addr = payer_node.onchain_payment().new_address().unwrap();
1853+
1854+
let premine_amount_sat = 10_000_000;
1855+
1856+
premine_and_distribute_funds(
1857+
&bitcoind.client,
1858+
&electrsd.client,
1859+
vec![service_addr, client_addr, payer_addr],
1860+
Amount::from_sat(premine_amount_sat),
1861+
);
1862+
service_node.sync_wallets().unwrap();
1863+
client_node.sync_wallets().unwrap();
1864+
payer_node.sync_wallets().unwrap();
1865+
1866+
// Spawn tasks to manually handle liquidity events by processing them with defaults
1867+
let node = Arc::clone(&service_node);
1868+
let service_handle = tokio::spawn(async move {
1869+
loop {
1870+
let event = node.liquidity_next_event_async().await.unwrap();
1871+
node.liquidity_process_event_defaults(event).await.unwrap();
1872+
}
1873+
});
1874+
1875+
let node = Arc::clone(&client_node);
1876+
let client_handle = tokio::spawn(async move {
1877+
loop {
1878+
let event = node.liquidity_next_event_async().await.unwrap();
1879+
node.liquidity_process_event_defaults(event).await.unwrap();
1880+
}
1881+
});
1882+
1883+
// Open a channel payer -> service that will allow paying the JIT invoice
1884+
println!("Opening channel payer_node -> service_node!");
1885+
open_channel(&payer_node, &service_node, 5_000_000, false, &electrsd);
1886+
1887+
generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6);
1888+
service_node.sync_wallets().unwrap();
1889+
payer_node.sync_wallets().unwrap();
1890+
1891+
expect_channel_ready_event!(payer_node, service_node.node_id());
1892+
expect_channel_ready_event!(service_node, payer_node.node_id());
1893+
1894+
let invoice_description =
1895+
Bolt11InvoiceDescription::Direct(Description::new(String::from("asdf")).unwrap());
1896+
let jit_amount_msat = 100_000_000;
1897+
1898+
println!("Generating JIT invoice!");
1899+
let jit_invoice = client_node
1900+
.bolt11_payment()
1901+
.receive_via_jit_channel(jit_amount_msat, &invoice_description.into(), 1024, None)
1902+
.unwrap();
1903+
1904+
// Have the payer_node pay the invoice, therby triggering channel open service_node -> client_node.
1905+
println!("Paying JIT invoice!");
1906+
let payment_id = payer_node.bolt11_payment().send(&jit_invoice, None).unwrap();
1907+
1908+
expect_channel_pending_event!(service_node, client_node.node_id());
1909+
expect_channel_ready_event!(service_node, client_node.node_id());
1910+
expect_event!(service_node, PaymentForwarded);
1911+
expect_channel_pending_event!(client_node, service_node.node_id());
1912+
expect_channel_ready_event!(client_node, service_node.node_id());
1913+
1914+
let service_fee_msat = (jit_amount_msat * channel_opening_fee_ppm as u64) / 1_000_000;
1915+
let expected_received_amount_msat = jit_amount_msat - service_fee_msat;
1916+
expect_payment_successful_event!(payer_node, Some(payment_id), None);
1917+
let client_payment_id =
1918+
expect_payment_received_event!(client_node, expected_received_amount_msat).unwrap();
1919+
let client_payment = client_node.payment(&client_payment_id).unwrap();
1920+
match client_payment.kind {
1921+
PaymentKind::Bolt11Jit { counterparty_skimmed_fee_msat, .. } => {
1922+
assert_eq!(counterparty_skimmed_fee_msat, Some(service_fee_msat));
1923+
},
1924+
_ => panic!("Unexpected payment kind"),
1925+
}
1926+
1927+
let expected_channel_overprovisioning_msat =
1928+
(expected_received_amount_msat * channel_over_provisioning_ppm as u64) / 1_000_000;
1929+
let expected_channel_size_sat =
1930+
(expected_received_amount_msat + expected_channel_overprovisioning_msat) / 1000;
1931+
let channel_value_sats = client_node.list_channels().first().unwrap().channel_value_sats;
1932+
assert_eq!(channel_value_sats, expected_channel_size_sat);
1933+
1934+
println!("Generating regular invoice!");
1935+
let invoice_description =
1936+
Bolt11InvoiceDescription::Direct(Description::new(String::from("asdf")).unwrap()).into();
1937+
let amount_msat = 5_000_000;
1938+
let invoice =
1939+
client_node.bolt11_payment().receive(amount_msat, &invoice_description, 1024).unwrap();
1940+
1941+
// Have the payer_node pay the invoice, to check regular forwards service_node -> client_node
1942+
// are working as expected.
1943+
println!("Paying regular invoice!");
1944+
let payment_id = payer_node.bolt11_payment().send(&invoice, None).unwrap();
1945+
expect_payment_successful_event!(payer_node, Some(payment_id), None);
1946+
expect_event!(service_node, PaymentForwarded);
1947+
expect_payment_received_event!(client_node, amount_msat);
1948+
1949+
// Abort the spawned tasks at the end of the test
1950+
service_handle.abort();
1951+
client_handle.abort();
1952+
}
1953+
18021954
#[test]
18031955
fn facade_logging() {
18041956
let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();

0 commit comments

Comments
 (0)