1
1
#![ deny( rustdoc:: broken_intra_doc_links) ]
2
2
3
+ use self :: clock:: Clock ;
3
4
use async_trait:: async_trait;
4
5
use bitcoin:: secp256k1:: PublicKey ;
5
6
use bitcoin:: Network ;
@@ -545,7 +546,7 @@ impl SimulationCfg {
545
546
/// The simulator can execute both predefined payment patterns and generate random payment activity
546
547
/// based on configuration parameters.
547
548
#[ derive( Clone ) ]
548
- pub struct Simulation {
549
+ pub struct Simulation < C : Clock + ' static > {
549
550
/// Config for the simulation itself.
550
551
cfg : SimulationCfg ,
551
552
/// The lightning node that is being simulated.
@@ -558,6 +559,8 @@ pub struct Simulation {
558
559
/// High level triggers used to manage simulation tasks and shutdown.
559
560
shutdown_trigger : Trigger ,
560
561
shutdown_listener : Listener ,
562
+ /// Clock for the simulation.
563
+ clock : Arc < C > ,
561
564
}
562
565
563
566
/// Configuration for writing simulation results to CSV files.
@@ -579,11 +582,12 @@ struct ExecutorKit {
579
582
payment_generator : Box < dyn PaymentGenerator > ,
580
583
}
581
584
582
- impl Simulation {
585
+ impl < C : Clock + ' static > Simulation < C > {
583
586
pub fn new (
584
587
cfg : SimulationCfg ,
585
588
nodes : HashMap < PublicKey , Arc < Mutex < dyn LightningNode > > > ,
586
589
tasks : TaskTracker ,
590
+ clock : Arc < C > ,
587
591
shutdown_trigger : Trigger ,
588
592
shutdown_listener : Listener ,
589
593
) -> Self {
@@ -594,6 +598,7 @@ impl Simulation {
594
598
tasks,
595
599
shutdown_trigger,
596
600
shutdown_listener,
601
+ clock,
597
602
}
598
603
}
599
604
@@ -771,6 +776,7 @@ impl Simulation {
771
776
if let Some ( total_time) = self . cfg . total_time {
772
777
let shutdown = self . shutdown_trigger . clone ( ) ;
773
778
let listener = self . shutdown_listener . clone ( ) ;
779
+ let clock = self . clock . clone ( ) ;
774
780
775
781
self . tasks . spawn ( async move {
776
782
select ! {
@@ -779,7 +785,7 @@ impl Simulation {
779
785
log:: debug!( "Timeout task exited on listener signal" ) ;
780
786
}
781
787
782
- _ = time :: sleep( total_time) => {
788
+ _ = clock . sleep( total_time) => {
783
789
log:: info!(
784
790
"Simulation run for {}s. Shutting down." ,
785
791
total_time. as_secs( )
@@ -846,23 +852,27 @@ impl Simulation {
846
852
847
853
let result_logger_clone = result_logger. clone ( ) ;
848
854
let result_logger_listener = listener. clone ( ) ;
855
+ let clock = self . clock . clone ( ) ;
849
856
tasks. spawn ( async move {
850
857
log:: debug!( "Starting results logger." ) ;
851
858
run_results_logger (
852
859
result_logger_listener,
853
860
result_logger_clone,
854
861
Duration :: from_secs ( 60 ) ,
862
+ clock,
855
863
)
856
864
. await ;
857
865
log:: debug!( "Exiting results logger." ) ;
858
866
} ) ;
859
867
860
868
// csr: consume simulation results
861
869
let csr_write_results = self . cfg . write_results . clone ( ) ;
870
+ let clock = self . clock . clone ( ) ;
862
871
tasks. spawn ( async move {
863
872
log:: debug!( "Starting simulation results consumer." ) ;
864
873
if let Err ( e) = consume_simulation_results (
865
874
result_logger,
875
+ clock,
866
876
results_receiver,
867
877
listener,
868
878
csr_write_results,
@@ -1003,11 +1013,12 @@ impl Simulation {
1003
1013
let ce_shutdown = self . shutdown_trigger . clone ( ) ;
1004
1014
let ce_output_sender = output_sender. clone ( ) ;
1005
1015
let ce_node = node. clone ( ) ;
1016
+ let clock = self . clock . clone ( ) ;
1006
1017
tasks. spawn ( async move {
1007
1018
let node_info = ce_node. lock ( ) . await . get_info ( ) . clone ( ) ;
1008
1019
log:: debug!( "Starting events consumer for {}." , node_info) ;
1009
1020
if let Err ( e) =
1010
- consume_events ( ce_node, receiver, ce_output_sender, ce_listener) . await
1021
+ consume_events ( ce_node, clock , receiver, ce_output_sender, ce_listener) . await
1011
1022
{
1012
1023
ce_shutdown. trigger ( ) ;
1013
1024
log:: error!( "Event consumer for node {node_info} exited with error: {e:?}." ) ;
@@ -1040,6 +1051,8 @@ impl Simulation {
1040
1051
let pe_shutdown = self . shutdown_trigger . clone ( ) ;
1041
1052
let pe_listener = self . shutdown_listener . clone ( ) ;
1042
1053
let pe_sender = sender. clone ( ) ;
1054
+ let clock = self . clock . clone ( ) ;
1055
+
1043
1056
tasks. spawn ( async move {
1044
1057
let source = executor. source_info . clone ( ) ;
1045
1058
@@ -1053,6 +1066,7 @@ impl Simulation {
1053
1066
executor. source_info ,
1054
1067
executor. network_generator ,
1055
1068
executor. payment_generator ,
1069
+ clock,
1056
1070
pe_sender,
1057
1071
pe_listener,
1058
1072
)
@@ -1074,6 +1088,7 @@ impl Simulation {
1074
1088
/// event being executed is piped into a channel to handle the result of the event.
1075
1089
async fn consume_events (
1076
1090
node : Arc < Mutex < dyn LightningNode > > ,
1091
+ clock : Arc < dyn Clock > ,
1077
1092
mut receiver : Receiver < SimulationEvent > ,
1078
1093
sender : Sender < SimulationOutput > ,
1079
1094
listener : Listener ,
@@ -1095,7 +1110,7 @@ async fn consume_events(
1095
1110
hash: None ,
1096
1111
amount_msat: amt_msat,
1097
1112
destination: dest. pubkey,
1098
- dispatch_time: SystemTime :: now( ) ,
1113
+ dispatch_time: clock . now( ) ,
1099
1114
} ;
1100
1115
1101
1116
let outcome = match node. send_payment( dest. pubkey, amt_msat) . await {
@@ -1157,6 +1172,7 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
1157
1172
source : NodeInfo ,
1158
1173
network_generator : Arc < Mutex < N > > ,
1159
1174
node_generator : Box < A > ,
1175
+ clock : Arc < dyn Clock > ,
1160
1176
sender : Sender < SimulationEvent > ,
1161
1177
listener : Listener ,
1162
1178
) -> Result < ( ) , SimulationError > {
@@ -1180,7 +1196,7 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
1180
1196
} ,
1181
1197
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
1182
1198
// destination.
1183
- _ = time :: sleep( wait) => {
1199
+ _ = clock . sleep( wait) => {
1184
1200
let ( destination, capacity) = network_generator. lock( ) . await . choose_destination( source. pubkey) . map_err( SimulationError :: DestinationGenerationError ) ?;
1185
1201
1186
1202
// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
@@ -1256,13 +1272,14 @@ fn get_payment_delay<A: PaymentGenerator + ?Sized>(
1256
1272
1257
1273
async fn consume_simulation_results (
1258
1274
logger : Arc < Mutex < PaymentResultLogger > > ,
1275
+ clock : Arc < dyn Clock > ,
1259
1276
mut receiver : Receiver < ( Payment , PaymentResult ) > ,
1260
1277
listener : Listener ,
1261
1278
write_results : Option < WriteResults > ,
1262
1279
) -> Result < ( ) , SimulationError > {
1263
1280
let mut writer = match write_results {
1264
1281
Some ( res) => {
1265
- let duration = SystemTime :: now ( ) . duration_since ( SystemTime :: UNIX_EPOCH ) ?;
1282
+ let duration = clock . now ( ) . duration_since ( SystemTime :: UNIX_EPOCH ) ?;
1266
1283
let file = res
1267
1284
. results_dir
1268
1285
. join ( format ! ( "simulation_{:?}.csv" , duration) ) ;
@@ -1361,6 +1378,7 @@ async fn run_results_logger(
1361
1378
listener : Listener ,
1362
1379
logger : Arc < Mutex < PaymentResultLogger > > ,
1363
1380
interval : Duration ,
1381
+ clock : Arc < dyn Clock > ,
1364
1382
) {
1365
1383
log:: info!( "Summary of results will be reported every {:?}." , interval) ;
1366
1384
@@ -1371,7 +1389,7 @@ async fn run_results_logger(
1371
1389
break
1372
1390
}
1373
1391
1374
- _ = time :: sleep( interval) => {
1392
+ _ = clock . sleep( interval) => {
1375
1393
log:: info!( "{}" , logger. lock( ) . await )
1376
1394
}
1377
1395
}
0 commit comments