Skip to content

Commit ff5fbd1

Browse files
committed
refactor(Utils): 重构 AccessToken 获取逻辑
- 修复大小写错误:将 config.logger 改为 config.Logger - 优化 get_access_token 函数: - 添加超时和重试机制 - 使用 AccessTokenPayload 直接解析响应数据 - 采用分段睡眠避免长时间阻塞 - 增加异常处理,提高程序健壮性 - 重新组织代码结构,提高可读性和可维护性
1 parent bf42e75 commit ff5fbd1

File tree

2 files changed

+39
-28
lines changed

2 files changed

+39
-28
lines changed

src/Utils/Config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ def __init__ (self):
1616
_config = ConfigBase(**yaml_config)
1717

1818
if _config.Advanced.debug:
19-
_config.logger.level = "DEBUG"
19+
_config.Logger.level = "DEBUG"
2020
else:
21-
_config.logger.level = _config.logger.level
21+
_config.Logger.level = _config.Logger.level
2222

2323
config = _config

src/Utils/GetAccessToken.py

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,45 @@
11
import threading, requests, time
22

3-
43
from src.Utils.Logger import logger
54
from src.Utils.Config import config
6-
from src.Utils.EventClass import AccessTokenPayload, OpenAPIErrorPayload
7-
8-
ACCESS_TOKEN: str = ""
9-
10-
async def get_access_token():
11-
threading.Thread(target=access_token, daemon=True,name="AccessToekn 鉴权获取线程").start()
5+
from src.Utils.EventClass import AccessTokenPayload
126

7+
ACCESS_TOKEN = ""
8+
TOKEN_LOCK = threading.Lock()
139

14-
def access_token():
15-
"""获取access_token"""
10+
def token_manager():
1611
while True:
1712
logger.debug("正在获取access_token...")
18-
response = requests.post("https://bots.qq.com/app/getAppAccessToken",
19-
json={"appId": str(config.Bot.appid), "clientSecret": config.Bot.appsecret}
20-
)
21-
response.raise_for_status
22-
code = response.status_code
23-
response = response.json()
24-
if code != 200:
25-
response = OpenAPIErrorPayload(**response)
26-
logger.error(f"获取access_token失败,错误码:{response.code};原因:{response.message}")
27-
logger.debug("当前输出内容:" + str({"appId": str(config.Bot.appid),"clientSecret": config.Bot.appsecret}))
28-
time.sleep(60)
29-
else:
30-
response_json = AccessTokenPayload(**response)
31-
logger.debug(f"获取AccessToken成功,token: {response_json.access_token} 将于 {response_json.expires_in} s后过期")
32-
global ACCESS_TOKEN
33-
ACCESS_TOKEN = response_json.access_token
34-
time.sleep(response_json.expires_in - 30)
13+
try:
14+
# 添加超时和重试机制
15+
response = requests.post(
16+
"https://bots.qq.com/app/getAppAccessToken",
17+
json={"appId": str(config.Bot.appid), "clientSecret": config.Bot.appsecret},
18+
timeout=10
19+
)
20+
response.raise_for_status() # 修复:添加括号调用方法
21+
22+
token_data = AccessTokenPayload(**response.json())
23+
logger.debug(f"获取AccessToken成功,token: {token_data.access_token} 将于 {token_data.expires_in}s后过期")
24+
25+
with TOKEN_LOCK:
26+
global ACCESS_TOKEN
27+
ACCESS_TOKEN = token_data.access_token
28+
29+
# 分段睡眠,避免长时间阻塞
30+
sleep_time = token_data.expires_in - 30
31+
while sleep_time > 0:
32+
chunk = min(sleep_time, 300) # 每5分钟唤醒一次
33+
time.sleep(chunk)
34+
sleep_time -= chunk
35+
36+
except requests.exceptions.RequestException as e:
37+
logger.error(f"请求失败: {str(e)}")
38+
time.sleep(10) # 短时间等待后重试
39+
except Exception as e:
40+
logger.error(f"意外错误: {str(e)}")
41+
time.sleep(30)
42+
43+
async def get_access_token():
44+
# 启动线程
45+
threading.Thread(target=token_manager, daemon=True, name="AccessToekn 鉴权获取线程").start()

0 commit comments

Comments
 (0)