Skip to content

Commit 543ebd0

Browse files
committed
Fix dispatcher crash on gevent LoopExit exceptions
Handle gevent LoopExit exceptions gracefully to prevent dispatcher crashes. Add exception handling in the main loop and around the lock_machines() call, with a loop-exit counter (max 10) to prevent infinite restart loops. Isolate child processes using start_new_session=True so job supervisors continue running independently if the dispatcher encounters exceptions.

Signed-off-by: deepssin <deepssin@redhat.com>
1 parent 5067830 commit 543ebd0

File tree

1 file changed

+175
-92
lines changed

1 file changed

+175
-92
lines changed

teuthology/dispatcher/__init__.py

Lines changed: 175 additions & 92 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,12 @@
88

99
from typing import Dict, List
1010

11+
try:
12+
from gevent.exceptions import LoopExit
13+
except ImportError:
14+
# gevent might not be available in some environments
15+
LoopExit = Exception
16+
1117
from teuthology import (
1218
# non-modules
1319
setup_log_file,
@@ -99,105 +105,182 @@ def main(args):
99105
keep_running = True
100106
job_procs = set()
101107
worst_returncode = 0
102-
while keep_running:
103-
# Check to see if we have a teuthology-results process hanging around
104-
# and if so, read its return code so that it can exit.
105-
if result_proc is not None and result_proc.poll() is not None:
106-
log.debug("teuthology-results exited with code: %s",
107-
result_proc.returncode)
108-
result_proc = None
109-
110-
if sentinel(restart_file_path):
111-
restart()
112-
elif sentinel(stop_file_path):
113-
stop()
114-
115-
load_config()
116-
for proc in list(job_procs):
117-
rc = proc.poll()
118-
if rc is not None:
119-
worst_returncode = max([worst_returncode, rc])
120-
job_procs.remove(proc)
121-
job = connection.reserve(timeout=60)
122-
if job is None:
123-
if args.exit_on_empty_queue and not job_procs:
124-
log.info("Queue is empty and no supervisor processes running; exiting!")
125-
break
126-
continue
127-
128-
# bury the job so it won't be re-run if it fails
129-
job.bury()
130-
job_id = job.jid
131-
log.info('Reserved job %d', job_id)
132-
log.info('Config is: %s', job.body)
133-
job_config = yaml.safe_load(job.body)
134-
job_config['job_id'] = str(job_id)
135-
136-
if job_config.get('stop_worker'):
137-
keep_running = False
108+
loop_exit_count = 0
109+
max_loop_exits = 10 # Prevent infinite restart loops
138110

111+
while keep_running:
139112
try:
140-
job_config, teuth_bin_path = prep_job(
141-
job_config,
142-
log_file_path,
143-
archive_dir,
113+
# Check to see if we have a teuthology-results process hanging around
114+
# and if so, read its return code so that it can exit.
115+
if result_proc is not None and result_proc.poll() is not None:
116+
log.debug("teuthology-results exited with code: %s",
117+
result_proc.returncode)
118+
result_proc = None
119+
120+
if sentinel(restart_file_path):
121+
restart()
122+
elif sentinel(stop_file_path):
123+
stop()
124+
125+
load_config()
126+
for proc in list(job_procs):
127+
rc = proc.poll()
128+
if rc is not None:
129+
worst_returncode = max([worst_returncode, rc])
130+
job_procs.remove(proc)
131+
job = connection.reserve(timeout=60)
132+
if job is None:
133+
if args.exit_on_empty_queue and not job_procs:
134+
log.info("Queue is empty and no supervisor processes running; exiting!")
135+
break
136+
continue
137+
138+
# bury the job so it won't be re-run if it fails
139+
job.bury()
140+
job_id = job.jid
141+
log.info('Reserved job %d', job_id)
142+
log.info('Config is: %s', job.body)
143+
job_config = yaml.safe_load(job.body)
144+
job_config['job_id'] = str(job_id)
145+
146+
if job_config.get('stop_worker'):
147+
keep_running = False
148+
149+
try:
150+
job_config, teuth_bin_path = prep_job(
151+
job_config,
152+
log_file_path,
153+
archive_dir,
154+
)
155+
except SkipJob:
156+
continue
157+
158+
# lock machines but do not reimage them
159+
if 'roles' in job_config:
160+
try:
161+
job_config = lock_machines(job_config)
162+
except LoopExit as e:
163+
log.critical(
164+
"Caught gevent LoopExit exception during lock_machines for job %s. "
165+
"This is likely due to gevent/urllib3 blocking issues. "
166+
"Marking job as dead.",
167+
job_id
168+
)
169+
log.exception("LoopExit exception details:")
170+
report.try_push_job_info(
171+
job_config,
172+
dict(
173+
status='dead',
174+
failure_reason='gevent LoopExit during machine locking: {}'.format(str(e))
175+
)
176+
)
177+
# Skip this job and continue with the next one
178+
continue
179+
except Exception as e:
180+
log.exception("Unexpected exception during lock_machines for job %s", job_id)
181+
report.try_push_job_info(
182+
job_config,
183+
dict(
184+
status='dead',
185+
failure_reason='Exception during machine locking: {}'.format(str(e))
186+
)
187+
)
188+
continue
189+
190+
run_args = [
191+
os.path.join(teuth_bin_path, 'teuthology-supervisor'),
192+
'-v',
193+
'--bin-path', teuth_bin_path,
194+
'--archive-dir', archive_dir,
195+
]
196+
197+
# Create run archive directory if not already created and
198+
# job's archive directory
199+
create_job_archive(job_config['name'],
200+
job_config['archive_path'],
201+
archive_dir)
202+
job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')
203+
204+
# Write initial job config in job archive dir
205+
with open(job_config_path, 'w') as f:
206+
yaml.safe_dump(job_config, f, default_flow_style=False)
207+
208+
run_args.extend(["--job-config", job_config_path])
209+
210+
try:
211+
# Use start_new_session=True to ensure child processes are isolated
212+
# from the dispatcher's process group. This prevents accidental
213+
# termination if the dispatcher crashes or receives signals.
214+
job_proc = subprocess.Popen(
215+
run_args,
216+
stdout=subprocess.DEVNULL,
217+
stderr=subprocess.DEVNULL,
218+
start_new_session=True, # Isolate child process from parent
219+
)
220+
job_procs.add(job_proc)
221+
log.info('Job supervisor PID: %s', job_proc.pid)
222+
except Exception:
223+
error_message = "Saw error while trying to spawn supervisor."
224+
log.exception(error_message)
225+
if 'targets' in job_config:
226+
node_names = job_config["targets"].keys()
227+
lock_ops.unlock_safe(
228+
node_names,
229+
job_config["owner"],
230+
job_config["name"],
231+
job_config["job_id"]
232+
)
233+
report.try_push_job_info(job_config, dict(
234+
status='fail',
235+
failure_reason=error_message))
236+
237+
# This try/except block is to keep the worker from dying when
238+
# beanstalkc throws a SocketError
239+
try:
240+
job.delete()
241+
except Exception:
242+
log.exception("Saw exception while trying to delete job")
243+
244+
# Successful iteration - reset loop exit counter if it was set
245+
if loop_exit_count > 0:
246+
log.info("Successfully completed iteration after LoopExit exception(s). Resetting counter.")
247+
loop_exit_count = 0
248+
249+
except LoopExit:
250+
loop_exit_count += 1
251+
log.critical(
252+
"CRITICAL: Caught gevent LoopExit exception in dispatcher main loop "
253+
"(count: %d/%d). This is likely due to gevent/urllib3 blocking issues. "
254+
"The dispatcher will attempt to continue, but child processes should "
255+
"be isolated and unaffected.",
256+
loop_exit_count,
257+
max_loop_exits
144258
)
145-
except SkipJob:
146-
continue
259+
log.exception("LoopExit exception details:")
147260

148-
# lock machines but do not reimage them
149-
if 'roles' in job_config:
150-
job_config = lock_machines(job_config)
151-
152-
run_args = [
153-
os.path.join(teuth_bin_path, 'teuthology-supervisor'),
154-
'-v',
155-
'--bin-path', teuth_bin_path,
156-
'--archive-dir', archive_dir,
157-
]
158-
159-
# Create run archive directory if not already created and
160-
# job's archive directory
161-
create_job_archive(job_config['name'],
162-
job_config['archive_path'],
163-
archive_dir)
164-
job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')
165-
166-
# Write initial job config in job archive dir
167-
with open(job_config_path, 'w') as f:
168-
yaml.safe_dump(job_config, f, default_flow_style=False)
261+
if loop_exit_count >= max_loop_exits:
262+
log.critical(
263+
"Maximum LoopExit exceptions (%d) reached. "
264+
"Dispatcher is exiting to prevent infinite restart loop.",
265+
max_loop_exits
266+
)
267+
# Ensure all tracked job processes are noted as still running
268+
# They should continue independently due to start_new_session=True
269+
log.info("Dispatched %d job supervisor processes that should continue running independently", len(job_procs))
270+
break
169271

170-
run_args.extend(["--job-config", job_config_path])
272+
# Continue to next iteration to attempt recovery
273+
continue
171274

172-
try:
173-
job_proc = subprocess.Popen(
174-
run_args,
175-
stdout=subprocess.DEVNULL,
176-
stderr=subprocess.DEVNULL,
275+
except Exception as e:
276+
log.critical(
277+
"CRITICAL: Uncaught exception in dispatcher main loop: %s",
278+
type(e).__name__
177279
)
178-
job_procs.add(job_proc)
179-
log.info('Job supervisor PID: %s', job_proc.pid)
180-
except Exception:
181-
error_message = "Saw error while trying to spawn supervisor."
182-
log.exception(error_message)
183-
if 'targets' in job_config:
184-
node_names = job_config["targets"].keys()
185-
lock_ops.unlock_safe(
186-
node_names,
187-
job_config["owner"],
188-
job_config["name"],
189-
job_config["job_id"]
190-
)
191-
report.try_push_job_info(job_config, dict(
192-
status='fail',
193-
failure_reason=error_message))
194-
195-
# This try/except block is to keep the worker from dying when
196-
# beanstalkc throws a SocketError
197-
try:
198-
job.delete()
199-
except Exception:
200-
log.exception("Saw exception while trying to delete job")
280+
log.exception("Exception details:")
281+
# Continue the loop to avoid crashing the dispatcher completely
282+
# Child processes should be isolated via start_new_session=True
283+
continue
201284

202285
return worst_returncode
203286

0 commit comments

Comments (0)