@@ -535,27 +535,19 @@ def run(self):
535535 if extension == ".yaml" :
536536 workflow = read_and_write_yaml (self .name )
537537 else :
538- workflow = xml_to_yaml .execution ("documents/" + self .name )
539- log .info ("BEFORE OPENING" )
540- log .info (workflow .get ("workflow_type" ))
541- log .info ("DESCRIPTION WORKFLOW" )
542- log .info (workflow )
538+ workflow = xml_to_yaml .execution ("documents/" + self .name )
543539 machine_found = Machine .objects .get (id = self .request .session ['machine_chosen' ])
544540 fqdn = machine_found .fqdn
545541 machine_folder = extract_substring (fqdn )
546542 userMachine = machine_found .user
547543 workflow_name = workflow .get ("workflow_type" )
548- log .info ("workflow_name" )
549- log .info (workflow_name )
550544 principal_folder = machine_found .wdir
551545 wdirPath , nameWdir = wdir_folder (principal_folder )
552546 cmd1 = "source /etc/profile; mkdir -p " + principal_folder + "/" + nameWdir + "/workflows/; echo " + str (
553547 workflow ) + " > " + principal_folder + "/" + nameWdir + "/workflows/" + str (
554548 self .name ) + "; cd " + principal_folder + "; BACKUPDIR=$(ls -td ./*/ | head -1); echo EXECUTION_FOLDER:$BACKUPDIR;"
555549 ssh = connection (self .request .session ["content" ], machine_found .id )
556550 stdin , stdout , stderr = ssh .exec_command (cmd1 )
557- log .info ("COMMAND 1" )
558- log .info (cmd1 )
559551 execution_folder = wdirPath + "/execution"
560552 workflow_folder = wdirPath + "/workflows"
561553
@@ -609,6 +601,7 @@ def run(self):
609601 self .request .session ['jobID' ] = jobID
610602 self .request .session ['execution_folder' ] = execution_folder
611603 os .remove ("documents/" + str (self .name ))
604+
612605 return
613606
614607
@@ -969,6 +962,7 @@ def executions(request):
969962 return redirect ('accounts:run_sim' )
970963
971964 elif 'disconnectButton' in request .POST :
965+ global dict_thread
972966 Connection .objects .filter (idConn_id = request .session ["idConn" ]).update (status = "Disconnect" )
973967 for key in list (request .session .keys ()):
974968 if not key .startswith ("_" ): # skip keys set by the django system
@@ -1011,6 +1005,7 @@ def executions(request):
10111005 request .session ["idConn" ] = c .idConn_id
10121006 threadUpdate = updateExecutions (request , c .idConn_id )
10131007 threadUpdate .start ()
1008+ monitor_checkpoint (request .user , content , c .idConn_id )
10141009 checkConnBool = checkConnection (request )
10151010 if not checkConnBool :
10161011 machines_done = populate_executions_machines (request )
@@ -1134,6 +1129,54 @@ def run(self):
11341129 render_right (self .request )
11351130 return
11361131
# Registry of running auto-restart monitor threads, keyed by user, used to
# keep at most one monitor per user.
dict_thread = {}


class auto_restart_thread(threading.Thread):
    """Background thread that runs the auto-restart monitor loop for one user.

    The thread registers itself in ``dict_thread`` under ``user`` and then
    delegates to ``wait_timeout_new``. If a monitor is already registered for
    the same user, ``run`` returns immediately without doing any work.

    Args:
        user: Author whose executions are monitored; also the registry key.
        content: Connection payload forwarded unchanged to ``wait_timeout_new``.
        conn_id: Identifier of the connection forwarded to ``wait_timeout_new``.
    """

    def __init__(self, user, content, conn_id):
        # Initialise the Thread base class exactly once; the original called
        # both threading.Thread.__init__ and super().__init__().
        super().__init__()
        self.user = user
        self.content = content
        self.conn_id = conn_id
        # Set by stop() to ask the monitor loop to terminate.
        self._stop_event = threading.Event()

    def run(self):
        """Register this monitor for its user and run the polling loop."""
        if self.user in dict_thread:
            # Another monitor already serves this user.
            return
        dict_thread[self.user] = self
        try:
            wait_timeout_new(self.user, self.content, self.conn_id, self._stop_event)
        finally:
            # Always drop our own registration, even when the loop raised or
            # was stopped, so a later monitor for this user is not blocked by
            # a stale entry. Only remove the entry if it is still ours.
            if dict_thread.get(self.user) is self:
                del dict_thread[self.user]

    def stop(self):
        """Request termination of the monitor loop."""
        self._stop_event.set()
1152+
1153+
def wait_timeout_new(user, content, conn_id, stop_event, poll_interval=5):
    """Poll a user's auto-restart executions and relaunch timed-out ones.

    Loops until ``stop_event`` is set, or until the user has no auto-restart
    execution in PENDING/RUNNING/INITIALIZING/TIMEOUT state while the
    connection status is "Disconnect". On each pass every auto-restart
    execution in TIMEOUT state is handed to ``checkpointing``.

    Args:
        user: Author whose executions are monitored; key in ``dict_thread``.
        content: Connection payload passed through to ``checkpointing``.
        conn_id: ``idConn_id`` of the ``Connection`` row to watch.
        stop_event: ``threading.Event``; setting it ends the loop.
        poll_interval: Seconds to wait between polls and after each relaunch.

    Returns:
        None.

    Raises:
        Connection.DoesNotExist: If no connection matches ``conn_id``.
    """
    active_statuses = ["PENDING", "RUNNING", "INITIALIZING", "TIMEOUT"]
    while not stop_event.is_set():
        # Only existence matters here, so avoid loading the rows.
        has_active = Execution.objects.filter(
            author=user, autorestart=True, status__in=active_statuses).exists()
        conn = Connection.objects.get(idConn_id=conn_id)
        if not has_active and conn.status == "Disconnect":
            # The entry may already have been removed elsewhere; do not let a
            # missing key crash the thread on its way out.
            dict_thread.pop(user, None)
            # NOTE(review): this pause has no visible effect besides delaying
            # thread exit -- confirm whether it is needed.
            stop_event.wait(timeout=poll_interval)
            break

        timed_out = Execution.objects.filter(
            author=user, autorestart=True, status="TIMEOUT")
        for execution in timed_out:
            # checkpointing() saves the "CONTINUE" status on the job itself,
            # so no status assignment is needed on this in-memory instance.
            checkpointing(execution.jobID, content, user, execution.machine_id)
            # Interruptible pause: a stop request ends the monitor right away
            # instead of being ignored for the duration of a sleep.
            if stop_event.wait(timeout=poll_interval):
                return
        stop_event.wait(timeout=poll_interval)
    return
1173+
1174+
def monitor_checkpoint(user, content, conn_id):
    """Start an ``auto_restart_thread`` for the given user and connection.

    Args:
        user: Passed to the thread as the user to monitor.
        content: Connection payload passed to the thread.
        conn_id: Connection identifier passed to the thread.

    Returns:
        None. The thread is started and not joined.
    """
    auto_restart_thread(user, content, conn_id).start()
1179+
11371180
11381181def update_table (request ):
11391182 machine_found = Machine .objects .get (id = request .session ['machine_chosen' ])
@@ -1158,10 +1201,6 @@ def update_table(request):
11581201 if not (str (values [4 ]) == "FAILED" and executionE .status == "INITIALIZING" ):
11591202 Execution .objects .filter (jobID = executionE .jobID ).update (status = values [4 ], time = values [3 ],
11601203 nodes = int (values [2 ]))
1161- executionTimeout = Execution .objects .all ().filter (author = request .user , autorestart = True , status = "TIMEOUT" )
1162- for executionT in executionTimeout :
1163- executionT .status = "CONTINUE"
1164- checkpointing (executionT .jobID , request , executionT .machine_id )
11651204 return True
11661205
11671206
@@ -1205,9 +1244,9 @@ def stopExecution(eIDstop, request):
12051244 'executionsFailed' : executionsFailed , 'executionsTimeout' : executionTimeout })
12061245
12071246
1208- def checkpointing (jobIDCheckpoint , request , machine_id ):
1209- ssh = connection (request . session [ ' content' ] , machine_id )
1210- checkpointID = Execution .objects .all ().get (author = request . user , jobID = jobIDCheckpoint )
1247+ def checkpointing (jobIDCheckpoint , content , user , machine_id ):
1248+ ssh = connection (content , machine_id )
1249+ checkpointID = Execution .objects .all ().get (author = user , jobID = jobIDCheckpoint )
12111250 command = "source /etc/profile; cd " + checkpointID .wdir + "; source checkpoint_script.sh;"
12121251 stdin , stdout , stderr = ssh .exec_command (command )
12131252 stdout = stdout .readlines ()
@@ -1225,7 +1264,7 @@ def checkpointing(jobIDCheckpoint, request, machine_id):
12251264 form .eID = uuid .uuid4 ()
12261265 form .machine_id = checkpointID .machine_id
12271266 form .user = checkpointID .user
1228- form .author = request . user
1267+ form .author = user
12291268 form .nodes = checkpointID .nodes
12301269 form .status = "PENDING"
12311270 form .checkpoint = checkpointID .jobID
@@ -1244,7 +1283,7 @@ def checkpointing(jobIDCheckpoint, request, machine_id):
12441283 form .g_bool = checkpointID .g_bool
12451284 form .branch = checkpointID .branch
12461285 form .save ()
1247- checkpointID = Execution .objects .all ().get (author = request . user , jobID = jobIDCheckpoint )
1286+ checkpointID = Execution .objects .all ().get (author = user , jobID = jobIDCheckpoint )
12481287 checkpointID .status = "CONTINUE"
12491288 checkpointID .save ()
12501289 # monitor_checkpoint(request.session['jobID'], request, execTime, machine_id)
0 commit comments