Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 175 additions & 92 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@

from typing import Dict, List

try:
    from gevent.exceptions import LoopExit
except ImportError:
    # gevent might not be available in some environments.  Fall back to a
    # dedicated placeholder type instead of aliasing ``Exception``: with an
    # alias, every ``except LoopExit`` clause would catch *all* exceptions,
    # shadow any later ``except Exception`` handler, and misreport unrelated
    # failures as gevent LoopExit errors.
    class LoopExit(Exception):
        """Placeholder for gevent's LoopExit; never raised without gevent."""

from teuthology import (
# non-modules
setup_log_file,
Expand Down Expand Up @@ -99,105 +105,182 @@ def main(args):
keep_running = True
job_procs = set()
worst_returncode = 0
while keep_running:
# Check to see if we have a teuthology-results process hanging around
# and if so, read its return code so that it can exit.
if result_proc is not None and result_proc.poll() is not None:
log.debug("teuthology-results exited with code: %s",
result_proc.returncode)
result_proc = None

if sentinel(restart_file_path):
restart()
elif sentinel(stop_file_path):
stop()

load_config()
for proc in list(job_procs):
rc = proc.poll()
if rc is not None:
worst_returncode = max([worst_returncode, rc])
job_procs.remove(proc)
job = connection.reserve(timeout=60)
if job is None:
if args.exit_on_empty_queue and not job_procs:
log.info("Queue is empty and no supervisor processes running; exiting!")
break
continue

# bury the job so it won't be re-run if it fails
job.bury()
job_id = job.jid
log.info('Reserved job %d', job_id)
log.info('Config is: %s', job.body)
job_config = yaml.safe_load(job.body)
job_config['job_id'] = str(job_id)

if job_config.get('stop_worker'):
keep_running = False
loop_exit_count = 0
max_loop_exits = 10 # Prevent infinite restart loops

while keep_running:
try:
job_config, teuth_bin_path = prep_job(
job_config,
log_file_path,
archive_dir,
# Check to see if we have a teuthology-results process hanging around
# and if so, read its return code so that it can exit.
if result_proc is not None and result_proc.poll() is not None:
log.debug("teuthology-results exited with code: %s",
result_proc.returncode)
result_proc = None

if sentinel(restart_file_path):
restart()
elif sentinel(stop_file_path):
stop()

load_config()
for proc in list(job_procs):
rc = proc.poll()
if rc is not None:
worst_returncode = max([worst_returncode, rc])
job_procs.remove(proc)
job = connection.reserve(timeout=60)
if job is None:
if args.exit_on_empty_queue and not job_procs:
log.info("Queue is empty and no supervisor processes running; exiting!")
break
continue

# bury the job so it won't be re-run if it fails
job.bury()
job_id = job.jid
log.info('Reserved job %d', job_id)
log.info('Config is: %s', job.body)
job_config = yaml.safe_load(job.body)
job_config['job_id'] = str(job_id)

if job_config.get('stop_worker'):
keep_running = False

try:
job_config, teuth_bin_path = prep_job(
job_config,
log_file_path,
archive_dir,
)
except SkipJob:
continue

# lock machines but do not reimage them
if 'roles' in job_config:
try:
job_config = lock_machines(job_config)
except LoopExit as e:
log.critical(
"Caught gevent LoopExit exception during lock_machines for job %s. "
"This is likely due to gevent/urllib3 blocking issues. "
"Marking job as dead.",
job_id
)
log.exception("LoopExit exception details:")
report.try_push_job_info(
job_config,
dict(
status='dead',
failure_reason='gevent LoopExit during machine locking: {}'.format(str(e))
)
)
# Skip this job and continue with the next one
continue
except Exception as e:
log.exception("Unexpected exception during lock_machines for job %s", job_id)
report.try_push_job_info(
job_config,
dict(
status='dead',
failure_reason='Exception during machine locking: {}'.format(str(e))
)
)
continue

run_args = [
os.path.join(teuth_bin_path, 'teuthology-supervisor'),
'-v',
'--bin-path', teuth_bin_path,
'--archive-dir', archive_dir,
]

# Create run archive directory if not already created and
# job's archive directory
create_job_archive(job_config['name'],
job_config['archive_path'],
archive_dir)
job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')

# Write initial job config in job archive dir
with open(job_config_path, 'w') as f:
yaml.safe_dump(job_config, f, default_flow_style=False)

run_args.extend(["--job-config", job_config_path])

try:
# Use start_new_session=True to ensure child processes are isolated
# from the dispatcher's process group. This prevents accidental
# termination if the dispatcher crashes or receives signals.
job_proc = subprocess.Popen(
run_args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True, # Isolate child process from parent
)
job_procs.add(job_proc)
log.info('Job supervisor PID: %s', job_proc.pid)
except Exception:
error_message = "Saw error while trying to spawn supervisor."
log.exception(error_message)
if 'targets' in job_config:
node_names = job_config["targets"].keys()
lock_ops.unlock_safe(
node_names,
job_config["owner"],
job_config["name"],
job_config["job_id"]
)
report.try_push_job_info(job_config, dict(
status='fail',
failure_reason=error_message))

# This try/except block is to keep the worker from dying when
# beanstalkc throws a SocketError
try:
job.delete()
except Exception:
log.exception("Saw exception while trying to delete job")

# Successful iteration - reset loop exit counter if it was set
if loop_exit_count > 0:
log.info("Successfully completed iteration after LoopExit exception(s). Resetting counter.")
loop_exit_count = 0

except LoopExit:
loop_exit_count += 1
log.critical(
"CRITICAL: Caught gevent LoopExit exception in dispatcher main loop "
"(count: %d/%d). This is likely due to gevent/urllib3 blocking issues. "
"The dispatcher will attempt to continue, but child processes should "
"be isolated and unaffected.",
loop_exit_count,
max_loop_exits
)
except SkipJob:
continue
log.exception("LoopExit exception details:")

# lock machines but do not reimage them
if 'roles' in job_config:
job_config = lock_machines(job_config)

run_args = [
os.path.join(teuth_bin_path, 'teuthology-supervisor'),
'-v',
'--bin-path', teuth_bin_path,
'--archive-dir', archive_dir,
]

# Create run archive directory if not already created and
# job's archive directory
create_job_archive(job_config['name'],
job_config['archive_path'],
archive_dir)
job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')

# Write initial job config in job archive dir
with open(job_config_path, 'w') as f:
yaml.safe_dump(job_config, f, default_flow_style=False)
if loop_exit_count >= max_loop_exits:
log.critical(
"Maximum LoopExit exceptions (%d) reached. "
"Dispatcher is exiting to prevent infinite restart loop.",
max_loop_exits
)
# Ensure all tracked job processes are noted as still running
# They should continue independently due to start_new_session=True
log.info("Dispatched %d job supervisor processes that should continue running independently", len(job_procs))
break

run_args.extend(["--job-config", job_config_path])
# Continue to next iteration to attempt recovery
continue

try:
job_proc = subprocess.Popen(
run_args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
except Exception as e:
log.critical(
"CRITICAL: Uncaught exception in dispatcher main loop: %s",
type(e).__name__
)
job_procs.add(job_proc)
log.info('Job supervisor PID: %s', job_proc.pid)
except Exception:
error_message = "Saw error while trying to spawn supervisor."
log.exception(error_message)
if 'targets' in job_config:
node_names = job_config["targets"].keys()
lock_ops.unlock_safe(
node_names,
job_config["owner"],
job_config["name"],
job_config["job_id"]
)
report.try_push_job_info(job_config, dict(
status='fail',
failure_reason=error_message))

# This try/except block is to keep the worker from dying when
# beanstalkc throws a SocketError
try:
job.delete()
except Exception:
log.exception("Saw exception while trying to delete job")
log.exception("Exception details:")
# Continue the loop to avoid crashing the dispatcher completely
# Child processes should be isolated via start_new_session=True
continue

return worst_returncode

Expand Down
Loading