1+ import time
2+ import typing
13from collections import OrderedDict
24import logging
35from queue import Queue
@@ -36,8 +38,11 @@ def mount(cls, job: JobExecutor):
3638 return job
3739
3840 job = cls .compilers [job .job_type ](job )
41+ return job
42+
3943 except Exception as e :
4044 logging .error (f"Failed to mount job { job .uid } . Cause: { repr (e )} " )
45+ raise
4146
4247 @staticmethod
4348 def keepWorking ():
@@ -49,40 +54,46 @@ def register(cls, key: str, compiler: Callable):
4954
5055 @classmethod
5156 def manage (cls , job : JobExecutor ):
52- if job .uid not in cls .jobs :
53- cls .jobs [job .uid ] = job
54-
55- for observer in cls .observers :
56- observer (job , "JOB_MANAGED" )
57+ try :
58+ if job .uid not in cls .jobs :
59+ cls .jobs [job .uid ] = job
60+ logging .info ("Managing the job: " + str (job .uid ))
61+ for observer in cls .observers :
62+ observer (job , "JOB_MANAGED" )
63+ except Exception as e :
64+ import traceback
65+ logging .error (f"Failed to manage job { job .uid } . Cause: { repr (e )} " )
66+ logging .error (traceback .format_exc ())
5767
5868 @classmethod
5969 def broadcast (cls , event , ** kwargs ):
6070 raise NotImplementedError ("Broadcasting not implemented yet" )
6171
62- @classmethod
63- def send (cls , uid , event , ** kwargs ):
64- try :
65- job = cls .jobs .get (uid )
66- client = cls .connections .connect (job .host ) # TODO client should be optional
67- if job .task_handler :
68- job .task_handler (cls , uid , event , client = client , ** kwargs )
69- except Exception as e :
70- logging .error (f"Failed to send event { event } to job { uid } . Cause: { repr (e )} " )
72+ # @classmethod
73+ # def send(cls, uid, event, retry=False, **kwargs):
74+ # try:
75+ # job = cls.jobs.get(uid)
76+ # client = cls.connections.connect(job.host) # TODO client should be optional
77+ # if job.task_handler:
78+ # job.task_handler(cls, uid, event, client=client, **kwargs)
79+ # except errors.SSHException as e:
80+ # logging.warning(f"Failed to send event {event} to job {uid}. Cause: {repr(e)}")
81+ # if retry:
82+ # time.sleep(1) # avoid flooding the queue
83+ # cls.agenda.put((uid, event)) # pass retry here
84+ # except Exception as e:
85+ # logging.error(f"Failed to send event {event} to job {uid}. Cause: {repr(e)}")
86+ # raise
7187
7288 @classmethod
73- def communicate (cls , uid , event , ** kwargs ):
89+ def locked_send (cls , uid , event , ** kwargs ):
7490 with cls .read_lock :
7591 try :
7692 job = cls .jobs .get (uid , None )
7793 if job and (job .status not in cls .endstates ):
78- cls .send (uid , event , ** kwargs )
79- except errors .SSHException as e :
80- print ("communicate function failed ON CONNECTION: " , repr (e ))
81- cls .agenda .put ((uid , event ))
94+ job .process (event , cls , cls .connections , ** kwargs )
8295 except Exception as e :
83- print ("communicate function failed: " , repr (e ))
84- finally :
85- pass
96+ print (f"Failed to deliver event { event } to job { uid } . Cause: { repr (e )} " )
8697
8798 @classmethod
8899 def add_observer (cls , observer : Callable ):
@@ -91,18 +102,23 @@ def add_observer(cls, observer: Callable):
91102
92103 @classmethod
93104 def set_state (
94- cls ,
95- uid ,
96- status ,
97- progress = None ,
98- message = None ,
99- traceback = None ,
100- start_time = None ,
101- end_time = None ,
102- details : Dict = None ,
105+ cls ,
106+ uid ,
107+ status ,
108+ progress = None ,
109+ message = None ,
110+ traceback : typing . Union [ str , Dict , None ] = None ,
111+ start_time = None ,
112+ end_time = None ,
113+ details : Dict = None ,
103114 ):
115+ job = cls .jobs .get (uid )
116+
117+ if job is None :
118+ logging .info (f"Job { uid } removed. Skipping this state change." )
119+ return
120+
104121 try :
105- job = cls .jobs .get (uid )
106122 job .status = status
107123 job .progress = progress or job .progress
108124 job .message = message or job .message
@@ -130,17 +146,10 @@ def set_state(
130146
131147 for observer in cls .observers :
132148 observer (job , "JOB_MODIFIED" )
133- except AttributeError :
134- logging .info ("Job not existing anymore, skipping set_state" )
135- import traceback
136-
137- traceback .print_exc ()
138- logging .info ("---------------" )
139149 except Exception as e :
140150 import traceback
141-
142151 traceback .print_exc ()
143- logging .error (f"on jobs.JobManager.set_state = { repr (e )} " )
152+ logging .error (f"Failed to set state for job { uid } . Cause: { repr (e )} " )
144153 finally :
145154 pass
146155
@@ -178,39 +187,44 @@ def persist(cls, uid):
178187
179188 @classmethod
180189 def resume (cls , job ):
190+
191+ if job is None or not isinstance (job , JobExecutor ):
192+ return
193+
194+ uid = job .uid
195+
181196 try :
182197 client = ConnectionManager .connect (job .host )
198+ mounted_job = cls .mount (job )
183199
184- job = cls .mount ( job )
200+ cls .manage ( mounted_job )
185201
186- cls .manage (job )
202+ if client and mounted_job and mounted_job .status == "IDLE" :
203+ cls .set_state (uid , status = "RUNNING" )
187204
188- if client and job and job .status == "IDLE" :
189- cls .set_state (job .uid , status = "RUNNING" )
190-
191- cls .schedule (job .uid , "PROGRESS" )
205+ cls .schedule (uid , "PROGRESS" )
192206 return True
193207 except errors .AuthException as e :
194208 logging .warning (repr (e ))
195- if job .host .rsa_key :
209+ if mounted_job .host .rsa_key :
196210 cls .set_state (
197- job . uid ,
211+ uid ,
198212 status = "IDLE" ,
199213 traceback = {
200214 "[ERROR] Authentication failed" : "Please check your credentials (Identity file) and reconnect manually."
201215 },
202216 )
203217 else :
204218 cls .set_state (
205- job . uid ,
219+ uid ,
206220 status = "IDLE" ,
207221 traceback = {"[ERROR] Authentication failed" : "Password required, please reconnect manually." },
208222 )
209223 return False
210224 except errors .BadHostKeyException as e :
211225 logging .warning (repr (e ))
212226 cls .set_state (
213- job . uid ,
227+ uid ,
214228 status = "IDLE" ,
215229 traceback = {
216230 "[ERROR] Authentication failed" : "Please check your credentials (Identity file) and reconnect manually."
@@ -221,7 +235,7 @@ def resume(cls, job):
221235 # TODO return for accounts instead of login
222236 import traceback
223237
224- cls .set_state (job . uid , status = "IDLE" , traceback = {"[ERROR] Unable to connect" : traceback .format_exc ()})
238+ cls .set_state (uid , status = "IDLE" , traceback = {"[ERROR] Unable to connect" : traceback .format_exc ()})
225239
226240 return False
227241
@@ -232,7 +246,7 @@ def resume_all(cls):
232246 job .status = "IDLE"
233247
234248 @classmethod
235- def load (cls ):
249+ def load_jobs (cls ):
236250 try :
237251 jobfile = cls .storage
238252 djobs = cls .loadjson (jobfile )
@@ -283,65 +297,19 @@ def loadjson(path: Path):
283297
284298
285299def start_monitor ():
286- def jobspy ():
300+ def monitor ():
287301 while JobManager .keepWorking ():
288302 try :
289303 uid , event = JobManager .agenda .get ()
290- JobManager .communicate (uid , event )
304+ if event == "SHUTDOWN" :
305+ break
306+
307+ JobManager .locked_send (uid , event )
291308 except :
292309 pass
293310
294- t = Thread (target = jobspy , daemon = False )
311+ t = Thread (target = monitor , daemon = False )
295312 t .start ()
296313
297314 return t
298315
299- # @classmethod
300- # def _persist(cls):
301- # if cls.storage is None:
302- # raise ValueError("No storage path set.")
303-
304- # data = []
305- # for job in cls.jobs.values():
306- # blob = pickle.dumps(cls.jobs).encode('utf-8')
307-
308- # djob = {
309- # "host": asdict(job.host),
310- # "uid": job.uid,
311- # "handler": blob,
312- # "status": job.status,
313- # "progress": job.progress,
314- # "message": job.message,
315- # "traceback": job.traceback
316- # }
317-
318- # data.append(djob)
319-
320- # content = {"jobs": data}
321-
322- # cls.targets_storage.parent.mkdir(parents=True, exist_ok=True)
323-
324- # with open(cls.targets_storage, "w") as file:
325- # json.dump(content, file, indent=2)
326-
327- # @classmethod
328- # def load_from_remote(cls):
329- # if cls.targets_storage is None:
330- # raise ValueError("No storage path set.")
331-
332- # if not cls.targets_storage.exists():
333- # logging.warning(f"Target storage {cls.targets_storage} does not exist. Starting with empty targets.")
334- # cls.targets = {}
335- # return
336-
337- # try:
338- # with open(cls.targets_storage, "r") as file:
339- # content = json.load(file)
340- # cls.targets = {host["name"]: Host(**host) for host in content["hosts"]}
341- # except Exception as e:
342- # logging.error(f"Error loading targets: {e}")
343- # cls.targets = {}
344-
345- # @classmethod
346- # def load(cls):
347- # pass
0 commit comments