@@ -16,11 +16,8 @@ class NodeMonitoring(object):
1616
1717 """
1818
19- def __init__ (self , ioc = None ):
20- if isinstance (ioc , GreaseContainer ):
21- self .ioc = ioc
22- else :
23- self .ioc = GreaseContainer ()
19+ def __init__ (self , ioc = GreaseContainer ()):
20+ self .ioc = ioc
2421 self .centralScheduler = Scheduling (self .ioc )
2522 self .scheduler = Scheduler (self .ioc )
2623
@@ -33,47 +30,59 @@ def monitor(self):
3330 """
3431 servers = self .getServers ()
3532 retVal = False
36- self .ioc .getLogger ().debug ("Total servers to monitor [{0}]" .format (len (servers )), trace = True )
33+ self .ioc .getLogger ().debug (
34+ "Total servers to monitor [{0}]" .format (len (servers )), trace = True )
3735 for server in servers :
3836 if self .serverAlive (server .get ('_id' )):
3937 retVal = True
4038 continue
4139 else :
42- self .ioc .getLogger ().warning ("Server [{0}] preparing to be culled from pool" .format (server .get ('_id' )))
43- self .ioc .getLogger ().warning ("Server [{0}] preparing to be deactivated" .format (server .get ('_id' )))
40+ self .ioc .getLogger ().warning (
41+ "Server [{0}] preparing to be culled from pool" .format (server .get ('_id' )))
42+ self .ioc .getLogger ().warning (
43+ "Server [{0}] preparing to be deactivated" .format (server .get ('_id' )))
4444 if not self .deactivateServer (server .get ('_id' )):
4545 self .ioc .getLogger ().error (
46- "Failed deactivating server [{0}]" .format (server .get ('_id' ))
46+ "Failed deactivating server [{0}]" .format (
47+ server .get ('_id' ))
4748 )
4849 retVal = False
4950 break
5051 self .ioc .getLogger ().warning (
51- "Server [{0}] preparing to reallocate detect jobs" .format (server .get ('_id' ))
52+ "Server [{0}] preparing to reallocate detect jobs" .format (
53+ server .get ('_id' ))
5254 )
5355 if not self .rescheduleDetectJobs (server .get ('_id' )):
5456 self .ioc .getLogger ().error (
55- "Failed rescheduling detect jobs [{0}]" .format (server .get ('_id' ))
57+ "Failed rescheduling detect jobs [{0}]" .format (
58+ server .get ('_id' ))
5659 )
5760 retVal = False
5861 break
5962 self .ioc .getLogger ().warning (
60- "Server [{0}] preparing to reallocate schedule jobs" .format (server .get ('_id' ))
63+ "Server [{0}] preparing to reallocate schedule jobs" .format (
64+ server .get ('_id' ))
6165 )
6266 if not self .rescheduleScheduleJobs (server .get ('_id' )):
6367 self .ioc .getLogger ().error (
64- "Failed rescheduling detect jobs [{0}]" .format (server .get ('_id' ))
68+ "Failed rescheduling detect jobs [{0}]" .format (
69+ server .get ('_id' ))
6570 )
6671 retVal = False
6772 break
6873 self .ioc .getLogger ().warning (
69- "Server [{0}] preparing to reallocate jobs" .format (server .get ('_id' ))
74+ "Server [{0}] preparing to reallocate jobs" .format (
75+ server .get ('_id' ))
7076 )
7177 if not self .rescheduleJobs (server .get ('_id' )):
7278 self .ioc .getLogger ().error (
73- "Failed rescheduling detect jobs [{0}]" .format (server .get ('_id' ))
79+ "Failed rescheduling detect jobs [{0}]" .format (
80+ server .get ('_id' ))
7481 )
7582 retVal = False
7683 break
84+
85+ self .schedule_orphans ()
7786 return retVal
7887
7988 def scanComplete (self ):
@@ -118,10 +127,12 @@ def scanComplete(self):
118127 'createTime' : datetime .datetime .utcnow (),
119128 'expiry' : Deduplication .generate_max_expiry_time (1 )
120129 })
121- server = self .ioc .getCollection ('JobServer' ).find_one ({'_id' : ObjectId (self .ioc .getConfig ().NodeIdentity )})
130+ server = self .ioc .getCollection ('JobServer' ).find_one (
131+ {'_id' : ObjectId (self .ioc .getConfig ().NodeIdentity )})
122132 if not server :
123133 self .ioc .getLogger ().critical (
124- "Failed to find server [{0}] after monitoring occurred!" .format (self .ioc .getConfig ().NodeIdentity )
134+ "Failed to find server [{0}] after monitoring occurred!" .format (
135+ self .ioc .getConfig ().NodeIdentity )
125136 )
126137 self .ioc .getCollection ('JobServer' ).update_one ({
127138 '_id' : ObjectId (self .ioc .getConfig ().NodeIdentity )},
@@ -159,7 +170,8 @@ def serverAlive(self, serverId):
159170 Server = coll .find_one ({'server' : ObjectId (serverId )})
160171 if Server :
161172 # We have a server already in the system
162- serverStats = self .ioc .getCollection ('JobServer' ).find_one ({'_id' : ObjectId (serverId )})
173+ serverStats = self .ioc .getCollection (
174+ 'JobServer' ).find_one ({'_id' : ObjectId (serverId )})
163175 if serverStats :
164176 # compare previous results to see if there has been change
165177 if dict (Server ).get ('jobs' , 0 ) < dict (serverStats ).get ('jobs' , 0 ):
@@ -173,28 +185,33 @@ def serverAlive(self, serverId):
173185 }
174186 }
175187 )
176- self .ioc .getLogger ().trace ("JobServer [{0}] is alive" .format (serverId ), trace = True )
188+ self .ioc .getLogger ().trace (
189+ "JobServer [{0}] is alive" .format (serverId ), trace = True )
177190 return True
178191 else :
179192 if dict (Server ).get ('checkTime' , datetime .datetime .utcnow ()) < \
180193 datetime .datetime .utcnow () - datetime .timedelta (minutes = 10 ):
181194 # server has aged out
182195 self .ioc .getLogger ().trace (
183- "JobServer [{0}] is not alive; Timestamp has not changed in ten minutes" .format (serverId ),
196+ "JobServer [{0}] is not alive; Timestamp has not changed in ten minutes" .format (
197+ serverId ),
184198 trace = True
185199 )
186200 return False
187201 else :
188202 # server is in a degraded state
189- self .ioc .getLogger ().warning ("JobServer [{0}] is degraded!" .format (serverId ), trace = True )
203+ self .ioc .getLogger ().warning (
204+ "JobServer [{0}] is degraded!" .format (serverId ), trace = True )
190205 return True
191206 else :
192207 # Failed to find server in JobServer collection
193- self .ioc .getLogger ().error ("JobServer not found during node monitoring! [{0}]" .format (serverId ))
208+ self .ioc .getLogger ().error (
209+ "JobServer not found during node monitoring! [{0}]" .format (serverId ))
194210 return False
195211 else :
196212 # we have a new server
197- serverStats = self .ioc .getCollection ('JobServer' ).find_one ({'_id' : ObjectId (serverId )})
213+ serverStats = self .ioc .getCollection (
214+ 'JobServer' ).find_one ({'_id' : ObjectId (serverId )})
198215 if serverStats :
199216 coll .insert_one (
200217 {
@@ -203,11 +220,13 @@ def serverAlive(self, serverId):
203220 'checkTime' : datetime .datetime .utcnow ()
204221 }
205222 )
206- self .ioc .getLogger ().info ("New JobServer persisted in monitoring [{0}]" .format (serverId ))
223+ self .ioc .getLogger ().info (
224+ "New JobServer persisted in monitoring [{0}]" .format (serverId ))
207225 return True
208226 else :
209227 # Failed to find server in JobServer collection
210- self .ioc .getLogger ().error ("New JobServer not found during node monitoring! [{0}]" .format (serverId ))
228+ self .ioc .getLogger ().error (
229+ "New JobServer not found during node monitoring! [{0}]" .format (serverId ))
211230 return False
212231
213232 def deactivateServer (self , serverId ):
@@ -228,10 +247,12 @@ def deactivateServer(self, serverId):
228247 }
229248 }
230249 ).modified_count < 1 :
231- self .ioc .getLogger ().warning ("Server [{0}] failed to be deactivated" .format (serverId ))
250+ self .ioc .getLogger ().warning (
251+ "Server [{0}] failed to be deactivated" .format (serverId ))
232252 return False
233253 else :
234- self .ioc .getLogger ().warning ("Server [{0}] deactivated" .format (serverId ))
254+ self .ioc .getLogger ().warning (
255+ "Server [{0}] deactivated" .format (serverId ))
235256 return True
236257
237258 def rescheduleDetectJobs (self , serverId ):
@@ -245,10 +266,12 @@ def rescheduleDetectJobs(self, serverId):
245266
246267 """
247268 retval = True
248- server = self .ioc .getCollection ('JobServer' ).find_one ({'_id' : ObjectId (serverId )})
269+ server = self .ioc .getCollection (
270+ 'JobServer' ).find_one ({'_id' : ObjectId (serverId )})
249271 if not server :
250272 self .ioc .getLogger ().error (
251- "Failed to load server details while trying to reschedule detection [{0}]" .format (serverId )
273+ "Failed to load server details while trying to reschedule detection [{0}]" .format (
274+ serverId )
252275 )
253276 return False
254277 for job in self .ioc .getCollection ('SourceData' ).find (
@@ -284,10 +307,12 @@ def rescheduleScheduleJobs(self, serverId):
284307
285308 """
286309 retval = True
287- server = self .ioc .getCollection ('JobServer' ).find_one ({'_id' : ObjectId (serverId )})
310+ server = self .ioc .getCollection (
311+ 'JobServer' ).find_one ({'_id' : ObjectId (serverId )})
288312 if not server :
289313 self .ioc .getLogger ().error (
290- "Failed to load server details while trying to reschedule schedules [{0}]" .format (serverId )
314+ "Failed to load server details while trying to reschedule schedules [{0}]" .format (
315+ serverId )
291316 )
292317 return False
293318 for job in self .ioc .getCollection ('SourceData' ).find (
@@ -323,10 +348,12 @@ def rescheduleJobs(self, serverId):
323348
324349 """
325350 retval = True
326- server = self .ioc .getCollection ('JobServer' ).find_one ({'_id' : ObjectId (serverId )})
351+ server = self .ioc .getCollection (
352+ 'JobServer' ).find_one ({'_id' : ObjectId (serverId )})
327353 if not server :
328354 self .ioc .getLogger ().error (
329- "Failed to load server details while trying to reschedule schedules [{0}]" .format (serverId )
355+ "Failed to load server details while trying to reschedule schedules [{0}]" .format (
356+ serverId )
330357 )
331358 return False
332359 for job in self .ioc .getCollection ('SourceData' ).find (
@@ -351,3 +378,64 @@ def rescheduleJobs(self, serverId):
351378 }
352379 )
353380 return retval
381+
382+ def schedule_orphans (self ):
383+ self .ioc .getLogger ().info ("Checking for orphaned jobs..." , verbose = True )
384+ self .schedule_detection_orphans ()
385+ self .schedule_scheduling_orphans ()
386+ self .schedule_execution_orphans ()
387+
388+ def schedule_detection_orphans (self ):
389+ dead_servers = list (self .ioc .getCollection (
390+ "JobServer" ).find ({'active' : False }))
391+
392+ # Look for any active jobs with an inactive parent server, and reschedule them.
393+ for orphan in self .ioc .getCollection ('SourceData' ).find (
394+ {
395+ 'grease_data.detection.server' : {"$in" : [ObjectId (parent .get ('_id' )) for parent in dead_servers ]},
396+ 'grease_data.detection.end' : None
397+ }
398+ ):
399+ if self .centralScheduler .scheduleDetection (orphan .get ('source' ), orphan .get ('configuration' ), [orphan ]):
400+ self .ioc .getLogger ().info (
401+ "Rescheduled orphan detection record: [{}]" .format (orphan ), verbose = True )
402+ else :
403+ self .ioc .getLogger ().error (
404+ "Unable to reschedule orphan detection record: [{}]" .format (orphan ))
405+
406+ def schedule_scheduling_orphans (self ):
407+ dead_servers = list (self .ioc .getCollection (
408+ "JobServer" ).find ({'active' : False }))
409+
410+ # Look for any active jobs with an inactive parent server, and reschedule them.
411+ for orphan in self .ioc .getCollection ('SourceData' ).find (
412+ {
413+ 'grease_data.scheduling.server' : {"$in" : [ObjectId (parent .get ('_id' )) for parent in dead_servers ]},
414+ 'grease_data.scheduling.end' : None
415+ }
416+ ):
417+ if self .centralScheduler .scheduleScheduling (orphan .get ('_id' )):
418+ self .ioc .getLogger ().info (
419+ "Rescheduled orphan scheduling record: [{}]" .format (orphan ), verbose = True )
420+ else :
421+ self .ioc .getLogger ().error (
422+ "Unable to reschedule orphan detection record: [{}]" .format (orphan ))
423+
424+ def schedule_execution_orphans (self ):
425+ dead_servers = list (self .ioc .getCollection (
426+ "JobServer" ).find ({'active' : False }))
427+
428+ # Look for any active jobs with an inactive parent server, and reschedule them.
429+ for orphan in self .ioc .getCollection ('SourceData' ).find (
430+ {
431+ 'grease_data.execution.server' : {"$in" : [ObjectId (parent .get ('_id' )) for parent in dead_servers ]},
432+ 'grease_data.execution.failures' : {"$lt" : 6 },
433+ 'grease_data.execution.commandSuccess' : False
434+ }
435+ ):
436+ if self .scheduler .schedule (orphan ):
437+ self .ioc .getLogger ().info (
438+ "Rescheduled orphan scheduling record: [{}]" .format (orphan ), verbose = True )
439+ else :
440+ self .ioc .getLogger ().error (
441+ "Unable to reschedule orphan detection record: [{}]" .format (orphan ))
0 commit comments