Skip to content

Commit f108df0

Browse files
[python] 修复进程重启机制不完善与Scheduler停止时的AttributeError (#359)
* [python] 修复进程重启机制不完善与Scheduler停止时的AttributeError * [python] 修复 HeartBeatWatchThread 重复调用 shutdown() 方法的问题 * [python] 添加主进程异常退出的日志记录 * [python] 移除无用的代码 * [python] 修改检视意见
1 parent a74353d commit f108df0

File tree

4 files changed

+186
-14
lines changed

4 files changed

+186
-14
lines changed

framework/fit/python/fitframework/__init__.py

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,13 +180,17 @@ def heart_beat_exit_unexpectedly() -> bool:
180180
pass
181181

182182

183-
def determine_should_terminate_main() -> bool:
183+
def _safe_check(checker, desc: str) -> bool:
184+
"""
185+
安全执行checker,异常时打印日志并返回False
186+
"""
184187
try:
185-
return heart_beat_exit_unexpectedly() or get_should_terminate_main()
188+
return checker()
186189
except:
187190
except_type, except_value, except_traceback = sys.exc_info()
188-
fit_logger.warning(f"get should terminate main error, error type: {except_type}, value: {except_value}, "
189-
f"trace back:\n{''.join(traceback.format_tb(except_traceback))}")
191+
fit_logger.warning(f"check {desc} error, error type: {except_type}, "
192+
f"value: {except_value}, trace back:\n"
193+
f"{''.join(traceback.format_tb(except_traceback))}")
190194
return False
191195

192196

@@ -205,7 +209,17 @@ def main():
205209
fit_logger.info(f"fit framework is now available in version {_FIT_FRAMEWORK_VERSION}.")
206210
if get_terminate_main_enabled():
207211
fit_logger.info("terminate main enabled.")
208-
while not determine_should_terminate_main():
212+
while True:
213+
# 明确区分退出原因并打印日志
214+
hb_exit = _safe_check(heart_beat_exit_unexpectedly, "heart_beat_exit_unexpectedly")
215+
should_terminate = _safe_check(get_should_terminate_main, "get_should_terminate_main")
216+
if hb_exit:
217+
fit_logger.warning("main process will exit due to heartbeat background job exited unexpectedly.")
218+
break
219+
if should_terminate:
220+
# 详细原因已在 terminate_main 插件内部按条件分别打印,这里汇总打印一次
221+
fit_logger.info("main process will exit due to terminate-main condition matched.")
222+
break
209223
time.sleep(1)
210224
fit_logger.info("main process terminated.")
211225
shutdown()
@@ -215,7 +229,33 @@ def main():
215229
if platform.system() in ('Windows', 'Darwin'): # Windows 或 macOS
216230
main()
217231
else: # Linux 及其他
232+
from fitframework.utils.restart_policy import create_default_restart_policy
233+
234+
restart_policy = create_default_restart_policy()
235+
fit_logger.info(f"Starting process manager with restart policy: {restart_policy.get_status()}")
236+
218237
while True:
219-
main_process = Process(target=main, name='MainProcess')
220-
main_process.start()
221-
main_process.join()
238+
exit_code = None
239+
try:
240+
main_process = Process(target=main, name='MainProcess')
241+
main_process.start()
242+
fit_logger.info(f"Main process started with PID: {main_process.pid}")
243+
main_process.join()
244+
exit_code = main_process.exitcode
245+
except Exception as e:
246+
fit_logger.error(f"Error during process management: {e}")
247+
exit_code = -1
248+
249+
fit_logger.info(f"Main process exited with code: {exit_code}")
250+
# 使用重启策略判断是否应该重启
251+
if not restart_policy.should_restart(exit_code):
252+
fit_logger.info("Restart policy indicates no restart needed, stopping")
253+
break
254+
255+
# 获取重启延迟
256+
restart_delay = restart_policy.get_restart_delay()
257+
status = restart_policy.get_status()
258+
259+
fit_logger.warning(f"Main process exited unexpectedly, restarting in {restart_delay:.2f} seconds... "
260+
f"(attempt {status['current_attempt']}/{status['max_attempts']})")
261+
time.sleep(restart_delay)
framework/fit/python/fitframework/utils/restart_policy.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# -- encoding: utf-8 --
2+
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
3+
# This file is a part of the ModelEngine Project.
4+
# Licensed under the MIT License. See License.txt in the project root for license information.
5+
# ======================================================================================================================
6+
"""
7+
功 能:进程重启策略配置
8+
"""
9+
import time
10+
from typing import Dict, Any
11+
from fitframework.api.logging import fit_logger
12+
13+
class RestartPolicy:
    """
    Restart policy for a supervised process: a bounded number of restarts with exponential backoff.

    Intended call sequence for a supervisor loop: after the child exits, call
    ``should_restart(exit_code)``; when it returns ``True``, call ``get_restart_delay()``,
    sleep for the returned number of seconds, then start the child again.
    """

    def __init__(self,
                 max_attempts: int = 10,
                 base_delay: float = 5.0,
                 max_delay: float = 300.0,
                 backoff_multiplier: float = 1.5,
                 reset_after_success: bool = True,
                 stable_run_seconds: float = 600.0):
        """
        :param max_attempts: number of restarts allowed before ``should_restart`` gives up.
        :param base_delay: delay in seconds before the first restart.
        :param max_delay: upper bound in seconds for any single delay.
        :param backoff_multiplier: factor applied to the delay after each restart.
        :param reset_after_success: when ``True``, the attempt counter and delay are reset after a
            clean exit (code 0) and after a run that lasted at least ``stable_run_seconds``.
        :param stable_run_seconds: minimum run time, in seconds, after which a restarted child is
            considered to have recovered, so a later crash starts a fresh backoff sequence.
        """
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_multiplier = backoff_multiplier
        self.reset_after_success = reset_after_success
        self.stable_run_seconds = stable_run_seconds

        self.current_attempt = 0
        self.current_delay = base_delay
        self.last_success_time = time.time()
        # Monotonic timestamp at which the most recently restarted child is expected to start
        # (restart decision time + delay); None until a restart has been handed out.
        self._expected_start = None

    def should_restart(self, exit_code: int) -> bool:
        """
        Decide whether the process should be restarted.

        :param exit_code: exit code of the process that just terminated.
        :return: ``False`` for a clean exit (code 0) or once ``max_attempts`` is exhausted,
            ``True`` otherwise.
        """
        # A clean exit never triggers a restart.
        if exit_code == 0:
            if self.reset_after_success:
                self._reset()
            return False

        # Without this, the counter only ever resets on exit code 0 -- after which nothing is
        # restarted -- so crashes months apart would still add up to a permanent stop.
        if self.reset_after_success and self._ran_stably():
            self._reset()

        # Give up once the restart budget is spent.
        if self.current_attempt >= self.max_attempts:
            fit_logger.error(f"Maximum restart attempts ({self.max_attempts}) reached")
            return False

        return True

    def get_restart_delay(self) -> float:
        """
        Return the delay for the upcoming restart and advance the backoff state.

        Each call counts as one restart attempt.

        :return: seconds to wait before restarting; never negative.
        """
        # Clamp at zero: callers pass this to time.sleep(), which rejects negative values.
        delay = max(0.0, min(self.current_delay, self.max_delay))
        self.current_attempt += 1

        # Exponential backoff, capped at max_delay.
        if self.current_attempt < self.max_attempts:
            self.current_delay = min(self.current_delay * self.backoff_multiplier, self.max_delay)

        self._expected_start = time.monotonic() + delay
        return delay

    def _ran_stably(self) -> bool:
        """Return True if the last restarted child ran for at least ``stable_run_seconds``."""
        if self._expected_start is None:
            return False
        return time.monotonic() - self._expected_start >= self.stable_run_seconds

    def _reset(self):
        """Reset the attempt counter and delay to their initial values."""
        self.current_attempt = 0
        self.current_delay = self.base_delay
        self.last_success_time = time.time()
        self._expected_start = None

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the counters and limits as a plain dict."""
        return {
            'current_attempt': self.current_attempt,
            'max_attempts': self.max_attempts,
            'current_delay': self.current_delay,
            'base_delay': self.base_delay,
            'max_delay': self.max_delay,
            'last_success_time': self.last_success_time
        }
75+
76+
def create_default_restart_policy() -> RestartPolicy:
    """Build the default policy: up to 10 restarts, 5s first delay, x1.5 per attempt, capped at 300s."""
    settings = {
        'max_attempts': 10,
        'base_delay': 5.0,
        'max_delay': 300.0,
        'backoff_multiplier': 1.5,
        'reset_after_success': True,
    }
    return RestartPolicy(**settings)
86+
87+
def create_aggressive_restart_policy() -> RestartPolicy:
    """Build the fast-restart policy: up to 20 restarts, 2s first delay, x1.2 per attempt, capped at 60s."""
    settings = {
        'max_attempts': 20,
        'base_delay': 2.0,
        'max_delay': 60.0,
        'backoff_multiplier': 1.2,
        'reset_after_success': True,
    }
    return RestartPolicy(**settings)
97+
98+
def create_conservative_restart_policy() -> RestartPolicy:
    """Build the slow-restart policy: up to 5 restarts, 10s first delay, x2.0 per attempt, capped at 600s."""
    settings = {
        'max_attempts': 5,
        'base_delay': 10.0,
        'max_delay': 600.0,
        'backoff_multiplier': 2.0,
        'reset_after_success': True,
    }
    return RestartPolicy(**settings)

framework/fit/python/fitframework/utils/scheduler.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,11 @@ def _start():
@register_event(FrameworkEvent.FRAMEWORK_STOPPING)
def _stop():
    """Cancel the module-level timer on FRAMEWORK_STOPPING; only log a warning if none exists."""
    global _timer
    if _timer is None:
        # Nothing was started, so there is nothing to cancel.
        fit_logger.warning("timer was not initialized, skip stopping")
        return
    _timer.cancel()
    fit_logger.info("timer stopped")
114117

115118

116119
class _Timer:

framework/fit/python/plugin/fit_py_heart_beat_agent/heart_beat_agent.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import traceback
1919
from multiprocessing import Process
2020
from queue import Empty
21-
from threading import Thread
21+
from threading import Thread, Lock
2222
from typing import List
2323

2424
from fitframework import const
@@ -36,6 +36,10 @@
3636
_LAST_HEART_BEAT_SUCCESS_TIME = time.time()
3737
# 心跳进程是否意外退出
3838
_HEART_BEAT_EXIT_UNEXPECTEDLY = False
39+
40+
# 是否正在关闭中(用于区分正常关闭和意外退出)
41+
_SHUTDOWN_IN_PROGRESS = False
42+
_SHUTDOWN_LOCK = Lock()
3943
# 上次注册服务的时间,用于避免频繁注册覆盖热加载的服务
4044
_LAST_REGISTRY_TIME = 0
4145

@@ -118,13 +122,18 @@ def _try_heart_beat_once():
118122

119123
sys_plugin_logger.debug(f'heart beating success.')
120124
_LAST_HEART_BEAT_SUCCESS_TIME = heart_beat_finish_time
121-
except:
125+
except Exception as e:
122126
_FAIL_COUNT += 1
123127
except_type, except_value, except_traceback = sys.exc_info()
124128
sys_plugin_logger.warning(f"heart beat failed. [fail_count={_FAIL_COUNT}]")
125129
sys_plugin_logger.warning(f"heart beat error type: {except_type}, value: {except_value}, "
126130
f"trace back:\n{''.join(traceback.format_tb(except_traceback))}")
127131

132+
# 如果连续失败次数过多,考虑重启心跳任务
133+
if _FAIL_COUNT >= 5:
134+
sys_plugin_logger.error(f"heart beat failed too many times ({_FAIL_COUNT}), considering restart")
135+
# 这里可以添加重启心跳任务的逻辑
136+
128137

129138
def _heart_beat_task(queue: multiprocessing.Queue):
130139
while True:
@@ -143,9 +152,18 @@ def _heart_beat_task(queue: multiprocessing.Queue):
def _heart_beat_monitor(heart_beat_background_job):
    """
    Watch the heart beat background job and shut the runtime down if it dies unexpectedly.

    :param heart_beat_background_job: object exposing ``is_alive()``; polled once per second.
    """
    global _HEART_BEAT_EXIT_UNEXPECTEDLY
    while heart_beat_background_job.is_alive():
        time.sleep(1)

    with _SHUTDOWN_LOCK:
        if _SHUTDOWN_IN_PROGRESS:
            # The job ended because a shutdown is already under way: expected, nothing to do.
            sys_plugin_logger.info("heart beat job exited during graceful shutdown, no action needed.")
            return
        # Not a graceful shutdown, so the job died on its own.
        _HEART_BEAT_EXIT_UNEXPECTEDLY = True

    sys_plugin_logger.error("heart beat job is not alive, runtime should shutdown immediately.")
    # Short grace period before forcing the shutdown.
    time.sleep(2)

    # Re-check after the sleep: a shutdown may have started in the meantime, and calling
    # shutdown() again would run the stop sequence twice.
    # NOTE(review): relies on the FRAMEWORK_STOPPING handler setting _SHUTDOWN_IN_PROGRESS -- confirm.
    with _SHUTDOWN_LOCK:
        if _SHUTDOWN_IN_PROGRESS:
            sys_plugin_logger.info("shutdown already in progress, skip duplicate shutdown call.")
            return
    shutdown()
150168

151169

@@ -168,7 +186,11 @@ def online() -> None:
@register_event(Fit_Event.FRAMEWORK_STOPPING)
def offline():
    """
    Take the heart beat agent offline when the framework is stopping.

    Marks the shutdown as in progress and puts ``None`` on the heart beat finish queue.
    NOTE(review): presumably the ``None`` tells the heart beat task to stop and call the
    server-side leave interface, as the original docstring describes -- confirm in the task loop.
    """
    global _SHUTDOWN_IN_PROGRESS
    sys_plugin_logger.info("heart beat agent offline")
    # Raise the flag first so the monitor treats the job's exit as a graceful shutdown.
    with _SHUTDOWN_LOCK:
        _SHUTDOWN_IN_PROGRESS = True
    _HEART_BEAT_FINISH_QUEUE.put(None)
173195

174196

0 commit comments

Comments
 (0)