Skip to content

Commit 960b9b2

Browse files
authored
Merge pull request #2107 from deepssin/fix-gevent-loopexit-dispatcher-crash
Fix dispatcher crash on gevent LoopExit exceptions
2 parents 9af2ef6 + 543ebd0 commit 960b9b2

File tree

1 file changed

+175
-92
lines changed

1 file changed

+175
-92
lines changed

teuthology/dispatcher/__init__.py

Lines changed: 175 additions & 92 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,12 @@
88

99
from typing import Dict, List
1010

11+
try:
12+
from gevent.exceptions import LoopExit
13+
except ImportError:
14+
# gevent might not be available in some environments
15+
LoopExit = Exception
16+
1117
from teuthology import (
1218
# non-modules
1319
setup_log_file,
@@ -99,105 +105,182 @@ def main(args):
99105
keep_running = True
100106
job_procs = set()
101107
worst_returncode = 0
102-
while keep_running:
103-
# Check to see if we have a teuthology-results process hanging around
104-
# and if so, read its return code so that it can exit.
105-
if result_proc is not None and result_proc.poll() is not None:
106-
log.debug("teuthology-results exited with code: %s",
107-
result_proc.returncode)
108-
result_proc = None
109-
110-
if sentinel(restart_file_path):
111-
restart()
112-
elif sentinel(stop_file_path):
113-
stop()
114-
115-
load_config()
116-
for proc in list(job_procs):
117-
rc = proc.poll()
118-
if rc is not None:
119-
worst_returncode = max([worst_returncode, rc])
120-
job_procs.remove(proc)
121-
job = connection.reserve(timeout=60)
122-
if job is None:
123-
if args.exit_on_empty_queue and not job_procs:
124-
log.info("Queue is empty and no supervisor processes running; exiting!")
125-
break
126-
continue
127-
128-
# bury the job so it won't be re-run if it fails
129-
job.bury()
130-
job_id = job.jid
131-
log.info('Reserved job %d', job_id)
132-
log.info('Config is: %s', job.body)
133-
job_config = yaml.safe_load(job.body)
134-
job_config['job_id'] = str(job_id)
135-
136-
if job_config.get('stop_worker'):
137-
keep_running = False
108+
loop_exit_count = 0
109+
max_loop_exits = 10 # Prevent infinite restart loops
138110

111+
while keep_running:
139112
try:
140-
job_config, teuth_bin_path = prep_job(
141-
job_config,
142-
log_file_path,
143-
archive_dir,
113+
# Check to see if we have a teuthology-results process hanging around
114+
# and if so, read its return code so that it can exit.
115+
if result_proc is not None and result_proc.poll() is not None:
116+
log.debug("teuthology-results exited with code: %s",
117+
result_proc.returncode)
118+
result_proc = None
119+
120+
if sentinel(restart_file_path):
121+
restart()
122+
elif sentinel(stop_file_path):
123+
stop()
124+
125+
load_config()
126+
for proc in list(job_procs):
127+
rc = proc.poll()
128+
if rc is not None:
129+
worst_returncode = max([worst_returncode, rc])
130+
job_procs.remove(proc)
131+
job = connection.reserve(timeout=60)
132+
if job is None:
133+
if args.exit_on_empty_queue and not job_procs:
134+
log.info("Queue is empty and no supervisor processes running; exiting!")
135+
break
136+
continue
137+
138+
# bury the job so it won't be re-run if it fails
139+
job.bury()
140+
job_id = job.jid
141+
log.info('Reserved job %d', job_id)
142+
log.info('Config is: %s', job.body)
143+
job_config = yaml.safe_load(job.body)
144+
job_config['job_id'] = str(job_id)
145+
146+
if job_config.get('stop_worker'):
147+
keep_running = False
148+
149+
try:
150+
job_config, teuth_bin_path = prep_job(
151+
job_config,
152+
log_file_path,
153+
archive_dir,
154+
)
155+
except SkipJob:
156+
continue
157+
158+
# lock machines but do not reimage them
159+
if 'roles' in job_config:
160+
try:
161+
job_config = lock_machines(job_config)
162+
except LoopExit as e:
163+
log.critical(
164+
"Caught gevent LoopExit exception during lock_machines for job %s. "
165+
"This is likely due to gevent/urllib3 blocking issues. "
166+
"Marking job as dead.",
167+
job_id
168+
)
169+
log.exception("LoopExit exception details:")
170+
report.try_push_job_info(
171+
job_config,
172+
dict(
173+
status='dead',
174+
failure_reason='gevent LoopExit during machine locking: {}'.format(str(e))
175+
)
176+
)
177+
# Skip this job and continue with the next one
178+
continue
179+
except Exception as e:
180+
log.exception("Unexpected exception during lock_machines for job %s", job_id)
181+
report.try_push_job_info(
182+
job_config,
183+
dict(
184+
status='dead',
185+
failure_reason='Exception during machine locking: {}'.format(str(e))
186+
)
187+
)
188+
continue
189+
190+
run_args = [
191+
os.path.join(teuth_bin_path, 'teuthology-supervisor'),
192+
'-v',
193+
'--bin-path', teuth_bin_path,
194+
'--archive-dir', archive_dir,
195+
]
196+
197+
# Create run archive directory if not already created and
198+
# job's archive directory
199+
create_job_archive(job_config['name'],
200+
job_config['archive_path'],
201+
archive_dir)
202+
job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')
203+
204+
# Write initial job config in job archive dir
205+
with open(job_config_path, 'w') as f:
206+
yaml.safe_dump(job_config, f, default_flow_style=False)
207+
208+
run_args.extend(["--job-config", job_config_path])
209+
210+
try:
211+
# Use start_new_session=True to ensure child processes are isolated
212+
# from the dispatcher's process group. This prevents accidental
213+
# termination if the dispatcher crashes or receives signals.
214+
job_proc = subprocess.Popen(
215+
run_args,
216+
stdout=subprocess.DEVNULL,
217+
stderr=subprocess.DEVNULL,
218+
start_new_session=True, # Isolate child process from parent
219+
)
220+
job_procs.add(job_proc)
221+
log.info('Job supervisor PID: %s', job_proc.pid)
222+
except Exception:
223+
error_message = "Saw error while trying to spawn supervisor."
224+
log.exception(error_message)
225+
if 'targets' in job_config:
226+
node_names = job_config["targets"].keys()
227+
lock_ops.unlock_safe(
228+
node_names,
229+
job_config["owner"],
230+
job_config["name"],
231+
job_config["job_id"]
232+
)
233+
report.try_push_job_info(job_config, dict(
234+
status='fail',
235+
failure_reason=error_message))
236+
237+
# This try/except block is to keep the worker from dying when
238+
# beanstalkc throws a SocketError
239+
try:
240+
job.delete()
241+
except Exception:
242+
log.exception("Saw exception while trying to delete job")
243+
244+
# Successful iteration - reset loop exit counter if it was set
245+
if loop_exit_count > 0:
246+
log.info("Successfully completed iteration after LoopExit exception(s). Resetting counter.")
247+
loop_exit_count = 0
248+
249+
except LoopExit:
250+
loop_exit_count += 1
251+
log.critical(
252+
"CRITICAL: Caught gevent LoopExit exception in dispatcher main loop "
253+
"(count: %d/%d). This is likely due to gevent/urllib3 blocking issues. "
254+
"The dispatcher will attempt to continue, but child processes should "
255+
"be isolated and unaffected.",
256+
loop_exit_count,
257+
max_loop_exits
144258
)
145-
except SkipJob:
146-
continue
259+
log.exception("LoopExit exception details:")
147260

148-
# lock machines but do not reimage them
149-
if 'roles' in job_config:
150-
job_config = lock_machines(job_config)
151-
152-
run_args = [
153-
os.path.join(teuth_bin_path, 'teuthology-supervisor'),
154-
'-v',
155-
'--bin-path', teuth_bin_path,
156-
'--archive-dir', archive_dir,
157-
]
158-
159-
# Create run archive directory if not already created and
160-
# job's archive directory
161-
create_job_archive(job_config['name'],
162-
job_config['archive_path'],
163-
archive_dir)
164-
job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')
165-
166-
# Write initial job config in job archive dir
167-
with open(job_config_path, 'w') as f:
168-
yaml.safe_dump(job_config, f, default_flow_style=False)
261+
if loop_exit_count >= max_loop_exits:
262+
log.critical(
263+
"Maximum LoopExit exceptions (%d) reached. "
264+
"Dispatcher is exiting to prevent infinite restart loop.",
265+
max_loop_exits
266+
)
267+
# Ensure all tracked job processes are noted as still running
268+
# They should continue independently due to start_new_session=True
269+
log.info("Dispatched %d job supervisor processes that should continue running independently", len(job_procs))
270+
break
169271

170-
run_args.extend(["--job-config", job_config_path])
272+
# Continue to next iteration to attempt recovery
273+
continue
171274

172-
try:
173-
job_proc = subprocess.Popen(
174-
run_args,
175-
stdout=subprocess.DEVNULL,
176-
stderr=subprocess.DEVNULL,
275+
except Exception as e:
276+
log.critical(
277+
"CRITICAL: Uncaught exception in dispatcher main loop: %s",
278+
type(e).__name__
177279
)
178-
job_procs.add(job_proc)
179-
log.info('Job supervisor PID: %s', job_proc.pid)
180-
except Exception:
181-
error_message = "Saw error while trying to spawn supervisor."
182-
log.exception(error_message)
183-
if 'targets' in job_config:
184-
node_names = job_config["targets"].keys()
185-
lock_ops.unlock_safe(
186-
node_names,
187-
job_config["owner"],
188-
job_config["name"],
189-
job_config["job_id"]
190-
)
191-
report.try_push_job_info(job_config, dict(
192-
status='fail',
193-
failure_reason=error_message))
194-
195-
# This try/except block is to keep the worker from dying when
196-
# beanstalkc throws a SocketError
197-
try:
198-
job.delete()
199-
except Exception:
200-
log.exception("Saw exception while trying to delete job")
280+
log.exception("Exception details:")
281+
# Continue the loop to avoid crashing the dispatcher completely
282+
# Child processes should be isolated via start_new_session=True
283+
continue
201284

202285
return worst_returncode
203286

0 commit comments

Comments (0)