Skip to content

Commit 38d9609

Browse files
authored
Merge pull request #667 from transformerlab/fix/logging-error-burst
fix burst of logging errors when shutting down the worker
2 parents 3f67d9c + 9292c28 commit 38d9609

File tree

1 file changed

+94
-63
lines changed

1 file changed

+94
-63
lines changed

transformerlab/shared/shared.py

Lines changed: 94 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ async def async_run_python_script_and_update_status(python_script: list[str], jo
172172
raise asyncio.CancelledError()
173173

174174

175-
async def read_process_output(process, job_id):
175+
async def read_process_output(process, job_id, log_handle=None):
176176
await process.wait()
177177
returncode = process.returncode
178178
if returncode == 0:
@@ -181,9 +181,22 @@ async def read_process_output(process, job_id):
181181
print("Worker Process stopped by user")
182182
else:
183183
print(f"ERROR: Worker Process ended with exit code {returncode}.")
184-
with open(get_global_log_path(), "a") as log:
185-
log.write(f"Inference Server Terminated with {returncode}.\n")
186-
log.flush()
184+
185+
# Close the log handle if one was passed (from async_run_python_daemon_and_update_status)
186+
if log_handle:
187+
try:
188+
log_handle.close()
189+
except Exception:
190+
pass
191+
192+
# Wrap log write in try-except to handle errors gracefully during shutdown
193+
try:
194+
with open(get_global_log_path(), "a") as log:
195+
log.write(f"Inference Server Terminated with {returncode}.\n")
196+
log.flush()
197+
except Exception:
198+
# Silently ignore logging errors during shutdown to prevent error bursts
199+
pass
187200
# so we should delete the pid file:
188201
from lab.dirs import get_temp_dir
189202

@@ -217,79 +230,97 @@ async def async_run_python_daemon_and_update_status(
217230
break
218231

219232
# Open a file to write the output to:
220-
log = open(get_global_log_path(), "a")
233+
# Use context manager to ensure proper cleanup, but we need to keep it open
234+
# so we'll use a different approach - store the handle and close it later
235+
log = None
236+
try:
237+
log = open(get_global_log_path(), "a")
221238

222-
# Check if plugin has a venv directory
223-
if plugin_location:
224-
plugin_location = os.path.normpath(plugin_location)
225-
from lab.dirs import get_plugin_dir
239+
# Check if plugin has a venv directory
240+
if plugin_location:
241+
plugin_location = os.path.normpath(plugin_location)
242+
from lab.dirs import get_plugin_dir
243+
244+
plugin_dir_root = get_plugin_dir()
245+
if not plugin_location.startswith(plugin_dir_root):
246+
print(f"Plugin location {plugin_location} is not in {plugin_dir_root}")
247+
raise Exception(f"Plugin location {plugin_location} is not in {plugin_dir_root}")
248+
if os.path.exists(os.path.join(plugin_location, "venv")) and os.path.isdir(
249+
os.path.join(plugin_location, "venv")
250+
):
251+
venv_path = os.path.join(plugin_location, "venv")
252+
print(f">Plugin has virtual environment, activating venv from {venv_path}")
253+
venv_python = os.path.join(venv_path, "bin", "python")
254+
command = [venv_python, *python_script]
255+
else:
256+
print(">Using system Python interpreter")
257+
command = [sys.executable, *python_script]
226258

227-
plugin_dir_root = get_plugin_dir()
228-
if not plugin_location.startswith(plugin_dir_root):
229-
print(f"Plugin location {plugin_location} is not in {plugin_dir_root}")
230-
raise Exception(f"Plugin location {plugin_location} is not in {plugin_dir_root}")
231-
if os.path.exists(os.path.join(plugin_location, "venv")) and os.path.isdir(
232-
os.path.join(plugin_location, "venv")
233-
):
234-
venv_path = os.path.join(plugin_location, "venv")
235-
print(f">Plugin has virtual environment, activating venv from {venv_path}")
236-
venv_python = os.path.join(venv_path, "bin", "python")
237-
command = [venv_python, *python_script]
238259
else:
239260
print(">Using system Python interpreter")
240-
command = [sys.executable, *python_script]
241-
242-
else:
243-
print(">Using system Python interpreter")
244-
command = [sys.executable, *python_script] # Skip the original Python interpreter
245-
246-
process = await asyncio.create_subprocess_exec(
247-
*command, stdin=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE
248-
)
249-
250-
pid = process.pid
251-
from lab.dirs import get_temp_dir
261+
command = [sys.executable, *python_script] # Skip the original Python interpreter
252262

253-
pid_file = os.path.join(get_temp_dir(), f"worker_job_{job_id}.pid")
254-
with open(pid_file, "w") as f:
255-
f.write(str(pid))
263+
process = await asyncio.create_subprocess_exec(
264+
*command, stdin=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE
265+
)
256266

257-
# keep a tail of recent lines so we can show them on failure
258-
recent_lines = deque(maxlen=10)
267+
pid = process.pid
268+
from lab.dirs import get_temp_dir
259269

260-
line = await process.stdout.readline()
261-
error_msg = None
262-
while line:
263-
decoded = line.decode()
270+
pid_file = os.path.join(get_temp_dir(), f"worker_job_{job_id}.pid")
271+
with open(pid_file, "w") as f:
272+
f.write(str(pid))
264273

265-
recent_lines.append(decoded.strip())
274+
# keep a tail of recent lines so we can show them on failure:
275+
recent_lines = deque(maxlen=10)
266276

267-
# If we hit the begin_string then the daemon is started and we can return!
268-
if begin_string in decoded:
269-
if set_process_id_function is not None:
270-
if set_process_id_function:
271-
set_process_id_function(process)
272-
print(f"Worker job {job_id} started successfully")
273-
job = job_service.job_get(job_id)
274-
experiment_id = job["experiment_id"]
275-
await job_update_status(job_id=job_id, status="COMPLETE", experiment_id=experiment_id)
277+
line = await process.stdout.readline()
278+
error_msg = None
279+
while line:
280+
decoded = line.decode()
281+
recent_lines.append(decoded.strip())
282+
283+
# If we hit the begin_string then the daemon is started and we can return!
284+
if begin_string in decoded:
285+
if set_process_id_function is not None:
286+
if set_process_id_function:
287+
set_process_id_function(process)
288+
print(f"Worker job {job_id} started successfully")
289+
job = job_service.job_get(job_id)
290+
experiment_id = job["experiment_id"]
291+
await job_update_status(job_id=job_id, status="COMPLETE", experiment_id=experiment_id)
276292

277-
# Schedule the read_process_output coroutine in the current event
278-
# so we can keep watching this process, but return back to the caller
279-
# so that the REST call can complete
280-
asyncio.create_task(read_process_output(process, job_id))
293+
# Schedule the read_process_output coroutine in the current event
294+
# so we can keep watching this process, but return back to the caller
295+
# so that the REST call can complete
296+
# Pass the log handle to read_process_output so it can close it
297+
# Set log to None so the finally block doesn't close it
298+
log_handle_to_pass = log
299+
log = None
300+
asyncio.create_task(read_process_output(process, job_id, log_handle_to_pass))
281301

282-
return process
302+
return process
283303

284-
# Watch the output for any errors and store the latest error
285-
elif ("stderr" in decoded) and ("ERROR" in decoded):
286-
error_msg = decoded.split("| ")[-1]
304+
# Watch the output for any errors and store the latest error
305+
elif ("stderr" in decoded) and ("ERROR" in decoded):
306+
error_msg = decoded.split("| ")[-1]
287307

308+
# Wrap log write in try-except to handle errors gracefully during shutdown
309+
if log:
310+
try:
311+
log.write(decoded)
312+
log.flush()
313+
except Exception:
314+
# Silently ignore logging errors during shutdown
315+
pass
316+
line = await process.stdout.readline()
317+
finally:
318+
# Ensure log file is closed even if there's an error
288319
if log:
289-
log.write(decoded)
290-
log.flush()
291-
log.flush()
292-
line = await process.stdout.readline()
320+
try:
321+
log.close()
322+
except Exception:
323+
pass
293324

294325
# If we're here then stdout didn't return and we didn't start the daemon
295326
# Wait on the process and return the error

0 commit comments

Comments
 (0)