1+ import dgram from 'node:dgram' ;
2+ import crypto from 'node:crypto' ;
13import { Schema , model } from 'mongoose' ;
24import uniqueValidator from 'mongoose-unique-validator' ;
5+ import { LOG_WARN } from '@/logger' ;
36import type { IServer , IServerConnectInfo , IServerMethods , ServerModel } from '@/types/mongoose/server' ;
47
8+ // * Kinda ugly to slap this in with the Mongoose stuff but it's fine for now
9+ // TODO - Maybe move this one day?
10+ const socket = dgram . createSocket ( 'udp4' ) ;
11+ const pendingHealthCheckRequests = new Map < string , ( ) => void > ( ) ;
12+
13+ socket . on ( 'message' , ( msg : Buffer , _rinfo : dgram . RemoteInfo ) => {
14+ const uuid = msg . toString ( ) ;
15+ const resolve = pendingHealthCheckRequests . get ( uuid ) ;
16+
17+ if ( resolve ) {
18+ resolve ( ) ;
19+ }
20+ } ) ;
21+
22+ socket . bind ( ) ;
23+
24+ function healthCheck ( target : { host : string ; port : number } ) : Promise < string > {
25+ return new Promise ( ( resolve , reject ) => {
26+ const uuid = crypto . randomUUID ( ) ;
27+
28+ const timeout = setTimeout ( ( ) => {
29+ pendingHealthCheckRequests . delete ( uuid ) ;
30+ reject ( new Error ( 'No valid response received' ) ) ;
31+ } , 2 * 1000 ) ; // TODO - Make this configurable? 2 seconds seems fine for now
32+
33+ pendingHealthCheckRequests . set ( uuid , ( ) => {
34+ clearTimeout ( timeout ) ;
35+ pendingHealthCheckRequests . delete ( uuid ) ;
36+ resolve ( target . host ) ;
37+ } ) ;
38+
39+ socket . send ( Buffer . from ( uuid ) , target . port , target . host , ( error ) => {
40+ if ( error ) {
41+ clearTimeout ( timeout ) ;
42+ pendingHealthCheckRequests . delete ( uuid ) ;
43+ reject ( error ) ;
44+ }
45+ } ) ;
46+ } ) ;
47+ }
48+
549const ServerSchema = new Schema < IServer , ServerModel , IServerMethods > ( {
650 client_id : String ,
751 ip : {
@@ -20,7 +64,11 @@ const ServerSchema = new Schema<IServer, ServerModel, IServerMethods>({
2064 access_mode : String ,
2165 maintenance_mode : Boolean ,
2266 device : Number ,
23- aes_key : String
67+ aes_key : String ,
68+ health_check_port : {
69+ type : Number ,
70+ required : false
71+ }
2472} ) ;
2573
2674ServerSchema . plugin ( uniqueValidator , { message : '{PATH} already in use.' } ) ;
@@ -31,9 +79,43 @@ ServerSchema.method('getServerConnectInfo', async function (): Promise<IServerCo
3179 throw new Error ( `No IP configured for server ${ this . _id } ` ) ;
3280 }
3381
34- const randomIp = ipList [ Math . floor ( Math . random ( ) * ipList . length ) ] ;
82+ const randomIP = ipList [ Math . floor ( Math . random ( ) * ipList . length ) ] ;
83+
84+ if ( ! this . health_check_port ) {
85+ return {
86+ ip : randomIP ,
87+ port : this . port
88+ } ;
89+ }
90+
91+ // * Remove the random IP from the race pool to remove the duplicate health check
92+ const healthCheckTargets = ipList . filter ( ip => ip !== randomIP ) . map ( ip => ( {
93+ host : ip ,
94+ port : this . health_check_port !
95+ } ) ) ;
96+
97+ // * Default to the random IP in case nothing responded in time
98+ // * and just Hope For The Best:tm:
99+ let target = randomIP ;
100+
101+ // * Check the random IP and start the race at the same time, preferring
102+ // * the result of the random IP should it succeed. Worst case scenario
103+ // * this takes 2 seconds to complete
104+ const [ randomResult , raceResult ] = await Promise . allSettled ( [
105+ healthCheck ( { host : randomIP , port : this . health_check_port ! } ) ,
106+ Promise . race ( healthCheckTargets . map ( target => healthCheck ( target ) ) )
107+ ] ) ;
108+
109+ if ( randomResult . status === 'rejected' ) {
110+ if ( raceResult . status === 'fulfilled' ) {
111+ target = raceResult . value ;
112+ } else {
113+ LOG_WARN ( `Server ${ this . service_name } failed to find healthy NEX server. Using the randomly selected IP ${ target } ` ) ;
114+ }
115+ }
116+
35117 return {
36- ip : randomIp ,
118+ ip : target ,
37119 port : this . port
38120 } ;
39121} ) ;
0 commit comments