@@ -103,6 +103,8 @@ def is_healthy(self):
103103async def isClusterReady (app ):
104104 sn_count = 0
105105 dn_count = 0
106+ active_sn_ids = app ["active_sn_ids" ]
107+ active_dn_ids = app ["active_dn_ids" ]
106108 target_sn_count = await getTargetNodeCount (app , "sn" )
107109 target_dn_count = await getTargetNodeCount (app , "dn" )
108110 last_create_time = None
@@ -115,9 +117,11 @@ async def isClusterReady(app):
115117 if last_create_time is None or node .create_time > last_create_time :
116118 last_create_time = node .create_time
117119 if node .type == "sn" :
118- sn_count += 1
120+ if node_id in active_sn_ids :
121+ sn_count += 1
119122 else :
120- dn_count += 1
123+ if node_id in active_dn_ids :
124+ dn_count += 1
121125 if sn_count == 0 or dn_count == 0 :
122126 log .debug ("no nodes, cluster not ready" )
123127 return False
@@ -171,6 +175,20 @@ async def info(request):
171175 return resp
172176
173177
def getNodeUrls(nodes, node_ids):
    """Return the base URL for each entry of node_ids, preserving order.

    Args:
        nodes: mapping of node id to a node object that exposes ``host``
            and ``port`` attributes.
        node_ids: iterable of node ids; a falsy entry (e.g. ``None``)
            marks an empty slot.

    Returns:
        A list parallel to ``node_ids`` holding ``"http://<host>:<port>"``
        for every filled slot and ``None`` for every empty slot.

    Raises:
        KeyError: if a non-empty id is not a key of ``nodes``.
    """
    node_urls = []
    for node_id in node_ids:
        if not node_id:
            # keep a placeholder so positions stay aligned with node_ids
            node_urls.append(None)
            continue
        node = nodes[node_id]
        # the URL must not contain any whitespace around host or port
        node_urls.append(f"http://{node.host}:{node.port}")
    return node_urls
190+
191+
174192async def register (request ):
175193 """HTTP method for nodes to register with head node"""
176194 app = request .app
@@ -208,7 +226,7 @@ async def register(request):
208226 log .debug ("register - get ip/port from request.transport" )
209227 peername = request .transport .get_extra_info ("peername" )
210228 if peername is None :
211- msg = "Can not determine caller IP"
229+ msg = "Cannot determine caller IP"
212230 log .error (msg )
213231 raise HTTPBadRequest (reason = msg )
214232 if peername [0 ] is None or peername [0 ] in ("::1" , "127.0.0.1" ):
@@ -255,10 +273,34 @@ async def register(request):
255273 node_host = node_host ,
256274 node_port = node_port ,
257275 )
258- # delete any existing node with the same port
276+ # delete any existing node with the same port and IP
259277 removeNode (app , host = node_host , port = node_port )
260278 nodes [node_id ] = node
261279
280+ # add to the active list if there's an open slot
281+ if node_type == "sn" :
282+ active_list = app ["active_sn_ids" ]
283+ else :
284+ active_list = app ["active_dn_ids" ]
285+
286+ tgt_count = len (active_list )
287+ active_count = sum (id is not None for id in active_list )
288+ if tgt_count == active_count :
289+ # all the slots are filled, see if there is any unhealthy node
290+ # and remove that
291+ for i in range (len (active_list )):
292+ id = active_list [i ]
293+ node = nodes [id ]
294+ if not node .is_healthy ():
295+ active_list [i ] = None # clear the slot
296+ break
297+
298+ for i in range (len (active_list )):
299+ if not active_list [i ]:
300+ log .info (f"Node { node_id } added to { node_type } active list in slot: { i } " )
301+ active_list [i ] = node_id
302+ break
303+
262304 resp = StreamResponse ()
263305 resp .headers ["Content-Type" ] = "application/json"
264306 answer = {}
@@ -267,38 +309,14 @@ async def register(request):
267309 answer ["cluster_state" ] = "READY"
268310 else :
269311 answer ["cluster_state" ] = "WAITING"
270- sn_urls = []
271- dn_urls = []
272- sn_ids = []
273- dn_ids = []
274- for node_id in nodes :
275- node = nodes [node_id ]
276- if not node .is_healthy ():
277- continue
278- node_url = f"http://{ node .host } :{ node .port } "
279- if node .type == "sn" :
280- sn_urls .append (node_url )
281- sn_ids .append (node_id )
282- else :
283- dn_urls .append (node_url )
284- dn_ids .append (node_id )
285312
286- # sort dn_urls so node number can be determined
287- dn_id_map = {}
288- for i in range (len (dn_urls )):
289- dn_url = dn_urls [i ]
290- dn_id = dn_ids [i ]
291- dn_id_map [dn_url ] = dn_id
292-
293- dn_urls .sort ()
294- dn_ids = [] # re-arrange to match url order
295- for dn_url in dn_urls :
296- dn_ids .append (dn_id_map [dn_url ])
313+ sn_urls = getNodeUrls (nodes , app ["active_sn_ids" ])
314+ dn_urls = getNodeUrls (nodes , app ["active_dn_ids" ])
297315
316+ answer ["sn_ids" ] = app ["active_sn_ids" ]
298317 answer ["sn_urls" ] = sn_urls
318+ answer ["dn_ids" ] = app ["active_dn_ids" ]
299319 answer ["dn_urls" ] = dn_urls
300- answer ["sn_ids" ] = sn_ids
301- answer ["dn_ids" ] = dn_ids
302320 answer ["req_ip" ] = node_host
303321 log .debug (f"register returning: { answer } " )
304322 app ["last_health_check" ] = int (time .time ())
@@ -410,7 +428,7 @@ async def nodeinfo(request):
410428async def getTargetNodeCount (app , node_type ):
411429
412430 if node_type == "dn" :
413- key = "target_sn_count "
431+ key = "target_dn_count "
414432 elif node_type == "sn" :
415433 key = "target_sn_count"
416434 else :
@@ -430,7 +448,12 @@ async def getTargetNodeCount(app, node_type):
430448def getActiveNodeCount (app , node_type ):
431449 count = 0
432450 nodes = app ["nodes" ]
433- for node_id in nodes :
451+ if node_type == "sn" :
452+ active_list = app ["active_sn_ids" ]
453+ else :
454+ active_list = app ["active_dn_ids" ]
455+
456+ for node_id in active_list :
434457 node = nodes [node_id ]
435458 if node .type != node_type :
436459 continue
@@ -462,8 +485,6 @@ async def init():
462485
463486 app ["head_port" ] = config .get ("head_port" )
464487
465- nodes = {}
466-
467488 # check to see if we are running in a DCOS cluster
468489 if "MARATHON_APP_ID" in os .environ :
469490 msg = "Found MARATHON_APP_ID environment variable, setting "
@@ -473,7 +494,12 @@ async def init():
473494 else :
474495 log .info ("not setting is_dcos" )
475496
476- app ["nodes" ] = nodes
497+ target_sn_count = await getTargetNodeCount (app , "sn" )
498+ target_dn_count = await getTargetNodeCount (app , "dn" )
499+
500+ app ["nodes" ] = {}
501+ app ["active_sn_ids" ] = [None , ] * target_sn_count
502+ app ["active_dn_ids" ] = [None , ] * target_dn_count
477503 app ["dead_node_ids" ] = set ()
478504 app ["start_time" ] = int (time .time ()) # seconds after epoch
479505 app ["last_health_check" ] = 0
0 commit comments