22import signal
33import subprocess
44import tempfile
5+ import psutil
6+ import atexit
7+
# Module-level tracking state used for cleanup: handles to the spawned child
# processes (None while nothing is tracked) and the paths of temporary config
# files that still need to be deleted.
_training_process = None
_inference_process = None
_temp_files = []
512
613
def start_training(dict: dict):
    """Launch the training script as a tracked background process.

    Any tracked training process that is still running is stopped first.
    The training configuration is written to a temporary ``.yaml`` file that
    is recorded in ``_temp_files`` so it can be removed later.

    Args:
        dict: Request payload. The keys read here are:
            ``"arguments"`` - mapping of command-line flag name to value;
            entries whose value is ``None`` are skipped, everything else is
            passed as ``--<name> <str(value)>``.
            ``"trainingConfig"`` - text written to the temporary config file
            that is passed via ``--config-file``.
            ``"logPath"`` - forwarded to ``initialize_tensorboard``.

    Returns:
        ``{"status": "started", "pid": <pid of the training process>}``.

    Raises:
        Exception: whatever launching the process or initialising
            TensorBoard raised; it is logged and re-raised unchanged.
    """
    global _training_process

    # Only one tracked training run at a time: stop a live one first.
    if _training_process and _training_process.poll() is None:
        print("Stopping existing training process...")
        stop_training()

    path = "pytorch_connectomics/scripts/main.py"
    command = ["python", path]

    for key, value in dict["arguments"].items():
        if value is not None:
            command.extend([f"--{key}", str(value)])

    # delete=False keeps the file on disk after the handle is closed so the
    # child process can open it by name. The path is registered before the
    # write so a failed write still leaves the file tracked for cleanup, and
    # the context manager guarantees the handle is closed either way.
    with tempfile.NamedTemporaryFile(
        delete=False, mode="w", suffix=".yaml"
    ) as temp_file:
        temp_filepath = temp_file.name
        _temp_files.append(temp_filepath)
        temp_file.write(dict["trainingConfig"])

    command.extend(["--config-file", str(temp_filepath)])

    print("Starting training with command:", command)
    process = None
    try:
        # NOTE(review): nothing in the visible code reads these pipes. A child
        # that writes a lot of output can block once the OS pipe buffer fills;
        # confirm a reader exists elsewhere, or switch to DEVNULL / inherited
        # streams.
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        _training_process = process
        print(f"Training process started with PID: {process.pid}")

        initialize_tensorboard(dict["logPath"])
        print("TensorBoard initialized")

        return {"status": "started", "pid": process.pid}
    except Exception as e:
        print(f"Error starting training: {e}")
        # Remove the config file only when the process never started. If the
        # failure came after a successful launch, the child may still need the
        # file; it stays tracked and is removed by the regular cleanup.
        if process is None:
            removed = True
            try:
                os.unlink(temp_filepath)
            except FileNotFoundError:
                pass  # already gone; just drop the tracking entry
            except OSError as cleanup_error:
                removed = False  # keep it tracked so cleanup can retry
                print(
                    f"Error cleaning up temp file {temp_filepath}: "
                    f"{cleanup_error}"
                )
            if removed and temp_filepath in _temp_files:
                _temp_files.remove(temp_filepath)
        raise
3463
3564
def stop_process_by_name(process_name):
    """Terminate every process whose command line contains ``process_name``.

    Matching is a plain substring test against the space-joined command line
    reported by psutil. Each match is asked to terminate and given up to ten
    seconds to exit; if it is still alive after that it is killed, mirroring
    the escalation used for the tracked training process.

    Args:
        process_name: Substring to look for in each process command line.
            An empty value is ignored, because it would match every process.

    Returns:
        None. Failures are printed, never raised.
    """
    if not process_name:
        # '' is a substring of every command line; refuse to match everything.
        return

    own_pid = os.getpid()
    try:
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            try:
                # cmdline can be None when psutil could not read it.
                cmdline = ' '.join(proc.info['cmdline'] or [])
                if process_name not in cmdline:
                    continue
                if proc.info['pid'] == own_pid:
                    # Never terminate the process running this code.
                    continue

                print(f"Terminating process {proc.info['pid']}: {cmdline}")
                proc.terminate()
                try:
                    # Wait up to 10 seconds for graceful termination.
                    proc.wait(timeout=10)
                except psutil.TimeoutExpired:
                    # Still running after the grace period: force it.
                    print(f"Force killing process {proc.info['pid']}...")
                    proc.kill()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process already exited or we lack permission to signal it.
                continue
    except Exception as e:
        print(f"Error stopping processes by name '{process_name}': {e}")
79+
def cleanup_temp_files():
    """Delete the tracked temporary files and forget the ones that are gone.

    Walks a snapshot of ``_temp_files`` so entries can be removed from the
    live list while iterating. A path is dropped from the list once its file
    has been deleted or turns out not to exist. If deletion fails for any
    other reason the error is printed and the path stays tracked, so a later
    call can retry.

    Returns:
        None.
    """
    for temp_file in list(_temp_files):
        try:
            # Attempt the delete directly instead of checking exists() first,
            # so a file that disappears in between is not reported as an error.
            os.unlink(temp_file)
        except FileNotFoundError:
            pass  # nothing on disk; still drop the tracking entry below
        except Exception as e:
            print(f"Error cleaning up temp file {temp_file}: {e}")
            continue
        else:
            print(f"Cleaned up temp file: {temp_file}")

        if temp_file in _temp_files:
            _temp_files.remove(temp_file)
4991
5092
def stop_training():
    """Stop training, TensorBoard, and remove tracked temporary files.

    If a tracked training process is still running it is asked to terminate
    and given ten seconds to exit before being killed; the tracked handle is
    cleared afterwards whether or not that succeeded. Any remaining process
    whose command line matches the training script is then stopped by name,
    TensorBoard is stopped, and the tracked temp files are cleaned up.

    Returns:
        ``{"status": "stopped"}``.
    """
    global _training_process

    if _training_process and _training_process.poll() is None:
        try:
            print(f"Terminating training process PID: {_training_process.pid}")
            _training_process.terminate()
            _training_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # Graceful shutdown timed out: escalate and reap the child.
            print("Force killing training process...")
            _training_process.kill()
            _training_process.wait()
        except Exception as e:
            print(f"Error stopping training process: {e}")
        finally:
            _training_process = None

    # Fallback for training processes that are not (or no longer) tracked.
    stop_process_by_name("python pytorch_connectomics/scripts/main.py")
    stop_tensorboard()
    cleanup_temp_files()
    return {"status": "stopped"}
55116
56117
57118tensorboard_url = None
@@ -77,8 +138,7 @@ def get_tensorboard():
77138
78139
def stop_tensorboard():
    """Stop every process whose command line contains ``tensorboard``."""
    stop_process_by_name("tensorboard")
82142
83143
84144def start_inference (dict : dict ):
0 commit comments