@@ -22,18 +22,17 @@ class Report:
2222 Internal use only.
2323 """
2424
25- def __init__ (self , md5 , report , error , start_time , status = "failed" ):
25+ def __init__ (self , key , report , error , start_time , status = "failed" ):
2626 self .start_time = start_time
2727 self .end_time = time .time ()
2828 self .process_time = self .end_time - self .start_time
29- self .md5 = md5
29+ self .key = key
3030 self .report = report
3131 self .error = error
3232 self .status = status
3333
34- def to_json (self ):
34+ def to_dict (self ):
3535 resp = self .__dict__
36- # resp["report"] = json.loads(self.report)
3736 return resp
3837
3938
@@ -46,36 +45,36 @@ class JobExecutor:
4645 executor : Executor
4746
4847 @staticmethod
49- def make_key (md5 ) -> str :
50- return f"job_{ md5 } "
48+ def make_key (k ) -> str :
49+ return f"job_{ k } "
5150
52- def get_job (self , md5 ):
53- key = self .make_key (md5 )
51+ def get_job (self , k ):
52+ key = self .make_key (k )
5453 return self .executor .futures ._futures .get (key , None )
5554
56- def cancel_job (self , md5 ):
57- key = self .make_key (md5 )
55+ def cancel_job (self , k ):
56+ key = self .make_key (k )
5857 return self .executor .futures ._futures .get (key ).cancel ()
5958
6059 def new_job (self , ** kwargs ):
6160 return self .executor .submit_stored (** kwargs )
6261
63- def pop_job (self , md5 ):
64- key = self .make_key (md5 )
62+ def pop_job (self , k ):
63+ key = self .make_key (k )
6564 return self .executor .futures .pop (key )
6665
6766 def __init__ (self , executor ) -> None :
6867 self .executor = executor
6968
70- def run_command (self , cmd , md5 ) -> Report :
69+ def run_command (self , cmd , key ) -> Report :
7170 """
7271 This function is called by the executor to run given command
7372 using a subprocess asynchronously.
7473
7574 :returns:
7675 A ConcurrentFuture object where future.result = Report()
7776 """
78- job_key : str = self .make_key (md5 )
77+ job_key : str = self .make_key (key )
7978 start_time = time .time ()
8079 try :
8180 proc = subprocess .Popen (cmd , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
@@ -89,10 +88,9 @@ def run_command(self, cmd, md5) -> Report:
8988 else :
9089 status = "failed"
9190
92- stdout = {"result" : stdout }
9391 logger .info (f"Job: '{ job_key } ' finished with status: '{ status } '." )
9492 return Report (
95- md5 = md5 ,
93+ key = key ,
9694 report = stdout ,
9795 error = stderr ,
9896 start_time = start_time ,
@@ -101,10 +99,10 @@ def run_command(self, cmd, md5) -> Report:
10199
102100 except Exception as e :
103101 str_err = str (e )
104- self .cancel_job (md5 )
102+ self .cancel_job (key )
105103 logger .error (f"Job: '{ job_key } ' failed. Reason: { str_err } ." )
106104 return Report (
107- md5 = md5 ,
105+ key = key ,
108106 report = None ,
109107 error = str_err ,
110108 start_time = start_time ,
@@ -125,7 +123,7 @@ def save_result(self, future) -> None:
125123 """
126124 # get job result from future
127125 job_res = future .result ()
128- self .__results .update ({job_res .md5 : job_res })
126+ self .__results .update ({job_res .key : job_res })
129127
130128 def get_all (self ):
131129 return self .__results
0 commit comments