@@ -49,6 +49,7 @@ class NodeManager {
4949 protected taskManager : TaskManager ;
5050 protected refreshBucketDelay : number ;
5151 protected refreshBucketDelayJitter : number ;
52+ protected retrySeedConnectionsDelay : number ;
5253 protected pendingNodes : Map < number , Map < string , NodeAddress > > = new Map ( ) ;
5354
5455 public readonly basePath = this . constructor . name ;
@@ -118,6 +119,60 @@ class NodeManager {
118119 } ;
119120 public readonly pingAndSetNodeHandlerId : TaskHandlerId =
120121 `${ this . basePath } .${ this . pingAndSetNodeHandler . name } .pingAndSetNodeHandlerId` as TaskHandlerId ;
122+ protected checkSeedConnectionsHandler : TaskHandler = async (
123+ ctx ,
124+ taskInfo ,
125+ ) => {
126+ this . logger . info ( 'Checking seed connections' ) ;
127+ // Check for existing seed node connections
128+ const seedNodes = this . nodeConnectionManager . getSeedNodes ( ) ;
129+ const allInactive = ! seedNodes
130+ . map ( ( nodeId ) => this . nodeConnectionManager . hasConnection ( nodeId ) )
131+ . reduce ( ( a , b ) => a || b ) ;
132+ try {
133+ if ( allInactive ) {
134+ this . logger . info (
135+ 'No active seed connections were found, retrying network entry' ,
136+ ) ;
137+ // If no seed node connections exist then we redo syncNodeGraph
138+ await this . syncNodeGraph ( true , undefined , ctx ) ;
139+ } else {
140+ // Doing this concurrently, we don't care about the results
141+ await Promise . allSettled (
142+ seedNodes . map ( ( nodeId ) => {
143+ // Retry any failed seed node connections
144+ if ( ! this . nodeConnectionManager . hasConnection ( nodeId ) ) {
145+ this . logger . info (
146+ `Re-establishing seed connection for ${ nodesUtils . encodeNodeId (
147+ nodeId ,
148+ ) } `,
149+ ) ;
150+ return this . nodeConnectionManager . withConnF (
151+ nodeId ,
152+ async ( ) => {
153+ // Do nothing, we just want to establish a connection
154+ } ,
155+ ctx ,
156+ ) ;
157+ }
158+ } ) ,
159+ ) ;
160+ }
161+ } finally {
162+ this . logger . info ( 'Checked seed connections' ) ;
163+ // Re-schedule this task
164+ await this . taskManager . scheduleTask ( {
165+ delay : taskInfo . delay ,
166+ deadline : taskInfo . deadline ,
167+ handlerId : this . checkSeedConnectionsHandlerId ,
168+ lazy : true ,
169+ path : [ this . basePath , this . checkSeedConnectionsHandlerId ] ,
170+ priority : taskInfo . priority ,
171+ } ) ;
172+ }
173+ } ;
174+ public readonly checkSeedConnectionsHandlerId : TaskHandlerId =
175+ `${ this . basePath } .${ this . checkSeedConnectionsHandler . name } .checkSeedConnectionsHandler` as TaskHandlerId ;
121176
122177 constructor ( {
123178 db,
@@ -128,6 +183,7 @@ class NodeManager {
128183 taskManager,
129184 refreshBucketDelay = 3600000 , // 1 hour in milliseconds
130185 refreshBucketDelayJitter = 0.5 , // Multiple of refreshBucketDelay to jitter by
186+ retrySeedConnectionsDelay = 120000 , // 2 minuets
131187 logger,
132188 } : {
133189 db : DB ;
@@ -138,6 +194,7 @@ class NodeManager {
138194 taskManager : TaskManager ;
139195 refreshBucketDelay ?: number ;
140196 refreshBucketDelayJitter ?: number ;
197+ retrySeedConnectionsDelay ?: number ;
141198 longTaskTimeout ?: number ;
142199 logger ?: Logger ;
143200 } ) {
@@ -154,6 +211,7 @@ class NodeManager {
154211 0 ,
155212 Math . min ( refreshBucketDelayJitter , 1 ) ,
156213 ) ;
214+ this . retrySeedConnectionsDelay = retrySeedConnectionsDelay ;
157215 }
158216
159217 public async start ( ) {
@@ -171,7 +229,17 @@ class NodeManager {
171229 this . pingAndSetNodeHandlerId ,
172230 this . pingAndSetNodeHandler ,
173231 ) ;
232+ this . taskManager . registerHandler (
233+ this . checkSeedConnectionsHandlerId ,
234+ this . checkSeedConnectionsHandler ,
235+ ) ;
174236 await this . setupRefreshBucketTasks ( ) ;
237+ await this . taskManager . scheduleTask ( {
238+ delay : this . retrySeedConnectionsDelay ,
239+ handlerId : this . checkSeedConnectionsHandlerId ,
240+ lazy : true ,
241+ path : [ this . basePath , this . checkSeedConnectionsHandlerId ] ,
242+ } ) ;
175243 this . logger . info ( `Started ${ this . constructor . name } ` ) ;
176244 }
177245
@@ -195,6 +263,7 @@ class NodeManager {
195263 this . taskManager . deregisterHandler ( this . refreshBucketHandlerId ) ;
196264 this . taskManager . deregisterHandler ( this . gcBucketHandlerId ) ;
197265 this . taskManager . deregisterHandler ( this . pingAndSetNodeHandlerId ) ;
266+ this . taskManager . deregisterHandler ( this . checkSeedConnectionsHandlerId ) ;
198267 this . logger . info ( `Stopped ${ this . constructor . name } ` ) ;
199268 }
200269
@@ -1044,51 +1113,57 @@ class NodeManager {
10441113 logger . info ( 'Syncing nodeGraph' ) ;
10451114 // Getting the seed node connection information
10461115 const seedNodes = this . nodeConnectionManager . getSeedNodes ( ) ;
1047- // FIXME: remove or uncomment, need to decide
1048- // const addresses = await Promise.all(
1049- // await this.db.withTransactionF(async (tran) =>
1050- // seedNodes.map(
1051- // async (seedNode) =>
1052- // (
1053- // await this.nodeGraph.getNode(seedNode, tran)
1054- // )?.address,
1055- // ),
1056- // ),
1057- // );
1058- // const filteredAddresses = addresses.filter(
1059- // (address) => address != null,
1060- // ) as Array<NodeAddress>;
1061- // logger.info(
1062- // `establishing multi-connection to the following seed nodes ${seedNodes.map(
1063- // (nodeId) => nodesUtils.encodeNodeId(nodeId),
1064- // )}`,
1065- // );
1066- // logger.info(
1067- // `and addresses addresses ${filteredAddresses.map(
1068- // (address) => `${address.host}:${address.port}`,
1069- // )}`,
1070- // );
1071- // // Establishing connections to the seed nodes
1072- // const connections =
1073- // await this.nodeConnectionManager.establishMultiConnection(
1074- // seedNodes,
1075- // filteredAddresses,
1076- // pingTimeout,
1077- // undefined,
1078- // { signal: ctx.signal },
1079- // );
1080- // logger.info(`Multi-connection established for`);
1081- // connections.forEach((address, key) => {
1082- // logger.info(
1083- // `${nodesUtils.encodeNodeId(key)}@${address.host}:${address.port}`,
1084- // );
1085- // });
1116+ const addresses = await Promise . all (
1117+ await this . db . withTransactionF ( async ( tran ) =>
1118+ seedNodes . map (
1119+ async ( seedNode ) =>
1120+ (
1121+ await this . nodeGraph . getNode ( seedNode , tran )
1122+ ) ?. address ,
1123+ ) ,
1124+ ) ,
1125+ ) ;
1126+ const filteredAddresses = addresses . filter (
1127+ ( address ) => address != null ,
1128+ ) as Array < NodeAddress > ;
1129+ logger . info (
1130+ `establishing multi-connection to the following seed nodes ${ seedNodes . map (
1131+ ( nodeId ) => nodesUtils . encodeNodeId ( nodeId ) ,
1132+ ) } `,
1133+ ) ;
1134+ logger . info (
1135+ `and addresses addresses ${ filteredAddresses . map (
1136+ ( address ) => `${ address . host } :${ address . port } ` ,
1137+ ) } `,
1138+ ) ;
1139+ // Establishing connections to the seed nodes
1140+ const connections =
1141+ await this . nodeConnectionManager . establishMultiConnection (
1142+ seedNodes ,
1143+ filteredAddresses ,
1144+ pingTimeout ,
1145+ undefined ,
1146+ { signal : ctx . signal } ,
1147+ ) ;
1148+ logger . info ( `Multi-connection established for` ) ;
1149+ connections . forEach ( ( address , key ) => {
1150+ logger . info (
1151+ `${ nodesUtils . encodeNodeId ( key ) } @${ address . host } :${ address . port } ` ,
1152+ ) ;
1153+ } ) ;
1154+ if ( connections . size === 0 ) {
1155+ // Not explicitly a failure but we do want to stop here
1156+ this . logger . warn (
1157+ 'Failed to connect to any seed nodes when syncing node graph' ,
1158+ ) ;
1159+ return ;
1160+ }
10861161 // Using a map to avoid duplicates
10871162 const closestNodesAll : Map < NodeId , NodeData > = new Map ( ) ;
10881163 const localNodeId = this . keyManager . getNodeId ( ) ;
10891164 let closestNode : NodeId | null = null ;
10901165 logger . info ( 'Getting closest nodes' ) ;
1091- for ( const nodeId of seedNodes ) {
1166+ for ( const [ nodeId ] of connections ) {
10921167 const closestNodes =
10931168 await this . nodeConnectionManager . getRemoteNodeClosestNodes (
10941169 nodeId ,
0 commit comments