88import json
99import threading
1010from Core .Logger import logger
11+ from Core .Config import config
1112from datetime import timedelta
1213
13- # 从 QQ 机器人控制台获取的 Bot Secret
14- BOT_SECRET = "您的BOTSecret"
15-
16- # 从 QQ 机器人控制台获取的 App ID 和 Client Secret
17- APP_ID = "您的APPID"
18- # 全局变量存储 access_token 和过期时间
19- access_token = None
20- expires_at = 0
21-
22- start_time = None
23-
24- # 验证签名
25- def verify_signature (bot_secret : str , signature_hex : str , timestamp : str , http_body : bytes ) -> bool :
26- seed = bot_secret .encode ("utf-8" )
27- while len (seed ) < nacl .bindings .crypto_sign_SEEDBYTES :
28- seed += seed
29- seed = seed [:nacl .bindings .crypto_sign_SEEDBYTES ]
30- signing_key = nacl .signing .SigningKey (seed )
31- verify_key = signing_key .verify_key
32-
33- msg = timestamp .encode ("utf-8" ) + http_body
34- try :
35- signature = nacl .encoding .HexEncoder .decode (signature_hex )
36- verify_key .verify (msg , signature )
37- return True
38- except nacl .exceptions .BadSignatureError :
39- return False
40-
41- # 生成签名
42- def generate_signature (bot_secret : str , plain_token : str , event_ts : str ) -> str :
43- seed = bot_secret .encode ("utf-8" )
44- while len (seed ) < nacl .bindings .crypto_sign_SEEDBYTES :
45- seed += seed
46- seed = seed [:nacl .bindings .crypto_sign_SEEDBYTES ]
47- signing_key = nacl .signing .SigningKey (seed )
48-
49- msg = event_ts .encode ("utf-8" ) + plain_token .encode ("utf-8" )
50- signed_msg = signing_key .sign (msg )
51- signature = signed_msg .signature .hex ()
52- logger .debug (f"框架 >>> 生成签名: { signature } " )
53- return signature
54-
55- # 验证回调请求
56- async def validate_callback (request : Request ):
57- signature_hex = request .headers .get ("X-Signature-Ed25519" )
58- timestamp = request .headers .get ("X-Signature-Timestamp" )
59- http_body = await request .body ()
60-
61- if not verify_signature (BOT_SECRET , signature_hex , timestamp , http_body ):
62- raise HTTPException (status_code = 401 , detail = "Invalid signature" )
63- # 获取 access_token 的函数
64- def get_access_token (app_id : str , client_secret : str ) -> dict :
65- url = "https://bots.qq.com/app/getAppAccessToken"
66- headers = {"Content-Type" : "application/json" }
67- data = {"appId" : app_id , "clientSecret" : client_secret }
68- response = requests .post (url , headers = headers , data = json .dumps (data ))
69- if response .status_code == 200 :
70- return response .json ()
71- else :
72- logger .error (f"框架 >>> 获取 access token 失败: { response .text } " )
73- raise Exception (f"Failed to get access token: { response .text } " )
74-
75- # 守护线程函数
76- def token_refresh_thread (app_id : str , client_secret : str ):
77- global access_token , expires_at , start_time
78- start_time = time .time ()
79- while True :
14+
15+ class Auth :
16+ def __init__ (self , Bot_Secret : str , AppId : str ):
17+ self .BOTSECRET = Bot_Secret
18+ self .APPID = AppId
19+ self .access_token = None
20+ self .expires_at = 0
21+ self .start_time = None
22+
23+ def verify_signature (self , signature_hex : str , timestamp : str , http_body : bytes ) -> bool :
24+ seed = self .BOTSECRET .encode ("utf-8" )
25+ while len (seed ) < nacl .bindings .crypto_sign_SEEDBYTES :
26+ seed += seed
27+ seed = seed [:nacl .bindings .crypto_sign_SEEDBYTES ]
28+ signing_key = nacl .signing .SigningKey (seed )
29+ verify_key = signing_key .verify_key
30+
31+ msg = timestamp .encode ("utf-8" ) + http_body
8032 try :
81- # 获取新的 access_token
82- response = get_access_token (app_id , client_secret )
83- access_token = response ["access_token" ]
84- expires_in = int (response ["expires_in" ])
85- expires_at = time .time () + int (expires_in ) - 45 # 提前45s刷新 / 详见:https://bot.q.qq.com/wiki/develop/api-v2/dev-prepare/interface-framework/api-use.html#%E8%8E%B7%E5%8F%96%E8%B0%83%E7%94%A8%E5%87%AD%E8%AF%81
86- logger .debug (f"框架 >>> Access token 已刷新: { access_token } , 将于 { expires_in } s 后过期" )
87- except Exception as e :
88- logger .error (f"框架 >>> 刷新 access token 失败: { e } " )
89- raise e
90- wait_time = expires_at - time .time () # 等待时间
91- if wait_time > 0 :
92- time .sleep (wait_time )
33+ signature = nacl .encoding .HexEncoder .decode (signature_hex )
34+ verify_key .verify (msg , signature )
35+ return True
36+ except nacl .exceptions .BadSignatureError :
37+ return False
38+
39+ def generate_signature (self , plain_token : str , event_ts : str ) -> str :
40+ seed = self .BOTSECRET .encode ("utf-8" )
41+ while len (seed ) < nacl .bindings .crypto_sign_SEEDBYTES :
42+ seed += seed
43+ seed = seed [:nacl .bindings .crypto_sign_SEEDBYTES ]
44+ signing_key = nacl .signing .SigningKey (seed )
45+
46+ msg = event_ts .encode ("utf-8" ) + plain_token .encode ("utf-8" )
47+ signed_msg = signing_key .sign (msg )
48+ signature = signed_msg .signature .hex ()
49+ logger .debug (f"框架 >>> 生成签名: { signature } " )
50+ return signature
51+
52+ async def validate_callback (self , request : Request ):
53+ signature_hex = request .headers .get ("X-Signature-Ed25519" )
54+ timestamp = request .headers .get ("X-Signature-Timestamp" )
55+ http_body = await request .body ()
56+
57+ if not self .verify_signature (signature_hex , timestamp , http_body ):
58+ raise HTTPException (status_code = 401 , detail = "Invalid signature" )
59+
60+ def get_access_token (self ) -> dict :
61+ url = "https://bots.qq.com/app/getAppAccessToken"
62+ headers = {"Content-Type" : "application/json" }
63+ data = {"appId" : self .APPID , "clientSecret" : self .BOTSECRET }
64+ response = requests .post (url , headers = headers , data = json .dumps (data ))
65+ if response .status_code == 200 :
66+ return response .json ()
9367 else :
94- logger .warning ("框架 >>> Access token 已失效...立即获取新token" )
95- pass
96-
97- def start_token_refresh (app_id : str , client_secret : str ):
98- thread = threading .Thread (target = token_refresh_thread , args = (app_id , client_secret ))
99- thread .daemon = True # 设置为守护线程
100- thread .start ()
101- logger .debug ("框架 >>> Access Token刷新线程启动" )
102-
103- def get_current_access_token ():
104- if access_token is None or time .time () >= expires_at :
105- raise Exception ("Access token is not available or has expired." )
106- return access_token
107-
108-
109- def format_timedelta (delta : timedelta ) -> str :
110- """
111- 将 timedelta 对象格式化为 'XX天 XX时 XX分 XX秒' 的字符串
112- """
113- days = delta .days
114- hours , remainder = divmod (delta .seconds , 3600 )
115- minutes , seconds = divmod (remainder , 60 )
116- return f"{ days } 天 { hours :02d} 时 { minutes :02d} 分 { seconds :02d} 秒"
117-
118- def get_current_run_time (string_out : bool = True ) -> timedelta or str :
119- """
120- 获取当前运行时间
121-
122- Args:
123- string_out (bool): 是否返回格式化后的字符串。默认为 True,返回格式化后的字符串。
124- 如果设置为 False,则返回 timedelta 对象。
125-
126- Returns:
127- timedelta or str: 根据 string_out 参数返回 timedelta 对象或格式化后的字符串。
128- """
129- global start_time
130- if start_time is None :
131- start_time = time .time ()
132- now = time .time ()
133- elapsed_time = timedelta (seconds = int (now - start_time ))
134-
135- if string_out :
136- return format_timedelta (elapsed_time )
137- else :
138- return elapsed_time
68+ logger .error (f"框架 >>> 获取 access token 失败: { response .text } " )
69+ raise Exception (f"Failed to access get token: { response .text } " )
70+
71+ def token_refresh_thread (self ):
72+ self .start_time = time .time ()
73+ while True :
74+ try :
75+ response = self .get_access_token ()
76+ logger .debug (f'框架 >>> 获得返回内容:{ response } ' )
77+ self .access_token = response ["access_token" ]
78+ expires_in = int (response ["expires_in" ])
79+ self .expires_at = time .time () + int (expires_in ) - 45
80+ logger .debug (f"框架 >>> Access token 已刷新: { self .access_token } , 将于 { expires_in } s 后过期" )
81+ except Exception as e :
82+ logger .error (f"框架 >>> 刷新 access token 失败: { e } " )
83+ raise e
84+ wait_time = self .expires_at - time .time ()
85+ if wait_time > 0 :
86+ time .sleep (wait_time )
87+ else :
88+ logger .warning ("框架 >>> Access token 已失效...立即获取新token" )
89+
90+ def start_token_refresh (self ):
91+ thread = threading .Thread (target = self .token_refresh_thread )
92+ thread .daemon = True
93+ thread .start ()
94+ logger .debug ("框架 >>> Access Token刷新线程启动" )
95+
96+ def get_current_access_token (self ):
97+ if self .access_token is None or time .time () >= self .expires_at :
98+ raise Exception ("Access token is not available or has expired." )
99+ return self .access_token
100+
101+ def format_timedelta (self , delta : timedelta ) -> str :
102+ days = delta .days
103+ hours , remainder = divmod (delta .seconds , 3600 )
104+ minutes , seconds = divmod (remainder , 60 )
105+ return f"{ days } 天 { hours :02d} 时 { minutes :02d} 分 { seconds :02d} 秒"
106+
107+ def get_current_run_time (self , string_out : bool = True ):
108+ if self .start_time is None :
109+ self .start_time = time .time ()
110+ now = time .time ()
111+ elapsed_time = timedelta (seconds = int (now - self .start_time ))
112+
113+ if string_out :
114+ return self .format_timedelta (elapsed_time )
115+ else :
116+ return elapsed_time
117+
118+
119+ # 实例化 Auth 类
120+ auth = Auth (config .bot_secret , str (config .appid ))
0 commit comments