33from .DeDuplication import Deduplication
44import pymongo
55import datetime
6+ import sys
67
78
89class Scheduling (object ):
@@ -38,18 +39,21 @@ def scheduleDetection(self, source, configName, data):
3839 """
3940 if len (data ) is 0 or not isinstance (data , list ):
4041 self .ioc .getLogger ().trace (
41- "Data provided empty or is not type list type: [{0}] len: [{1}]" .format (str (type (data )), len (data )),
42+ "Data provided empty or is not type list type: [{0}] len: [{1}]" .format (
43+ str (type (data )), len (data )),
4244 trace = True
4345 )
4446 return False
45- self .ioc .getLogger ().trace ("Preparing to schedule [{0}] source objects" .format (len (data )), trace = True )
47+ self .ioc .getLogger ().trace (
48+ "Preparing to schedule [{0}] source objects" .format (len (data )), trace = True )
4649 sourceCollect = self .ioc .getCollection ('SourceData' )
4750 jServerCollect = self .ioc .getCollection ('JobServer' )
4851 # begin scheduling loop of each block
4952 for elem in data :
5053 if not isinstance (elem , dict ):
5154 self .ioc .getLogger ().warning (
52- "Element from data not of type dict! Got [{0}] DROPPED" .format (str (type (elem ))),
55+ "Element from data not of type dict! Got [{0}] DROPPED" .format (
56+ str (type (elem ))),
5357 notify = False
5458 )
5559 continue
@@ -93,7 +97,8 @@ def scheduleDetection(self, source, configName, data):
9397 )
9498 else :
9599 self .ioc .getLogger ().warning (
96- "Failed to find detection server for data object from source [{0}]; DROPPED" .format (source ),
100+ "Failed to find detection server for data object from source [{0}]; DROPPED" .format (
101+ source ),
97102 notify = False
98103 )
99104 self .ioc .getLogger ().warning (
@@ -144,15 +149,34 @@ def determineDetectionServer(self):
144149 tuple: MongoDB Object ID of server & current job count
145150
146151 """
147- result = self .ioc .getCollection ('JobServer' ).find ({
148- 'active' : True ,
149- 'prototypes' : 'detect'
150- }).sort ('jobs' , pymongo .ASCENDING ).limit (1 )
151- if result .count ():
152- return str (result [0 ]['_id' ]), int (result [0 ]['jobs' ])
153- else :
152+ servers = [
153+ (server .get ('_id' ), server .get ('jobs' )) for server in self .ioc .getCollection ('JobServer' ).find (
154+ {
155+ 'active' : True ,
156+ 'prototypes' : 'detect' ,
157+ }
158+ )
159+ ]
160+
161+ best_server = {}
162+ for (server , total_jobs ) in servers :
163+ active_jobs = self .ioc .getCollection ('SourceData' ).find (
164+ {
165+ 'grease_data.detection.server' : server ,
166+ 'grease_data.detection.end' : None ,
167+ }
168+ ).count ()
169+ if active_jobs < best_server .get ('active_jobs' , sys .maxsize ):
170+ best_server ['_id' ] = server
171+ best_server ['total_jobs' ] = total_jobs
172+ best_server ['active_jobs' ] = active_jobs
173+
174+ if not best_server .get ('_id' ):
175+ self .ioc .getLogger ().error ("No active detection server found!" )
154176 return "" , 0
155177
178+ return best_server .get ('_id' ), best_server .get ('total_jobs' )
179+
156180 def determineSchedulingServer (self ):
157181 """Determines scheduling server to use
158182
@@ -162,15 +186,34 @@ def determineSchedulingServer(self):
162186 tuple: MongoDB Object ID of server & current job count
163187
164188 """
165- result = self .ioc .getCollection ('JobServer' ).find ({
166- 'active' : True ,
167- 'prototypes' : 'schedule'
168- }).sort ('jobs' , pymongo .DESCENDING ).limit (1 )
169- if result .count ():
170- return str (result [0 ]['_id' ]), int (result [0 ]['jobs' ])
171- else :
189+ servers = [
190+ (server .get ('_id' ), server .get ('jobs' )) for server in self .ioc .getCollection ('JobServer' ).find (
191+ {
192+ 'active' : True ,
193+ 'prototypes' : 'schedule' ,
194+ }
195+ )
196+ ]
197+
198+ best_server = {}
199+ for (server , total_jobs ) in servers :
200+ active_jobs = self .ioc .getCollection ('SourceData' ).find (
201+ {
202+ 'grease_data.scheduling.server' : server ,
203+ 'grease_data.scheduling.end' : None ,
204+ }
205+ ).count ()
206+ if active_jobs < best_server .get ('active_jobs' , sys .maxsize ):
207+ best_server ['_id' ] = server
208+ best_server ['total_jobs' ] = total_jobs
209+ best_server ['active_jobs' ] = active_jobs
210+
211+ if not best_server .get ('_id' ):
212+ self .ioc .getLogger ().error ("No active scheduling server found!" )
172213 return "" , 0
173214
215+ return best_server .get ('_id' ), best_server .get ('total_jobs' )
216+
174217 def determineExecutionServer (self , role ):
175218 """Determines execution server to use
176219
@@ -180,11 +223,31 @@ def determineExecutionServer(self, role):
180223 tuple: MongoDB Object ID of server & current job count; if no server can be found the ID will be an empty string and the count 0
181224
182225 """
183- result = self .ioc .getCollection ('JobServer' ).find ({
184- 'active' : True ,
185- 'roles' : str (role )
186- }).sort ('jobs' , pymongo .DESCENDING ).limit (1 )
187- if result .count ():
188- return str (result [0 ]['_id' ]), int (result [0 ]['jobs' ])
189- else :
226+ servers = [
227+ (server .get ('_id' ), server .get ('jobs' )) for server in self .ioc .getCollection ('JobServer' ).find (
228+ {
229+ 'active' : True ,
230+ 'roles' : str (role ),
231+ }
232+ )
233+ ]
234+
235+ best_server = {}
236+ for (server , total_jobs ) in servers :
237+ active_jobs = self .ioc .getCollection ('SourceData' ).find (
238+ {
239+ 'grease_data.execution.server' : server ,
240+ 'grease_data.execution.completeTime' : None ,
241+ 'grease_data.execution.failures' : {'$lt' : 6 },
242+ }
243+ ).count ()
244+ if active_jobs < best_server .get ('active_jobs' , sys .maxsize ):
245+ best_server ['_id' ] = server
246+ best_server ['total_jobs' ] = total_jobs
247+ best_server ['active_jobs' ] = active_jobs
248+
249+ if not best_server .get ('_id' ):
250+ self .ioc .getLogger ().error ("No active job server found with role {}!" .format (role ))
190251 return "" , 0
252+
253+ return best_server .get ('_id' ), best_server .get ('total_jobs' )
0 commit comments