-
Notifications
You must be signed in to change notification settings - Fork 72
Open
Description
建议在 app/api/routes.py 下的get_next_new_message接口返回who(chat_name)和chat_type ,看了一下大佬的源码。貌似只需要在app/wechat_adapter.py 的方法里添加chat_name和chat_type(大概在1501行哪里)加上就可以了;
另外,非科班码农没怎么玩过github(大专毕业上的培训班,两眼一抹黑),不知道这样提建议有没有得罪人,如有得罪,希望作者多多包涵,也一定要告诉我。
wechat_adapter.py :
msg_dict = {
'type': getattr(msg, 'type', 'unknown'),
'content': getattr(msg, 'content', str(msg)),
'sender': getattr(msg, 'sender', ''),
'time': getattr(msg, 'time', ''),
'id': getattr(msg, 'id', ''),
'mtype': getattr(msg, 'mtype', None),
'sender_remark': getattr(msg, 'sender_remark', None),
'file_path': getattr(msg, 'file_path', None),
'who': result.get('chat_name', getattr(msg, 'sender_remark', None)),
'chat_type': result.get('chat_type', 'unknown'),
}routes.py -> get_next_new_message :
@api_bp.route('/message/get-next-new', methods=['GET'])
@require_api_key
def get_next_new_message():
wx_instance = wechat_manager.get_instance()
if not wx_instance:
logger.error("微信未初始化")
return jsonify({
'code': 2001,
'message': '微信未初始化',
'data': None
}), 400
try:
# 更灵活的布尔值处理
def parse_bool(value):
if isinstance(value, bool):
return value
if isinstance(value, str):
value = value.lower()
if value in ('true', '1', 'yes', 'y', 'on'):
return True
if value in ('false', '0', 'no', 'n', 'off'):
return False
return False
# 获取参数并设置默认值
savepic = parse_bool(request.args.get('savepic', 'false'))
savevideo = parse_bool(request.args.get('savevideo', 'false'))
savefile = parse_bool(request.args.get('savefile', 'false'))
savevoice = parse_bool(request.args.get('savevoice', 'false'))
parseurl = parse_bool(request.args.get('parseurl', 'false'))
logger.debug(f"处理参数: savepic={savepic}, savevideo={savevideo}, savefile={savefile}, savevoice={savevoice}, parseurl={parseurl}")
# 获取当前使用的库
lib_name = getattr(wx_instance, '_lib_name', 'wxauto')
logger.debug(f"当前使用的库: {lib_name}")
# 根据不同的库构建不同的参数
if lib_name == 'wxautox':
# wxautox的GetNextNewMessage只支持filter_mute参数
params = {
'filter_mute': False # 默认不过滤免打扰消息
}
logger.debug(f"使用wxautox参数: {params}")
else:
# wxauto的GetNextNewMessage可能不支持任何参数,使用空参数
params = {}
logger.debug(f"使用wxauto参数: {params}")
# 不再设置wxauto保存路径,避免导入错误
logger.debug("跳过wxauto保存路径设置,使用默认路径")
# 调用GetNextNewMessage方法
try:
messages = wx_instance.GetNextNewMessage(**params)
except Exception as e:
logger.error(f"获取新消息失败: {str(e)}")
# 如果出现异常,返回空字典表示没有新消息
messages = {}
if not messages:
return jsonify({
'code': 0,
'message': '没有新消息',
'data': {'messages': {}}
})
# 辅助函数:清理群名中的人数信息
def clean_group_name(name):
# 匹配群名后面的 (数字) 模式
import re
return re.sub(r'\s*\(\d+\)$', '', name)
# 格式化消息 - 处理不同库的返回格式
formatted_messages = {}
# 检查当前使用的库
lib_name = wx_instance.get_lib_name() if hasattr(wx_instance, 'get_lib_name') else 'wxauto'
if lib_name == "wxautox":
# wxautox返回格式: {'chat_name': 'name', 'chat_type': 'type', 'msg': [messages]}
if isinstance(messages, dict) and 'chat_name' in messages and 'msg' in messages:
chat_name = messages.get('chat_name', 'Unknown')
msg_list = messages.get('msg', [])
# 清理群名中的人数信息
clean_name = clean_group_name(chat_name)
formatted_messages[clean_name] = []
for msg in msg_list:
try:
# 检查msg是否是对象
if hasattr(msg, 'type'):
# 检查消息类型
if hasattr(msg, 'type') and getattr(msg, 'type', '') in ['image', 'file', 'video', 'voice']:
# 检查文件是否存在且大小大于0
if hasattr(msg, 'file_path') and getattr(msg, 'file_path', ''):
try:
file_path = getattr(msg, 'file_path', '')
if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
logger.warning(f"文件不存在或大小为0: {file_path}")
except Exception as e:
logger.error(f"检查文件失败: {str(e)}")
formatted_messages[clean_name].append({
'type': getattr(msg, 'type', 'unknown'),
'content': getattr(msg, 'content', str(msg)),
'sender': getattr(msg, 'sender', ''),
'id': getattr(msg, 'id', ''),
'mtype': getattr(msg, 'mtype', None),
'sender_remark': getattr(msg, 'sender_remark', None),
'file_path': getattr(msg, 'file_path', None),
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
})
else:
# 如果msg是字符串或其他类型,转换为文本消息
formatted_messages[clean_name].append({
'type': 'text',
'content': str(msg),
'sender': '',
'id': '',
'mtype': None,
'sender_remark': None,
'file_path': None,
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
})
except Exception as e:
logger.error(f"处理消息时出错: {str(e)}")
# 添加错误消息
formatted_messages[clean_name].append({
'type': 'error',
'content': f'消息处理错误: {str(e)}',
'sender': '',
'id': '',
'mtype': None,
'sender_remark': None,
'file_path': None,
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
})
else:
logger.warning(f"wxautox返回了意外的消息格式: {type(messages)}")
else:
# wxauto处理
if isinstance(messages, list):
# wxauto返回列表格式
formatted_messages["新消息"] = []
for msg in messages:
try:
# 检查msg是否已经是字典格式(适配器已转换)
if isinstance(msg, dict):
# 已经是字典格式,直接使用
logger.debug(f"使用wxauto返回的格式1: {msg}")
msg_data = {
'type': msg.get('type', 'unknown'),
'content': msg.get('content', str(msg)),
'sender': msg.get('sender', ''),
'time': msg.get('time', ''),
'id': msg.get('id', ''),
'mtype': msg.get('mtype', None),
'sender_remark': msg.get('sender_remark', None),
'file_path': msg.get('file_path', None),
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
}
else:
# 原始消息对象,需要转换
# 检查消息类型
if hasattr(msg, 'type') and getattr(msg, 'type', '') in ['image', 'file', 'video', 'voice']:
# 检查文件是否存在且大小大于0
if hasattr(msg, 'file_path') and getattr(msg, 'file_path', ''):
try:
file_path = getattr(msg, 'file_path', '')
if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
logger.warning(f"文件不存在或大小为0: {file_path}")
except Exception as e:
logger.error(f"检查文件失败: {str(e)}")
logger.debug(f"使用wxauto返回的格式2: {msg}")
msg_data = {
'type': getattr(msg, 'type', 'unknown'),
'content': getattr(msg, 'content', str(msg)),
'sender': getattr(msg, 'sender', ''),
'time': getattr(msg, 'time', ''),
'id': getattr(msg, 'id', ''),
'mtype': getattr(msg, 'mtype', None),
'sender_remark': getattr(msg, 'sender_remark', None),
'file_path': getattr(msg, 'file_path', None),
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
}
formatted_messages["新消息"].append(msg_data)
except Exception as e:
logger.debug(f"使用wxauto返回的格式3: {msg}")
logger.error(f"处理wxauto消息时出错: {str(e)}")
# 添加错误消息
formatted_messages["新消息"].append({
'type': 'error',
'content': f'消息处理错误: {str(e)}',
'sender': '',
'time': '',
'id': '',
'mtype': None,
'sender_remark': None,
'file_path': None,
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
})
elif isinstance(messages, dict):
# 处理字典格式
# 检查是否是wxautox格式 {chat_name: str, chat_type: str, msg: [messages]}
if 'msg' in messages and isinstance(messages['msg'], list):
# 这是wxautox格式
chat_name = messages.get('chat_name', '未知聊天')
clean_name = clean_group_name(chat_name)
formatted_messages[clean_name] = []
for msg in messages['msg']:
try:
# 检查消息类型
if hasattr(msg, 'type') and getattr(msg, 'type', '') in ['image', 'file', 'video', 'voice']:
# 检查文件是否存在且大小大于0
if hasattr(msg, 'file_path') and getattr(msg, 'file_path', ''):
try:
file_path = getattr(msg, 'file_path', '')
if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
logger.warning(f"文件不存在或大小为0: {file_path}")
except Exception as e:
logger.error(f"检查文件失败: {str(e)}")
logger.debug(f"使用wxauto返回的格式4: {msg}")
formatted_messages[clean_name].append({
'type': getattr(msg, 'type', 'unknown'),
'content': getattr(msg, 'content', str(msg)),
'sender': getattr(msg, 'sender', ''),
'time': getattr(msg, 'time', ''),
'id': getattr(msg, 'id', ''),
'mtype': getattr(msg, 'mtype', None),
'sender_remark': getattr(msg, 'sender_remark', None),
'file_path': getattr(msg, 'file_path', None),
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
})
except Exception as e:
logger.error(f"处理wxautox消息时出错: {str(e)}")
# 添加错误消息
logger.debug(f"使用wxauto返回的格式5: {msg}")
formatted_messages[clean_name].append({
'type': 'error',
'content': f'消息处理错误: {str(e)}',
'sender': '',
'time': '',
'id': '',
'mtype': None,
'sender_remark': None,
'file_path': None,
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
})
else:
# 可能是其他字典格式: {chat_name: [messages]}
for chat_name, msg_list in messages.items():
if isinstance(msg_list, list):
# 清理群名中的人数信息
clean_name = clean_group_name(chat_name)
formatted_messages[clean_name] = []
for msg in msg_list:
try:
# 检查消息类型
if hasattr(msg, 'type') and getattr(msg, 'type', '') in ['image', 'file', 'video', 'voice']:
# 检查文件是否存在且大小大于0
if hasattr(msg, 'file_path') and getattr(msg, 'file_path', ''):
try:
file_path = getattr(msg, 'file_path', '')
if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
logger.warning(f"文件不存在或大小为0: {file_path}")
except Exception as e:
logger.error(f"检查文件失败: {str(e)}")
logger.debug(f"使用wxauto返回的格式6: {msg}")
formatted_messages[clean_name].append({
'type': getattr(msg, 'type', 'unknown'),
'content': getattr(msg, 'content', str(msg)),
'sender': getattr(msg, 'sender', ''),
'time': getattr(msg, 'time', ''),
'id': getattr(msg, 'id', ''),
'mtype': getattr(msg, 'mtype', None),
'sender_remark': getattr(msg, 'sender_remark', None),
'file_path': getattr(msg, 'file_path', None),
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
})
except Exception as e:
logger.error(f"处理wxauto字典消息时出错: {str(e)}")
# 添加错误消息
logger.debug(f"使用wxauto返回的格式7: {msg}")
formatted_messages[clean_name].append({
'type': 'error',
'content': f'消息处理错误: {str(e)}',
'sender': '',
'time': '',
'id': '',
'mtype': None,
'sender_remark': None,
'file_path': None,
'who': getattr(msg, 'chat_name', ''),
'chat_type': getattr(msg, 'chat_type', 'unknown')
})
else:
# 其他格式,转换为字符串
formatted_messages = {"消息": [{"type": "text", "content": str(messages)}]}
return jsonify({
'code': 0,
'message': '获取成功',
'data': {
'messages': formatted_messages
}
})
except Exception as e:
logger.error(f"获取新消息失败: {str(e)}", exc_info=True)
return jsonify({
'code': 3002,
'message': f'获取失败: {str(e)}',
'data': None
}), 500Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels