Skip to content

Commit 8dc3d4a

Browse files
authored
Merge pull request #274 from elnosh/custom-records
add custom records to htlc
2 parents 64dd234 + 295aa5c commit 8dc3d4a

File tree

3 files changed

+118
-49
lines changed

3 files changed

+118
-49
lines changed

sim-cli/src/main.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use std::sync::Arc;
33
use clap::Parser;
44
use log::LevelFilter;
55
use sim_cli::parsing::{create_simulation, create_simulation_with_network, parse_sim_params, Cli};
6-
use simln_lib::{latency_interceptor::LatencyIntercepor, sim_node::Interceptor};
6+
use simln_lib::{
7+
latency_interceptor::LatencyIntercepor,
8+
sim_node::{CustomRecords, Interceptor},
9+
};
710
use simple_logger::SimpleLogger;
811
use tokio_util::task::TaskTracker;
912

@@ -38,7 +41,14 @@ async fn main() -> anyhow::Result<()> {
3841
} else {
3942
vec![]
4043
};
41-
create_simulation_with_network(&cli, &sim_params, tasks.clone(), interceptors).await?
44+
create_simulation_with_network(
45+
&cli,
46+
&sim_params,
47+
tasks.clone(),
48+
interceptors,
49+
CustomRecords::default(),
50+
)
51+
.await?
4252
};
4353
let sim2 = sim.clone();
4454

sim-cli/src/parsing.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use log::LevelFilter;
55
use serde::{Deserialize, Serialize};
66
use simln_lib::clock::SimulationClock;
77
use simln_lib::sim_node::{
8-
ln_node_from_graph, populate_network_graph, ChannelPolicy, Interceptor, SimGraph,
9-
SimulatedChannel,
8+
ln_node_from_graph, populate_network_graph, ChannelPolicy, CustomRecords, Interceptor,
9+
SimGraph, SimulatedChannel,
1010
};
1111
use simln_lib::{
1212
cln, cln::ClnNode, eclair, eclair::EclairNode, lnd, lnd::LndNode, serializers,
@@ -229,6 +229,7 @@ pub async fn create_simulation_with_network(
229229
sim_params: &SimParams,
230230
tasks: TaskTracker,
231231
interceptors: Vec<Arc<dyn Interceptor>>,
232+
custom_records: CustomRecords,
232233
) -> Result<(Simulation<SimulationClock>, Vec<ActivityDefinition>), anyhow::Error> {
233234
let cfg: SimulationCfg = SimulationCfg::try_from(cli)?;
234235
let SimParams {
@@ -259,6 +260,7 @@ pub async fn create_simulation_with_network(
259260
channels.clone(),
260261
tasks.clone(),
261262
interceptors,
263+
custom_records,
262264
(shutdown_trigger.clone(), shutdown_listener.clone()),
263265
)
264266
.map_err(|e| SimulationError::SimulatedNetworkError(format!("{:?}", e)))?,

simln-lib/src/sim_node.rs

Lines changed: 102 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -824,7 +824,7 @@ async fn handle_intercepted_htlc(
824824
shutdown_listener: Listener,
825825
) -> Result<Result<CustomRecords, ForwardingError>, CriticalError> {
826826
if interceptors.is_empty() {
827-
return Ok(Ok(HashMap::new()));
827+
return Ok(Ok(request.incoming_custom_records));
828828
}
829829

830830
let mut attached_custom_records: CustomRecords = HashMap::new();
@@ -925,6 +925,9 @@ pub struct SimGraph {
925925
/// trigger a shutdown signal to other interceptors.
926926
interceptors: Vec<Arc<dyn Interceptor>>,
927927

928+
/// Custom records that will be added to the first outgoing HTLC in a payment.
929+
default_custom_records: CustomRecords,
930+
928931
/// Shutdown signal that can be used to trigger a shutdown if a critical error occurs. Listener
929932
/// can be used to listen for shutdown signals coming from upstream.
930933
shutdown_signal: (Trigger, Listener),
@@ -936,6 +939,7 @@ impl SimGraph {
936939
graph_channels: Vec<SimulatedChannel>,
937940
tasks: TaskTracker,
938941
interceptors: Vec<Arc<dyn Interceptor>>,
942+
default_custom_records: CustomRecords,
939943
shutdown_signal: (Trigger, Listener),
940944
) -> Result<Self, SimulationError> {
941945
let mut nodes: HashMap<PublicKey, Vec<u64>> = HashMap::new();
@@ -971,6 +975,7 @@ impl SimGraph {
971975
channels: Arc::new(Mutex::new(channels)),
972976
tasks,
973977
interceptors,
978+
default_custom_records,
974979
shutdown_signal,
975980
})
976981
}
@@ -1091,15 +1096,16 @@ impl SimNetwork for SimGraph {
10911096
},
10921097
};
10931098

1094-
self.tasks.spawn(propagate_payment(
1095-
self.channels.clone(),
1099+
self.tasks.spawn(propagate_payment(PropagatePaymentRequest {
1100+
nodes: Arc::clone(&self.channels),
10961101
source,
1097-
path.clone(),
1102+
route: path.clone(),
10981103
payment_hash,
10991104
sender,
1100-
self.interceptors.clone(),
1101-
self.shutdown_signal.clone(),
1102-
));
1105+
interceptors: self.interceptors.clone(),
1106+
custom_records: self.default_custom_records.clone(),
1107+
shutdown_signal: self.shutdown_signal.clone(),
1108+
}));
11031109
}
11041110

11051111
/// lookup_node fetches a node's information and channel capacities.
@@ -1149,13 +1155,14 @@ async fn add_htlcs(
11491155
route: Path,
11501156
payment_hash: PaymentHash,
11511157
interceptors: Vec<Arc<dyn Interceptor>>,
1158+
custom_records: CustomRecords,
11521159
shutdown_listener: Listener,
11531160
) -> Result<Result<(), (Option<usize>, ForwardingError)>, CriticalError> {
11541161
let mut outgoing_node = source;
11551162
let mut outgoing_amount = route.fee_msat() + route.final_value_msat();
11561163
let mut outgoing_cltv = route.hops.iter().map(|hop| hop.cltv_expiry_delta).sum();
11571164

1158-
let mut incoming_custom_records = HashMap::new();
1165+
let mut incoming_custom_records = custom_records;
11591166

11601167
// Tracks the hop index that we need to remove htlcs from on payment completion (both success and failure).
11611168
// Given a payment from A to C, over the route A -- B -- C, this index has the following meanings:
@@ -1237,7 +1244,7 @@ async fn add_htlcs(
12371244
forwarding_node: hop.pubkey,
12381245
payment_hash,
12391246
incoming_htlc: incoming_htlc.clone(),
1240-
incoming_custom_records: incoming_custom_records.clone(),
1247+
incoming_custom_records,
12411248
outgoing_channel_id: next_scid,
12421249
incoming_amount_msat: outgoing_amount,
12431250
outgoing_amount_msat: outgoing_amount - hop.fee_msat,
@@ -1338,43 +1345,47 @@ async fn remove_htlcs(
13381345
Ok(())
13391346
}
13401347

1341-
/// Finds a payment path from the source to destination nodes provided, and propagates the appropriate htlcs through
1342-
/// the simulated network, notifying the sender channel provided of the payment outcome. If a critical error occurs,
1343-
/// ie a breakdown of our state machine, it will still notify the payment outcome and will use the shutdown trigger
1344-
/// to signal that we should exit.
1345-
async fn propagate_payment(
1348+
struct PropagatePaymentRequest {
13461349
nodes: Arc<Mutex<HashMap<ShortChannelID, SimulatedChannel>>>,
13471350
source: PublicKey,
13481351
route: Path,
13491352
payment_hash: PaymentHash,
13501353
sender: Sender<Result<PaymentResult, LightningError>>,
13511354
interceptors: Vec<Arc<dyn Interceptor>>,
1355+
custom_records: CustomRecords,
13521356
shutdown_signal: (Trigger, Listener),
1353-
) {
1357+
}
1358+
1359+
/// Finds a payment path from the source to destination nodes provided, and propagates the appropriate htlcs through
1360+
/// the simulated network, notifying the sender channel provided of the payment outcome. If a critical error occurs,
1361+
/// ie a breakdown of our state machine, it will still notify the payment outcome and will use the shutdown trigger
1362+
/// to signal that we should exit.
1363+
async fn propagate_payment(request: PropagatePaymentRequest) {
13541364
let notify_result = match add_htlcs(
1355-
nodes.clone(),
1356-
source,
1357-
route.clone(),
1358-
payment_hash,
1359-
interceptors.clone(),
1360-
shutdown_signal.1,
1365+
request.nodes.clone(),
1366+
request.source,
1367+
request.route.clone(),
1368+
request.payment_hash,
1369+
request.interceptors.clone(),
1370+
request.custom_records,
1371+
request.shutdown_signal.1,
13611372
)
13621373
.await
13631374
{
13641375
Ok(Ok(_)) => {
13651376
// If we successfully added the htlc, go ahead and remove all the htlcs in the route with successful resolution.
13661377
if let Err(e) = remove_htlcs(
1367-
nodes,
1368-
route.hops.len() - 1,
1369-
source,
1370-
route,
1371-
payment_hash,
1378+
request.nodes,
1379+
request.route.hops.len() - 1,
1380+
request.source,
1381+
request.route,
1382+
request.payment_hash,
13721383
true,
1373-
interceptors,
1384+
request.interceptors,
13741385
)
13751386
.await
13761387
{
1377-
shutdown_signal.0.trigger();
1388+
request.shutdown_signal.0.trigger();
13781389
log::error!("Could not remove htlcs from channel: {e}.");
13791390
}
13801391
PaymentResult {
@@ -1387,35 +1398,35 @@ async fn propagate_payment(
13871398
// state. It's possible that we failed with the very first add, and then we don't need to clean anything up.
13881399
if let Some(resolution_idx) = fail_idx {
13891400
if remove_htlcs(
1390-
nodes,
1401+
request.nodes,
13911402
resolution_idx,
1392-
source,
1393-
route,
1394-
payment_hash,
1403+
request.source,
1404+
request.route,
1405+
request.payment_hash,
13951406
false,
1396-
interceptors,
1407+
request.interceptors,
13971408
)
13981409
.await
13991410
.is_err()
14001411
{
1401-
shutdown_signal.0.trigger();
1412+
request.shutdown_signal.0.trigger();
14021413
}
14031414
}
14041415

14051416
log::debug!(
14061417
"Forwarding failure for simulated payment {}: {fwd_err}",
1407-
hex::encode(payment_hash.0)
1418+
hex::encode(request.payment_hash.0)
14081419
);
14091420
PaymentResult {
14101421
htlc_count: 0,
14111422
payment_outcome: PaymentOutcome::Unknown,
14121423
}
14131424
},
14141425
Err(critical_err) => {
1415-
shutdown_signal.0.trigger();
1426+
request.shutdown_signal.0.trigger();
14161427
log::debug!(
14171428
"Critical error in simulated payment {}: {critical_err}",
1418-
hex::encode(payment_hash.0)
1429+
hex::encode(request.payment_hash.0)
14191430
);
14201431
PaymentResult {
14211432
htlc_count: 0,
@@ -1424,7 +1435,7 @@ async fn propagate_payment(
14241435
},
14251436
};
14261437

1427-
if let Err(e) = sender.send(Ok(notify_result)) {
1438+
if let Err(e) = request.sender.send(Ok(notify_result)) {
14281439
log::error!("Could not notify payment result: {:?}.", e);
14291440
}
14301441
}
@@ -1962,7 +1973,11 @@ mod tests {
19621973
/// Alice (100) --- (0) Bob (100) --- (0) Carol (100) --- (0) Dave
19631974
///
19641975
/// The nodes pubkeys in this chain of channels are provided in-order for easy access.
1965-
async fn new(capacity: u64, interceptors: Vec<Arc<dyn Interceptor>>) -> Self {
1976+
async fn new(
1977+
capacity: u64,
1978+
interceptors: Vec<Arc<dyn Interceptor>>,
1979+
custom_records: CustomRecords,
1980+
) -> Self {
19661981
let shutdown_signal = triggered::trigger();
19671982
let channels = create_simulated_channels(3, capacity);
19681983
let routing_graph = Arc::new(
@@ -1989,6 +2004,7 @@ mod tests {
19892004
channels.clone(),
19902005
TaskTracker::new(),
19912006
interceptors,
2007+
custom_records,
19922008
shutdown_signal,
19932009
)
19942010
.expect("could not create test graph"),
@@ -2069,7 +2085,8 @@ mod tests {
20692085
#[tokio::test]
20702086
async fn test_successful_dispatch() {
20712087
let chan_capacity = 500_000_000;
2072-
let mut test_kit = DispatchPaymentTestKit::new(chan_capacity, vec![]).await;
2088+
let mut test_kit =
2089+
DispatchPaymentTestKit::new(chan_capacity, vec![], CustomRecords::default()).await;
20732090

20742091
// Send a payment that should succeed from Alice -> Dave.
20752092
let mut amt = 20_000;
@@ -2134,7 +2151,8 @@ mod tests {
21342151
#[tokio::test]
21352152
async fn test_successful_multi_hop() {
21362153
let chan_capacity = 500_000_000;
2137-
let mut test_kit = DispatchPaymentTestKit::new(chan_capacity, vec![]).await;
2154+
let mut test_kit =
2155+
DispatchPaymentTestKit::new(chan_capacity, vec![], CustomRecords::default()).await;
21382156

21392157
// Send a payment that should succeed from Alice -> Dave.
21402158
let amt = 20_000;
@@ -2164,7 +2182,8 @@ mod tests {
21642182
#[tokio::test]
21652183
async fn test_single_hop_payments() {
21662184
let chan_capacity = 500_000_000;
2167-
let mut test_kit = DispatchPaymentTestKit::new(chan_capacity, vec![]).await;
2185+
let mut test_kit =
2186+
DispatchPaymentTestKit::new(chan_capacity, vec![], CustomRecords::default()).await;
21682187

21692188
// Send a single hop payment from Alice -> Bob, it will succeed because Alice has all the liquidity.
21702189
let amt = 150_000;
@@ -2196,7 +2215,8 @@ mod tests {
21962215
#[tokio::test]
21972216
async fn test_multi_hop_faiulre() {
21982217
let chan_capacity = 500_000_000;
2199-
let mut test_kit = DispatchPaymentTestKit::new(chan_capacity, vec![]).await;
2218+
let mut test_kit =
2219+
DispatchPaymentTestKit::new(chan_capacity, vec![], CustomRecords::default()).await;
22002220

22012221
// Drain liquidity between Bob and Carol to force failures on Bob's outgoing linke.
22022222
test_kit
@@ -2242,7 +2262,7 @@ mod tests {
22422262
channel_id: ShortChannelID(0),
22432263
index: 0,
22442264
},
2245-
incoming_custom_records: CustomRecords::new(),
2265+
incoming_custom_records: CustomRecords::default(),
22462266
outgoing_channel_id: None,
22472267
incoming_amount_msat: 0,
22482268
outgoing_amount_msat: 0,
@@ -2404,7 +2424,8 @@ mod tests {
24042424
.returning(|_| Ok(()));
24052425

24062426
let mock_1 = Arc::new(mock_interceptor_1);
2407-
let mut test_kit = DispatchPaymentTestKit::new(500_000_000, vec![mock_1]).await;
2427+
let mut test_kit =
2428+
DispatchPaymentTestKit::new(500_000_000, vec![mock_1], CustomRecords::default()).await;
24082429
let (_, result) = test_kit
24092430
.send_test_payment(test_kit.nodes[0], test_kit.nodes[3], 150_000_000)
24102431
.await;
@@ -2425,4 +2446,40 @@ mod tests {
24252446
test_kit.graph.tasks.close();
24262447
test_kit.graph.tasks.wait().await;
24272448
}
2449+
2450+
/// Tests custom records set for interceptors in multi-hop payment.
2451+
#[tokio::test]
2452+
async fn test_custom_records() {
2453+
let custom_records = HashMap::from([(1000, vec![1])]);
2454+
2455+
let mut mock_interceptor_1 = MockTestInterceptor::new();
2456+
let custom_records_clone = custom_records.clone();
2457+
mock_interceptor_1
2458+
.expect_intercept_htlc()
2459+
.withf(move |req: &InterceptRequest| {
2460+
// Check custom records passed to interceptor are the default ones set.
2461+
req.incoming_custom_records == custom_records_clone
2462+
})
2463+
.returning(|_| Ok(Ok(CustomRecords::default()))); // Set empty records for 2nd hop.
2464+
mock_interceptor_1
2465+
.expect_notify_resolution()
2466+
.returning(|_| Ok(()))
2467+
.times(2);
2468+
2469+
mock_interceptor_1
2470+
.expect_intercept_htlc()
2471+
.withf(move |req: &InterceptRequest| {
2472+
// On this 2nd hop, the custom records should be empty.
2473+
req.incoming_custom_records == CustomRecords::default()
2474+
})
2475+
.returning(|_| Ok(Ok(CustomRecords::default())));
2476+
2477+
let chan_capacity = 500_000_000;
2478+
let mock_1: Arc<dyn Interceptor> = Arc::new(mock_interceptor_1);
2479+
let mut test_kit =
2480+
DispatchPaymentTestKit::new(chan_capacity, vec![mock_1], custom_records).await;
2481+
let _ = test_kit
2482+
.send_test_payment(test_kit.nodes[0], test_kit.nodes[2], 150_000)
2483+
.await;
2484+
}
24282485
}

0 commit comments

Comments
 (0)