-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_duplicate.py
More file actions
51 lines (42 loc) · 1.24 KB
/
test_duplicate.py
File metadata and controls
51 lines (42 loc) · 1.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
"""
测试重复处理问题
"""
import json
import asyncio
from schedule import schedule
# 模拟消息数据
test_message = {
"Data": {
"FromUserName": {"string": "12345@chatroom"},
"Content": {"string": "wxid_test:776434921968981 催件"},
"PushContent": "测试用户:776434921968981 催件",
"MsgType": 1,
"CreateTime": 1737012345
}
}
# 模拟群组数据
test_group_dict = {
"测试群": "12345@chatroom",
"申通售后群": "50066595729@chatroom"
}
async def test_duplicate():
"""测试是否会重复处理"""
print("=" * 60)
print("开始测试重复处理问题")
print("=" * 60)
message_json = json.dumps(test_message, indent=4, ensure_ascii=False)
print("\n测试消息:")
print(message_json)
print("\n开始处理消息...")
print("=" * 60)
# 调用一次 schedule 函数
await schedule(test_group_dict, message_json)
print("=" * 60)
print("处理完成")
print("=" * 60)
print("\n请检查:")
print("1. 是否向申通售后群发送了几次消息?")
print("2. Excel 中保存了几条数据?")
print("3. 控制台输出了几次'处理催件任务'?")
if __name__ == "__main__":
asyncio.run(test_duplicate())