forked from TonyJiangWJ/mimotion
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
328 lines (292 loc) · 12.5 KB
/
main.py
File metadata and controls
328 lines (292 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# -*- coding: utf8 -*-
import math
import traceback
from datetime import datetime
import pytz
import uuid
import json
import random
import re
import time
import os
from util.aes_help import encrypt_data, decrypt_data
import util.zepp_helper as zeppHelper
import util.push_util as push_util
# 获取默认值转int
def get_int_value_default(_config: dict, _key, default):
_config.setdefault(_key, default)
try:
return int(_config.get(_key))
except (ValueError, TypeError):
print(f"配置项{_key}不是有效整数,使用默认值{default}")
return default
# 移除时间打折逻辑,直接返回配置的最小/最大步数
def get_min_max_by_time():
min_step = get_int_value_default(config, 'MIN_STEP', 18000)
max_step = get_int_value_default(config, 'MAX_STEP', 25000)
# 确保最小值小于最大值,避免随机数生成异常
if min_step >= max_step:
print(f"警告:MIN_STEP({min_step}) 大于等于 MAX_STEP({max_step}),已自动调整为 MIN_STEP={max_step - 1000}, MAX_STEP={max_step}")
min_step = max_step - 1000
return min_step, max_step
# 虚拟ip地址
def fake_ip():
# 随便找的国内IP段:223.64.0.0 - 223.117.255.255
return f"{223}.{random.randint(64, 117)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
# 账号脱敏
def desensitize_user_name(user):
if len(user) <= 8:
ln = max(math.floor(len(user) / 3), 1)
return f'{user[:ln]}***{user[-ln:]}'
return f'{user[:3]}****{user[-4:]}'
# 获取北京时间
def get_beijing_time():
target_timezone = pytz.timezone('Asia/Shanghai')
# 获取当前时间
return datetime.now().astimezone(target_timezone)
# 格式化时间
def format_now():
return get_beijing_time().strftime("%Y-%m-%d %H:%M:%S")
# 获取时间戳
def get_time():
current_time = get_beijing_time()
return "%.0f" % (current_time.timestamp() * 1000)
# 获取登录code
def get_access_token(location):
code_pattern = re.compile("(?<=access=).*?(?=&)")
result = code_pattern.findall(location)
if result is None or len(result) == 0:
return None
return result[0]
def get_error_code(location):
code_pattern = re.compile("(?<=error=).*?(?=&)")
result = code_pattern.findall(location)
if result is None or len(result) == 0:
return None
return result[0]
class MiMotionRunner:
def __init__(self, _user, _passwd):
self.user_id = None
self.device_id = str(uuid.uuid4())
user = str(_user)
password = str(_passwd)
self.invalid = False
self.log_str = ""
if user == '' or password == '':
self.error = "用户名或密码填写有误!"
self.invalid = True
pass
self.password = password
if (user.startswith("+86")) or "@" in user:
user = user
else:
user = "+86" + user
if user.startswith("+86"):
self.is_phone = True
else:
self.is_phone = False
self.user = user
# self.fake_ip_addr = fake_ip()
# self.log_str += f"创建虚拟ip地址:{self.fake_ip_addr}\n"
# 登录
def login(self):
user_token_info = user_tokens.get(self.user)
if user_token_info is not None:
access_token = user_token_info.get("access_token")
login_token = user_token_info.get("login_token")
app_token = user_token_info.get("app_token")
self.device_id = user_token_info.get("device_id")
self.user_id = user_token_info.get("user_id")
if self.device_id is None:
self.device_id = str(uuid.uuid4())
user_token_info["device_id"] = self.device_id
ok, msg = zeppHelper.check_app_token(app_token)
if ok:
self.log_str += "使用加密保存的app_token\n"
return app_token
else:
self.log_str += f"app_token失效 重新获取 last grant time: {user_token_info.get('app_token_time')}\n"
# 检查login_token是否可用
app_token, msg = zeppHelper.grant_app_token(login_token)
if app_token is None:
self.log_str += f"login_token 失效 重新获取 last grant time: {user_token_info.get('login_token_time')}\n"
login_token, app_token, user_id, msg = zeppHelper.grant_login_tokens(access_token, self.device_id,
self.is_phone)
if login_token is None:
self.log_str += f"access_token 已失效:{msg} last grant time:{user_token_info.get('access_token_time')}\n"
else:
user_token_info["login_token"] = login_token
user_token_info["app_token"] = app_token
user_token_info["user_id"] = user_id
user_token_info["login_token_time"] = get_time()
user_token_info["app_token_time"] = get_time()
self.user_id = user_id
return app_token
else:
self.log_str += "重新获取app_token成功\n"
user_token_info["app_token"] = app_token
user_token_info["app_token_time"] = get_time()
return app_token
# access_token 失效 或者没有保存加密数据
access_token, msg = zeppHelper.login_access_token(self.user, self.password)
if access_token is None:
self.log_str += "登录获取accessToken失败:%s" % msg
return None
# print(f"device_id:{self.device_id} isPhone: {self.is_phone}")
login_token, app_token, user_id, msg = zeppHelper.grant_login_tokens(access_token, self.device_id,
self.is_phone)
if login_token is None:
self.log_str += f"登录提取的 access_token 无效:{msg}"
return None
user_token_info = dict()
user_token_info["access_token"] = access_token
user_token_info["login_token"] = login_token
user_token_info["app_token"] = app_token
user_token_info["user_id"] = user_id
# 记录token获取时间
user_token_info["access_token_time"] = get_time()
user_token_info["login_token_time"] = get_time()
user_token_info["app_token_time"] = get_time()
if self.device_id is None:
self.device_id = uuid.uuid4()
user_token_info["device_id"] = self.device_id
user_tokens[self.user] = user_token_info
return app_token
# 主函数
def login_and_post_step(self, min_step, max_step):
if self.invalid:
return "账号或密码配置有误", False
app_token = self.login()
if app_token is None:
return "登陆失败!", False
step = str(random.randint(min_step, max_step))
self.log_str += f"已设置为随机步数范围({min_step}~{max_step}) 随机值:{step}\n"
ok, msg = zeppHelper.post_fake_brand_data(step, app_token, self.user_id)
return f"修改步数({step})[" + msg + "]", ok
def run_single_account(total, idx, user_mi, passwd_mi):
idx_info = ""
if idx is not None:
idx_info = f"[{idx + 1}/{total}]"
log_str = f"[{format_now()}]\n{idx_info}账号:{desensitize_user_name(user_mi)}\n"
try:
runner = MiMotionRunner(user_mi, passwd_mi)
exec_msg, success = runner.login_and_post_step(min_step, max_step)
log_str += runner.log_str
log_str += f'{exec_msg}\n'
exec_result = {"user": user_mi, "success": success,
"msg": exec_msg}
except:
log_str += f"执行异常:{traceback.format_exc()}\n"
log_str += traceback.format_exc()
exec_result = {"user": user_mi, "success": False,
"msg": f"执行异常:{traceback.format_exc()}"}
print(log_str)
return exec_result
def execute():
user_list = users.split('#')
passwd_list = passwords.split('#')
exec_results = []
if len(user_list) == len(passwd_list):
idx, total = 0, len(user_list)
if use_concurrent:
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
exec_results = executor.map(lambda x: run_single_account(total, x[0], *x[1]),
enumerate(zip(user_list, passwd_list)))
else:
for user_mi, passwd_mi in zip(user_list, passwd_list):
exec_results.append(run_single_account(total, idx, user_mi, passwd_mi))
idx += 1
if idx < total:
# 每个账号之间间隔一定时间请求一次,避免接口请求过于频繁导致异常
time.sleep(sleep_seconds)
if encrypt_support:
persist_user_tokens()
success_count = 0
push_results = []
for result in exec_results:
push_results.append(result)
if result['success'] is True:
success_count += 1
summary = f"\n执行账号总数{total},成功:{success_count},失败:{total - success_count}"
print(summary)
push_util.push_results(push_results, summary, push_config)
else:
print(f"账号数长度[{len(user_list)}]和密码数长度[{len(passwd_list)}]不匹配,跳过执行")
exit(1)
def prepare_user_tokens() -> dict:
data_path = r"encrypted_tokens.data"
if os.path.exists(data_path):
with open(data_path, 'rb') as f:
data = f.read()
try:
decrypted_data = decrypt_data(data, aes_key, None)
# 假设原始明文为 UTF-8 编码文本
return json.loads(decrypted_data.decode('utf-8', errors='strict'))
except:
print("密钥不正确或者加密内容损坏 放弃token")
return dict()
else:
return dict()
def persist_user_tokens():
data_path = r"encrypted_tokens.data"
origin_str = json.dumps(user_tokens, ensure_ascii=False)
cipher_data = encrypt_data(origin_str.encode("utf-8"), aes_key, None)
with open(data_path, 'wb') as f:
f.write(cipher_data)
f.flush()
f.close()
if __name__ == "__main__":
# 北京时间
time_bj = get_beijing_time()
encrypt_support = False
user_tokens = dict()
if os.environ.__contains__("AES_KEY") is True:
aes_key = os.environ.get("AES_KEY")
if aes_key is not None:
aes_key = aes_key.encode('utf-8')
if len(aes_key) == 16:
encrypt_support = True
if encrypt_support:
user_tokens = prepare_user_tokens()
else:
print("AES_KEY未设置或者无效 无法使用加密保存功能")
if os.environ.__contains__("CONFIG") is False:
print("未配置CONFIG变量,无法执行")
exit(1)
else:
# region 初始化参数
config = dict()
try:
config = dict(json.loads(os.environ.get("CONFIG")))
except:
print("CONFIG格式不正确,请检查Secret配置,请严格按照JSON格式:使用双引号包裹字段和值,逗号不能多也不能少")
traceback.print_exc()
exit(1)
# 创建推送配置对象
push_config = push_util.PushConfig(
push_plus_token=config.get('PUSH_PLUS_TOKEN'),
push_plus_hour=config.get('PUSH_PLUS_HOUR'),
push_plus_max=get_int_value_default(config, 'PUSH_PLUS_MAX', 30),
push_wechat_webhook_key=config.get('PUSH_WECHAT_WEBHOOK_KEY'),
telegram_bot_token=config.get('TELEGRAM_BOT_TOKEN'),
telegram_chat_id=config.get('TELEGRAM_CHAT_ID')
)
sleep_seconds = config.get('SLEEP_GAP')
if sleep_seconds is None or sleep_seconds == '':
sleep_seconds = 5
sleep_seconds = float(sleep_seconds)
users = config.get('USER')
passwords = config.get('PWD')
if users is None or passwords is None:
print("未正确配置账号密码,无法执行")
exit(1)
min_step, max_step = get_min_max_by_time()
use_concurrent = config.get('USE_CONCURRENT')
if use_concurrent is not None and use_concurrent == 'True':
use_concurrent = True
else:
print(f"多账号执行间隔:{sleep_seconds}")
use_concurrent = False
# endregion
execute()