|
8 | 8 |
|
9 | 9 | from typing import Dict, List |
10 | 10 |
|
| 11 | +try: |
| 12 | + from gevent.exceptions import LoopExit |
| 13 | +except ImportError: |
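| 14 | + # gevent might not be available in some environments; fall back to Exception so the name still resolves (note: `except LoopExit` then matches every Exception) |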
| 15 | + LoopExit = Exception |
| 16 | + |
11 | 17 | from teuthology import ( |
12 | 18 | # non-modules |
13 | 19 | setup_log_file, |
@@ -99,105 +105,182 @@ def main(args): |
99 | 105 | keep_running = True |
100 | 106 | job_procs = set() |
101 | 107 | worst_returncode = 0 |
| 108 | + loop_exit_count = 0 |
| 109 | + max_loop_exits = 10 # Prevent infinite restart loops |
| 110 | + |
102 | 111 | while keep_running: |
103 | | - # Check to see if we have a teuthology-results process hanging around |
104 | | - # and if so, read its return code so that it can exit. |
105 | | - if result_proc is not None and result_proc.poll() is not None: |
106 | | - log.debug("teuthology-results exited with code: %s", |
107 | | - result_proc.returncode) |
108 | | - result_proc = None |
109 | | - |
110 | | - if sentinel(restart_file_path): |
111 | | - restart() |
112 | | - elif sentinel(stop_file_path): |
113 | | - stop() |
114 | | - |
115 | | - load_config() |
116 | | - for proc in list(job_procs): |
117 | | - rc = proc.poll() |
118 | | - if rc is not None: |
119 | | - worst_returncode = max([worst_returncode, rc]) |
120 | | - job_procs.remove(proc) |
121 | | - job = connection.reserve(timeout=60) |
122 | | - if job is None: |
123 | | - if args.exit_on_empty_queue and not job_procs: |
124 | | - log.info("Queue is empty and no supervisor processes running; exiting!") |
125 | | - break |
126 | | - continue |
127 | | - |
128 | | - # bury the job so it won't be re-run if it fails |
129 | | - job.bury() |
130 | | - job_id = job.jid |
131 | | - log.info('Reserved job %d', job_id) |
132 | | - log.info('Config is: %s', job.body) |
133 | | - job_config = yaml.safe_load(job.body) |
134 | | - job_config['job_id'] = str(job_id) |
135 | | - |
136 | | - if job_config.get('stop_worker'): |
137 | | - keep_running = False |
138 | | - |
139 | 112 | try: |
140 | | - job_config, teuth_bin_path = prep_job( |
141 | | - job_config, |
142 | | - log_file_path, |
143 | | - archive_dir, |
| 113 | + # Check to see if we have a teuthology-results process hanging around |
| 114 | + # and if so, read its return code so that it can exit. |
| 115 | + if result_proc is not None and result_proc.poll() is not None: |
| 116 | + log.debug("teuthology-results exited with code: %s", |
| 117 | + result_proc.returncode) |
| 118 | + result_proc = None |
| 119 | + |
| 120 | + if sentinel(restart_file_path): |
| 121 | + restart() |
| 122 | + elif sentinel(stop_file_path): |
| 123 | + stop() |
| 124 | + |
| 125 | + load_config() |
| 126 | + for proc in list(job_procs): |
| 127 | + rc = proc.poll() |
| 128 | + if rc is not None: |
| 129 | + worst_returncode = max([worst_returncode, rc]) |
| 130 | + job_procs.remove(proc) |
| 131 | + job = connection.reserve(timeout=60) |
| 132 | + if job is None: |
| 133 | + if args.exit_on_empty_queue and not job_procs: |
| 134 | + log.info("Queue is empty and no supervisor processes running; exiting!") |
| 135 | + break |
| 136 | + continue |
| 137 | + |
| 138 | + # bury the job so it won't be re-run if it fails |
| 139 | + job.bury() |
| 140 | + job_id = job.jid |
| 141 | + log.info('Reserved job %d', job_id) |
| 142 | + log.info('Config is: %s', job.body) |
| 143 | + job_config = yaml.safe_load(job.body) |
| 144 | + job_config['job_id'] = str(job_id) |
| 145 | + |
| 146 | + if job_config.get('stop_worker'): |
| 147 | + keep_running = False |
| 148 | + |
| 149 | + try: |
| 150 | + job_config, teuth_bin_path = prep_job( |
| 151 | + job_config, |
| 152 | + log_file_path, |
| 153 | + archive_dir, |
| 154 | + ) |
| 155 | + except SkipJob: |
| 156 | + continue |
| 157 | + |
| 158 | + # lock machines but do not reimage them |
| 159 | + if 'roles' in job_config: |
| 160 | + try: |
| 161 | + job_config = lock_machines(job_config) |
| 162 | + except LoopExit as e: |
| 163 | + log.critical( |
| 164 | + "Caught gevent LoopExit exception during lock_machines for job %s. " |
| 165 | + "This is likely due to gevent/urllib3 blocking issues. " |
| 166 | + "Marking job as dead.", |
| 167 | + job_id |
| 168 | + ) |
| 169 | + log.exception("LoopExit exception details:") |
| 170 | + report.try_push_job_info( |
| 171 | + job_config, |
| 172 | + dict( |
| 173 | + status='dead', |
| 174 | + failure_reason='gevent LoopExit during machine locking: {}'.format(str(e)) |
| 175 | + ) |
| 176 | + ) |
| 177 | + # Skip this job and continue with the next one |
| 178 | + continue |
| 179 | + except Exception as e: |
| 180 | + log.exception("Unexpected exception during lock_machines for job %s", job_id) |
| 181 | + report.try_push_job_info( |
| 182 | + job_config, |
| 183 | + dict( |
| 184 | + status='dead', |
| 185 | + failure_reason='Exception during machine locking: {}'.format(str(e)) |
| 186 | + ) |
| 187 | + ) |
| 188 | + continue |
| 189 | + |
| 190 | + run_args = [ |
| 191 | + os.path.join(teuth_bin_path, 'teuthology-supervisor'), |
| 192 | + '-v', |
| 193 | + '--bin-path', teuth_bin_path, |
| 194 | + '--archive-dir', archive_dir, |
| 195 | + ] |
| 196 | + |
| 197 | + # Create run archive directory if not already created and |
| 198 | + # job's archive directory |
| 199 | + create_job_archive(job_config['name'], |
| 200 | + job_config['archive_path'], |
| 201 | + archive_dir) |
| 202 | + job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml') |
| 203 | + |
| 204 | + # Write initial job config in job archive dir |
| 205 | + with open(job_config_path, 'w') as f: |
| 206 | + yaml.safe_dump(job_config, f, default_flow_style=False) |
| 207 | + |
| 208 | + run_args.extend(["--job-config", job_config_path]) |
| 209 | + |
| 210 | + try: |
| 211 | + # Use start_new_session=True to ensure child processes are isolated |
| 212 | + # from the dispatcher's process group. This prevents accidental |
| 213 | + # termination if the dispatcher crashes or receives signals. |
| 214 | + job_proc = subprocess.Popen( |
| 215 | + run_args, |
| 216 | + stdout=subprocess.DEVNULL, |
| 217 | + stderr=subprocess.DEVNULL, |
| 218 | + start_new_session=True, # Isolate child process from parent |
| 219 | + ) |
| 220 | + job_procs.add(job_proc) |
| 221 | + log.info('Job supervisor PID: %s', job_proc.pid) |
| 222 | + except Exception: |
| 223 | + error_message = "Saw error while trying to spawn supervisor." |
| 224 | + log.exception(error_message) |
| 225 | + if 'targets' in job_config: |
| 226 | + node_names = job_config["targets"].keys() |
| 227 | + lock_ops.unlock_safe( |
| 228 | + node_names, |
| 229 | + job_config["owner"], |
| 230 | + job_config["name"], |
| 231 | + job_config["job_id"] |
| 232 | + ) |
| 233 | + report.try_push_job_info(job_config, dict( |
| 234 | + status='fail', |
| 235 | + failure_reason=error_message)) |
| 236 | + |
| 237 | + # This try/except block is to keep the worker from dying when |
| 238 | + # beanstalkc throws a SocketError |
| 239 | + try: |
| 240 | + job.delete() |
| 241 | + except Exception: |
| 242 | + log.exception("Saw exception while trying to delete job") |
| 243 | + |
| 244 | + # Successful iteration - reset loop exit counter if it was set |
| 245 | + if loop_exit_count > 0: |
| 246 | + log.info("Successfully completed iteration after LoopExit exception(s). Resetting counter.") |
| 247 | + loop_exit_count = 0 |
| 248 | + |
| 249 | + except LoopExit as e: |
| 250 | + loop_exit_count += 1 |
| 251 | + log.critical( |
| 252 | + "CRITICAL: Caught gevent LoopExit exception in dispatcher main loop " |
| 253 | + "(count: %d/%d). This is likely due to gevent/urllib3 blocking issues. " |
| 254 | + "The dispatcher will attempt to continue, but child processes should " |
| 255 | + "be isolated and unaffected.", |
| 256 | + loop_exit_count, |
| 257 | + max_loop_exits |
144 | 258 | ) |
145 | | - except SkipJob: |
| 259 | + log.exception("LoopExit exception details:") |
| 260 | + |
| 261 | + if loop_exit_count >= max_loop_exits: |
| 262 | + log.critical( |
| 263 | + "Maximum LoopExit exceptions (%d) reached. " |
| 264 | + "Dispatcher is exiting to prevent infinite restart loop.", |
| 265 | + max_loop_exits |
| 266 | + ) |
| 267 | + # Log how many job supervisor processes are still being tracked. |
| 268 | + # They should continue independently due to start_new_session=True |
| 269 | + log.info("Dispatched %d job supervisor processes that should continue running independently", len(job_procs)) |
| 270 | + break |
| 271 | + |
| 272 | + # Continue to next iteration to attempt recovery |
146 | 273 | continue |
147 | | - |
148 | | - # lock machines but do not reimage them |
149 | | - if 'roles' in job_config: |
150 | | - job_config = lock_machines(job_config) |
151 | | - |
152 | | - run_args = [ |
153 | | - os.path.join(teuth_bin_path, 'teuthology-supervisor'), |
154 | | - '-v', |
155 | | - '--bin-path', teuth_bin_path, |
156 | | - '--archive-dir', archive_dir, |
157 | | - ] |
158 | | - |
159 | | - # Create run archive directory if not already created and |
160 | | - # job's archive directory |
161 | | - create_job_archive(job_config['name'], |
162 | | - job_config['archive_path'], |
163 | | - archive_dir) |
164 | | - job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml') |
165 | | - |
166 | | - # Write initial job config in job archive dir |
167 | | - with open(job_config_path, 'w') as f: |
168 | | - yaml.safe_dump(job_config, f, default_flow_style=False) |
169 | | - |
170 | | - run_args.extend(["--job-config", job_config_path]) |
171 | | - |
172 | | - try: |
173 | | - job_proc = subprocess.Popen( |
174 | | - run_args, |
175 | | - stdout=subprocess.DEVNULL, |
176 | | - stderr=subprocess.DEVNULL, |
| 274 | + |
| 275 | + except Exception as e: |
| 276 | + log.critical( |
| 277 | + "CRITICAL: Uncaught exception in dispatcher main loop: %s", |
| 278 | + type(e).__name__ |
177 | 279 | ) |
178 | | - job_procs.add(job_proc) |
179 | | - log.info('Job supervisor PID: %s', job_proc.pid) |
180 | | - except Exception: |
181 | | - error_message = "Saw error while trying to spawn supervisor." |
182 | | - log.exception(error_message) |
183 | | - if 'targets' in job_config: |
184 | | - node_names = job_config["targets"].keys() |
185 | | - lock_ops.unlock_safe( |
186 | | - node_names, |
187 | | - job_config["owner"], |
188 | | - job_config["name"], |
189 | | - job_config["job_id"] |
190 | | - ) |
191 | | - report.try_push_job_info(job_config, dict( |
192 | | - status='fail', |
193 | | - failure_reason=error_message)) |
194 | | - |
195 | | - # This try/except block is to keep the worker from dying when |
196 | | - # beanstalkc throws a SocketError |
197 | | - try: |
198 | | - job.delete() |
199 | | - except Exception: |
200 | | - log.exception("Saw exception while trying to delete job") |
| 280 | + log.exception("Exception details:") |
| 281 | + # Continue the loop to avoid crashing the dispatcher completely |
| 282 | + # Child processes should be isolated via start_new_session=True |
| 283 | + continue |
201 | 284 |
|
202 | 285 | return worst_returncode |
203 | 286 |
|
|
0 commit comments