11use crate :: bpv7:: bundle:: * ;
22use crate :: cla:: manager:: ClaManager ;
3+ use crate :: cla:: TcpPeer ;
34use crate :: config:: { generate_creation_timestamp, Config } ;
5+ use crate :: consts:: BUNDLES_DIR ;
46use crate :: routing:: algorithm:: {
5- ConvergenceSender , RouteEntry , RoutingAlgorithm , RoutingConfig , RoutingTable , TcpSender ,
7+ ClaPeer , RouteEntry , RoutingAlgorithm , RoutingConfig , RoutingTable ,
68} ;
79use crate :: store:: bundle_descriptor:: BundleDescriptor ;
810use crate :: store:: BundleStore ;
911use std:: sync:: { Arc , Mutex } ;
12+ use tokio:: sync:: Mutex as TokioMutex ;
1013
1114use super :: BundleStatus ;
1215
1316/// DTN Node API for managing DTN bundles and network operations
1417pub struct DtnNode {
1518 store : BundleStore ,
1619 store_path : String ,
17- routing_algorithm : Arc < Mutex < Box < dyn RoutingAlgorithm > > > ,
20+ routing_algorithm : Arc < TokioMutex < Box < dyn RoutingAlgorithm > > > ,
1821 routing_table : Arc < Mutex < RoutingTable > > ,
22+ cla_manager : Arc < ClaManager > ,
1923}
2024
2125impl DtnNode {
@@ -26,7 +30,7 @@ impl DtnNode {
2630 Ok ( path) => path,
2731 Err ( _) => match Config :: load ( ) {
2832 Ok ( config) => config. storage . path ,
29- Err ( _) => "./bundles" . to_string ( ) ,
33+ Err ( _) => BUNDLES_DIR . to_string ( ) ,
3034 } ,
3135 } ;
3236
@@ -38,20 +42,22 @@ impl DtnNode {
3842 let store = BundleStore :: new ( store_path) ?;
3943 let config = Config :: load ( ) ?;
4044 let routing_config = RoutingConfig :: new ( config. get_routing_algorithm_type ( ) ) ;
41- let routing_algorithm = Arc :: new ( Mutex :: new ( routing_config. create_algorithm ( ) ) ) ;
45+ let routing_algorithm = Arc :: new ( TokioMutex :: new ( routing_config. create_algorithm ( ) ) ) ;
4246 let routing_table = Arc :: new ( Mutex :: new ( RoutingTable :: new ( ) ) ) ;
47+ let cla_manager = Arc :: new ( ClaManager :: new ( |_bundle| { } ) ) ;
4348
4449 Ok ( Self {
4550 store,
4651 store_path : store_path. to_string ( ) ,
4752 routing_algorithm,
4853 routing_table,
54+ cla_manager,
4955 } )
5056 }
5157
5258 /// Create a new DTN CLI instance with custom configuration
5359 pub fn with_config ( store_path : Option < & str > ) -> anyhow:: Result < Self > {
54- let path = store_path. unwrap_or ( "./bundles" ) ;
60+ let path = store_path. unwrap_or ( BUNDLES_DIR ) ;
5561 Self :: with_store_path ( path)
5662 }
5763
@@ -61,14 +67,16 @@ impl DtnNode {
6167 routing_config : RoutingConfig ,
6268 ) -> anyhow:: Result < Self > {
6369 let store = BundleStore :: new ( store_path) ?;
64- let routing_algorithm = Arc :: new ( Mutex :: new ( routing_config. create_algorithm ( ) ) ) ;
70+ let routing_algorithm = Arc :: new ( TokioMutex :: new ( routing_config. create_algorithm ( ) ) ) ;
6571 let routing_table = Arc :: new ( Mutex :: new ( RoutingTable :: new ( ) ) ) ;
72+ let cla_manager = Arc :: new ( ClaManager :: new ( |_bundle| { } ) ) ;
6673
6774 Ok ( Self {
6875 store,
6976 store_path : store_path. to_string ( ) ,
7077 routing_algorithm,
7178 routing_table,
79+ cla_manager,
7280 } )
7381 }
7482
@@ -82,6 +90,11 @@ impl DtnNode {
8290 }
8391 }
8492
93+ /// Get access to the routing table for advanced operations
94+ pub fn get_routing_table ( & self ) -> Arc < Mutex < RoutingTable > > {
95+ Arc :: clone ( & self . routing_table )
96+ }
97+
8598 /// Get all routes from the routing table
8699 pub fn get_all_routes ( & self ) -> anyhow:: Result < Vec < RouteEntry > > {
87100 if let Ok ( table) = self . routing_table . lock ( ) {
@@ -104,7 +117,7 @@ impl DtnNode {
104117 }
105118
106119 /// Insert a new bundle with the given message
107- pub fn insert_bundle ( & self , message : String ) -> anyhow:: Result < ( ) > {
120+ pub async fn insert_bundle ( & self , message : String ) -> anyhow:: Result < ( ) > {
108121 #[ cfg( test) ]
109122 let config = {
110123 // In tests, use a slightly different timestamp each time to avoid duplicates
@@ -130,58 +143,51 @@ impl DtnNode {
130143
131144 // Notify routing algorithm about new bundle
132145 let descriptor = BundleDescriptor :: new ( bundle) ;
133- if let Ok ( mut algorithm) = self . routing_algorithm . lock ( ) {
134- algorithm. notify_new_bundle ( & descriptor) ;
135- }
146+ let mut algorithm = self . routing_algorithm . lock ( ) . await ;
147+ algorithm. notify_new_bundle ( & descriptor) ;
136148
137149 Ok ( ( ) )
138150 }
139151
140152 /// Select peers for forwarding a bundle (legacy method)
141- pub fn select_peers_for_forwarding (
153+ pub async fn select_peers_for_forwarding (
142154 & self ,
143155 bundle : & Bundle ,
144- ) -> anyhow:: Result < Vec < Box < dyn ConvergenceSender > > > {
156+ ) -> anyhow:: Result < Vec < Box < dyn ClaPeer > > > {
145157 let descriptor = BundleDescriptor :: new ( bundle. clone ( ) ) ;
146158
147- // For now, create some dummy senders for demonstration
148- // In a real implementation, this would come from the CLA manager
149- let senders: Vec < Box < dyn ConvergenceSender > > = vec ! [
150- Box :: new( TcpSender :: new( crate :: bpv7:: EndpointId :: from( "dtn://peer1" ) ) ) ,
151- Box :: new( TcpSender :: new( crate :: bpv7:: EndpointId :: from( "dtn://peer2" ) ) ) ,
152- ] ;
153-
154- if let Ok ( algorithm) = self . routing_algorithm . lock ( ) {
155- let selected_refs = algorithm. select_peers_for_forwarding ( & descriptor, & senders) ;
156-
157- // Convert references back to owned boxes (this is a bit awkward, but necessary for the trait)
158- let result = selected_refs
159- . into_iter ( )
160- . map ( |sender_ref| {
161- let eid = sender_ref. get_peer_endpoint_id ( ) ;
162- Box :: new ( TcpSender :: new ( eid) ) as Box < dyn ConvergenceSender >
163- } )
164- . collect ( ) ;
159+ // Get peers from CLA manager (dummy for now)
160+ let peers = self . cla_manager . list_peers ( ) . await ;
165161
166- Ok ( result)
167- } else {
168- anyhow:: bail!( "Failed to lock routing algorithm" )
169- }
162+ let algorithm = self . routing_algorithm . lock ( ) . await ;
163+ let selected_refs = algorithm. select_peers_for_forwarding ( & descriptor, & peers) ;
164+
165+ // Convert references back to owned boxes (this is a bit awkward, but necessary for the trait)
166+ let result = selected_refs
167+ . into_iter ( )
168+ . map ( |peer_ref| {
169+ let eid = peer_ref. get_peer_endpoint_id ( ) ;
170+ let address = peer_ref. get_connection_address ( ) ;
171+ Box :: new ( TcpPeer :: new ( eid, address) ) as Box < dyn ClaPeer >
172+ } )
173+ . collect ( ) ;
174+
175+ Ok ( result)
170176 }
171177
172178 /// Select routes for forwarding a bundle (new method using routing table)
173- pub fn select_routes_for_forwarding ( & self , bundle : & Bundle ) -> anyhow:: Result < Vec < RouteEntry > > {
179+ pub async fn select_routes_for_forwarding (
180+ & self ,
181+ bundle : & Bundle ,
182+ ) -> anyhow:: Result < Vec < RouteEntry > > {
174183 let descriptor = BundleDescriptor :: new ( bundle. clone ( ) ) ;
175184
176- if let Ok ( algorithm) = self . routing_algorithm . lock ( ) {
177- if let Ok ( table) = self . routing_table . lock ( ) {
178- let routes = algorithm. select_routes_for_forwarding ( & descriptor, & table) ;
179- Ok ( routes)
180- } else {
181- anyhow:: bail!( "Failed to lock routing table" )
182- }
185+ let algorithm = self . routing_algorithm . lock ( ) . await ;
186+ if let Ok ( table) = self . routing_table . lock ( ) {
187+ let routes = algorithm. select_routes_for_forwarding ( & descriptor, & table) ;
188+ Ok ( routes)
183189 } else {
184- anyhow:: bail!( "Failed to lock routing algorithm " )
190+ anyhow:: bail!( "Failed to lock routing table " )
185191 }
186192 }
187193
@@ -261,7 +267,7 @@ impl DtnNode {
261267
262268 /// Start a TCP dialer daemon
263269 pub async fn start_tcp_dialer ( & self , target_addr : String ) -> anyhow:: Result < ( ) > {
264- let cla = Arc :: new ( crate :: cla:: TcpClaDialer { target_addr } ) ;
270+ let cla = Arc :: new ( crate :: cla:: TcpClaClient { target_addr } ) ;
265271 let manager = ClaManager :: new ( |bundle| {
266272 println ! ( "📤 Should not receive here (Dialer): {:?}" , bundle) ;
267273 } ) ;
@@ -270,6 +276,34 @@ impl DtnNode {
270276 tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 2 ) ) . await ;
271277 Ok ( ( ) )
272278 }
279+
280+ /// Select peers for forwarding a bundle with connectivity check (async version)
281+ pub async fn select_peers_for_forwarding_async (
282+ & self ,
283+ bundle : & Bundle ,
284+ ) -> anyhow:: Result < Vec < Box < dyn ClaPeer > > > {
285+ let descriptor = BundleDescriptor :: new ( bundle. clone ( ) ) ;
286+
287+ // Get peers from CLA manager (dummy for now)
288+ let peers = self . cla_manager . list_peers ( ) . await ;
289+
290+ let algorithm = self . routing_algorithm . lock ( ) . await ;
291+ let selected_refs = algorithm
292+ . select_peers_for_forwarding_async ( & descriptor, & peers)
293+ . await ;
294+
295+ // Convert references back to owned boxes
296+ let result = selected_refs
297+ . into_iter ( )
298+ . map ( |peer_ref| {
299+ let eid = peer_ref. get_peer_endpoint_id ( ) ;
300+ let address = peer_ref. get_connection_address ( ) ;
301+ Box :: new ( TcpPeer :: new ( eid, address) ) as Box < dyn ClaPeer >
302+ } )
303+ . collect ( ) ;
304+
305+ Ok ( result)
306+ }
273307}
274308
275309/// Default implementation for DtnNode
0 commit comments