Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 262 additions & 49 deletions astrbot/core/pipeline/waking_check/stage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
from collections import defaultdict
from typing import AsyncGenerator, Union

from astrbot import logger
from astrbot.core.message.components import At, AtAll, Reply
from astrbot.core.message.message_event_result import MessageChain, MessageEventResult
Expand All @@ -8,7 +9,6 @@
from astrbot.core.star.session_plugin_manager import SessionPluginManager
from astrbot.core.star.star import star_map
from astrbot.core.star.star_handler import EventType, star_handlers_registry

from ..context import PipelineContext
from ..stage import Stage, register_stage

Expand Down Expand Up @@ -46,6 +46,220 @@ async def initialize(self, ctx: PipelineContext) -> None:
"ignore_at_all", False
)

def _find_command_filter(self, handler):
"""查找command filter

Args:
handler (StarHandlerMetadata): handler 元数据对象
Returns:
CommandFilter | None: 找到则返回 CommandFilter 对象,否则返回 None
"""
for f in handler.event_filters:
if hasattr(f, "command_name"):
return f
return None
Comment on lines +57 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): 使用内置函数 next 而不是 for 循环 (use-next)

Suggested change
for f in handler.event_filters:
if hasattr(f, "command_name"):
return f
return None
return next(
(f for f in handler.event_filters if hasattr(f, "command_name")), None
)
Original comment in English

suggestion (code-quality): Use the built-in function next instead of a for-loop (use-next)

Suggested change
for f in handler.event_filters:
if hasattr(f, "command_name"):
return f
return None
return next(
(f for f in handler.event_filters if hasattr(f, "command_name")), None
)


async def _find_best_command_handlers(self, command_handlers, event):
"""查找最长匹配的handler

Args:
command_handlers (List[Tuple[StarHandlerMetadata, CommandFilter]]): 候选的指令 handler 列表
event (AstrMessageEvent): 消息事件对象
Returns:
StarHandlerMetadata | None: 找到则返回 handler 元数据对象,否则返回 None
"""

if not event.is_at_or_wake_command:
return None

message_str = re.sub(r"\s+", " ", event.get_message_str().strip())
best_match = None
best_length = 0
best_params = None

# 找到所有可能的匹配
for handler, command_filter in command_handlers:
match_result = self._match_command(command_filter, message_str, event)
if match_result:
Comment on lines +82 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): 使用命名表达式简化赋值和条件 (use-named-expression)

Suggested change
match_result = self._match_command(command_filter, message_str, event)
if match_result:
if match_result := self._match_command(
command_filter, message_str, event
):
Original comment in English

suggestion (code-quality): Use named expression to simplify assignment and conditional (use-named-expression)

Suggested change
match_result = self._match_command(command_filter, message_str, event)
if match_result:
if match_result := self._match_command(
command_filter, message_str, event
):

match_length, parsed_params = match_result
if match_length > best_length:
best_length = match_length
best_match = handler
best_params = parsed_params

if best_match:
# 解析的参数
if best_params is not None:
event.set_extra("parsed_params", best_params)

# 还需要执行完整的filter检查
result = await self._check_handler_filters(best_match, event)
if result == "activate":
return best_match
elif result == "permission_error":
raise PermissionError("权限不足")

return None

def _match_command(self, command_filter, message_str, event):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 考虑提取一个辅助函数来生成所有完整的命令字符串,并重构命令匹配和冲突检测以使用它,从而减少重复。

这里有两个小的重构,可以在不改变行为的情况下,折叠 _match_command_detect_command_conflicts 中嵌套的循环和重复的逻辑:

  1. 提取一个辅助函数,从 CommandFilter 构建所有“完整命令”:
def _full_command_list(command_filter):
    parts = []
    for base in [command_filter.command_name] + list(command_filter.alias):
        for parent in command_filter.parent_command_names:
            parts.append(f"{parent} {base}" if parent else base)
    return parts
  1. 使用单个正则表达式传递来简化 _match_command,而不是三个 if 分支:
import re

def _match_command(self, command_filter, message_str, event):
    if not command_filter.custom_filter_ok(event, self.ctx.astrbot_config):
        return None

    commands = _full_command_list(command_filter)
    # build: ^(?:cmd1|cmd2)(?:\s+(.*))?$
    pat = re.compile(rf"^(?:{'|'.join(map(re.escape, commands))})(?:\s+(.*))?$")
    m = pat.match(message_str)
    if not m:
        return None

    raw_args = m.group(1) or ""
    params = raw_args.split() if raw_args.strip() else []
    try:
        parsed = command_filter.validate_and_convert_params(params, command_filter.handler_params)
        return (len(m.group(0).split()[0]), parsed)
    except ValueError:
        return None
  1. _detect_command_conflicts 中重用 _full_command_list
def _detect_command_conflicts(self, command_handlers):
    command_map = defaultdict(list)
    for handler, cf in command_handlers:
        plugin = star_map.get(handler.handler_module_path, {}).name or "unknown"
        for full_cmd in _full_command_list(cf):
            command_map[full_cmd].append((handler, plugin))
    for cmd, lst in command_map.items():
        if len(lst) > 1:
            logger.warning(f"冲突: '{cmd}' -> " +
                ", ".join(f"{p}.{h.handler_full_name}" for h, p in lst))

这些更改

  • 移除了 _match_command 中的三个独立分支
  • 消除了别名/父级上的重复循环
  • 在匹配和冲突检测之间共享命令生成逻辑

—同时保持了相同的行为。

Original comment in English

issue (complexity): Consider extracting a helper to generate all full command strings and refactoring command matching and conflict detection to use it for reduced duplication.

Here are two small refactorings that can collapse the nested loops and repeated logic in both _match_command and _detect_command_conflicts without changing behavior:

  1. Extract a helper to build all “full commands” from a CommandFilter:
def _full_command_list(command_filter):
    parts = []
    for base in [command_filter.command_name] + list(command_filter.alias):
        for parent in command_filter.parent_command_names:
            parts.append(f"{parent} {base}" if parent else base)
    return parts
  1. Simplify _match_command with a single regex pass instead of three if-branches:
import re

def _match_command(self, command_filter, message_str, event):
    if not command_filter.custom_filter_ok(event, self.ctx.astrbot_config):
        return None

    commands = _full_command_list(command_filter)
    # build: ^(?:cmd1|cmd2)(?:\s+(.*))?$
    pat = re.compile(rf"^(?:{'|'.join(map(re.escape, commands))})(?:\s+(.*))?$")
    m = pat.match(message_str)
    if not m:
        return None

    raw_args = m.group(1) or ""
    params = raw_args.split() if raw_args.strip() else []
    try:
        parsed = command_filter.validate_and_convert_params(params, command_filter.handler_params)
        return (len(m.group(0).split()[0]), parsed)
    except ValueError:
        return None
  1. Reuse _full_command_list in _detect_command_conflicts:
def _detect_command_conflicts(self, command_handlers):
    command_map = defaultdict(list)
    for handler, cf in command_handlers:
        plugin = star_map.get(handler.handler_module_path, {}).name or "unknown"
        for full_cmd in _full_command_list(cf):
            command_map[full_cmd].append((handler, plugin))
    for cmd, lst in command_map.items():
        if len(lst) > 1:
            logger.warning(f"冲突: '{cmd}' -> " +
                ", ".join(f"{p}.{h.handler_full_name}" for h, p in lst))

These two changes

  • Remove the three separate branches in _match_command
  • Eliminate duplicate loops over aliases/parents
  • Share command-generation logic between matching and conflict detection

—while keeping identical behavior.

"""匹配指令

Args:
command_filter (CommandFilter): 指令过滤器对象
message_str (str): 消息字符串
event (AstrMessageEvent): 消息事件对象
Returns:
Tuple[int, dict] | None: 如果匹配成功,返回 (匹配长度, 解析参数);否则返回 None
"""
# 检查自定义过滤器
if not command_filter.custom_filter_ok(event, self.ctx.astrbot_config):
return None

candidates = [command_filter.command_name] + list(command_filter.alias)

for candidate in candidates:
for parent_command_name in command_filter.parent_command_names:
if parent_command_name:
full_command = f"{parent_command_name} {candidate}"
else:
full_command = candidate

matched_params = None
if message_str == full_command:
# 完全匹配,无参数
try:
matched_params = command_filter.validate_and_convert_params(
[], command_filter.handler_params
)
return (len(full_command), matched_params)
except ValueError:
continue
elif message_str.startswith(full_command + " "):
# 前缀匹配,有参数
param_str = message_str[len(full_command) :].strip()
params_list = [p for p in param_str.split(" ") if p]
try:
matched_params = command_filter.validate_and_convert_params(
params_list, command_filter.handler_params
)
return (len(full_command), matched_params)
except ValueError:
continue
elif message_str.startswith(full_command) and len(message_str) > len(
full_command
):
# 前缀匹配 忘了加空格的情况
param_str = message_str[len(full_command) :]
# 将整个剩余部分作为一个参数
params_list = [param_str] if param_str else []
try:
matched_params = command_filter.validate_and_convert_params(
params_list, command_filter.handler_params
)
return (len(full_command), matched_params)
except ValueError:
continue

return None

async def _check_handler_filters(self, handler, event):
"""检查处理器的所有过滤器

Args:
handler (StarHandlerMetadata): handler 元数据对象
event (AstrMessageEvent): 消息事件对象
Returns:
"activate": 通过所有检查,可以激活
"skip": 跳过(权限不足但不报错,或其他过滤器不通过)
"permission_error": 权限不足且需要报错
"""
permission_not_pass = False
permission_filter_raise_error = False

for filter_obj in handler.event_filters:
try:
if isinstance(filter_obj, PermissionTypeFilter):
if not filter_obj.filter(event, self.ctx.astrbot_config):
permission_not_pass = True
permission_filter_raise_error = filter_obj.raise_error
elif hasattr(filter_obj, "command_name"):
# 不需要再检查 command filter
continue
else:
Comment on lines +179 to +188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 过滤器检查中的异常处理可能会掩盖底层问题。

记录带有堆栈跟踪的异常将有助于识别过滤器实现中的问题。对于关键错误,请考虑重新抛出而不是仅仅通知用户。

Original comment in English

suggestion (bug_risk): Exception handling in filter checks may mask underlying issues.

Logging exceptions with traceback will help identify issues in filter implementations. For critical errors, consider re-raising instead of only notifying the user.

if not filter_obj.filter(event, self.ctx.astrbot_config):
return "skip"
except Exception as e:
await event.send(
MessageEventResult().message(
f"插件 {star_map[handler.handler_module_path].name}: {e}"
)
)
event.stop_event()
return "skip"

# 处理权限检查结果
if permission_not_pass:
if not permission_filter_raise_error:
return "skip"

if self.no_permission_reply:
await event.send(
MessageChain().message(
f"您(ID: {event.get_sender_id()})的权限不足以使用此指令。通过 /sid 获取 ID 并请管理员添加。"
)
)
logger.info(
f"触发 {star_map[handler.handler_module_path].name} 时, 用户(ID={event.get_sender_id()}) 权限不足。"
)
return "permission_error"

return "activate"

def _detect_command_conflicts(self, command_handlers):
"""检测指令冲突

Args:
command_handlers (List[Tuple[StarHandlerMetadata, CommandFilter]]): 候选的指令 handler 列表
"""
# 完整指令名 -> [(handler, plugin_name)] 映射
command_map = defaultdict(list)

for handler, command_filter in command_handlers:
star_metadata = star_map.get(handler.handler_module_path)
if star_metadata:
Comment on lines +228 to +229
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): 使用命名表达式简化赋值和条件 (use-named-expression)

Suggested change
star_metadata = star_map.get(handler.handler_module_path)
if star_metadata:
if star_metadata := star_map.get(handler.handler_module_path):
Original comment in English

suggestion (code-quality): Use named expression to simplify assignment and conditional (use-named-expression)

Suggested change
star_metadata = star_map.get(handler.handler_module_path)
if star_metadata:
if star_metadata := star_map.get(handler.handler_module_path):

plugin_name = star_metadata.name
else:
plugin_name = "不知道是哪个插件"

# 所有可能的指令名
candidates = [command_filter.command_name] + list(command_filter.alias)

for candidate in candidates:
for parent_command_name in command_filter.parent_command_names:
if parent_command_name:
full_command = f"{parent_command_name} {candidate}"
else:
full_command = candidate

command_map[full_command].append((handler, plugin_name, candidate))

# 检查冲突
conflicts_detected = False
for command_name, handlers_list in command_map.items():
if len(handlers_list) > 1:
if not conflicts_detected:
logger.warning("检测到指令名冲突!")
conflicts_detected = True

conflict_info = []
for handler, plugin_name, original_command in handlers_list:
conflict_info.append(
f"插件 '{plugin_name}' 的指令 '{original_command}'"
)
logger.warning(f"指令 '{command_name}' 存在冲突:")
for info in conflict_info:
logger.warning(f" - {info}")

async def process(
self, event: AstrMessageEvent
) -> Union[None, AsyncGenerator[None, None]]:
Expand Down Expand Up @@ -110,73 +324,72 @@ async def process(

# 检查插件的 handler filter
activated_handlers = []
handlers_parsed_params = {} # 注册了指令的 handler
handlers_parsed_params = {}

# 将 plugins_name 设置到 event 中
enabled_plugins_name = self.ctx.astrbot_config.get("plugin_set", ["*"])
if enabled_plugins_name == ["*"]:
# 如果是 *,则表示所有插件都启用
event.plugins_name = None
else:
event.plugins_name = enabled_plugins_name
logger.debug(f"enabled_plugins_name: {enabled_plugins_name}")

command_handlers = []
non_command_handlers = []

for handler in star_handlers_registry.get_handlers_by_event_type(
EventType.AdapterMessageEvent, plugins_name=event.plugins_name
):
# filter 需满足 AND 逻辑关系
passed = True
permission_not_pass = False
permission_filter_raise_error = False
if len(handler.event_filters) == 0:
continue

for filter in handler.event_filters:
try:
if isinstance(filter, PermissionTypeFilter):
if not filter.filter(event, self.ctx.astrbot_config):
permission_not_pass = True
permission_filter_raise_error = filter.raise_error
else:
if not filter.filter(event, self.ctx.astrbot_config):
passed = False
break
except Exception as e:
await event.send(
MessageEventResult().message(
f"插件 {star_map[handler.handler_module_path].name}: {e}"
)
)
event.stop_event()
passed = False
break
if passed:
if permission_not_pass:
if not permission_filter_raise_error:
# 跳过
continue
if self.no_permission_reply:
await event.send(
MessageChain().message(
f"您(ID: {event.get_sender_id()})的权限不足以使用此指令。通过 /sid 获取 ID 并请管理员添加。"
)
)
logger.info(
f"触发 {star_map[handler.handler_module_path].name} 时, 用户(ID={event.get_sender_id()}) 权限不足。"
)
event.stop_event()
return
# 检查是否为指令
command_filter = self._find_command_filter(handler)
if command_filter:
Comment on lines +346 to +348
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): 我们发现了这些问题:

Suggested change
# 检查是否为指令
command_filter = self._find_command_filter(handler)
if command_filter:
if command_filter := self._find_command_filter(handler):


解释
此函数的质量得分低于 25% 的质量阈值。
此得分是方法长度、认知复杂度和工作内存的组合。

您如何解决此问题?

重构此函数以使其更短、更易读可能是有益的。

  • 通过将功能块提取到它们自己的函数中来减少函数长度。这是您可以做的最重要的事情——理想情况下,一个函数应该少于 10 行。
  • 减少嵌套,或许可以通过引入守卫子句来尽早返回。
  • 确保变量作用域紧密,以便使用相关概念的代码在函数中紧密地放在一起,而不是分散开来。
Original comment in English

suggestion (code-quality): We've found these issues:

Suggested change
# 检查是否为指令
command_filter = self._find_command_filter(handler)
if command_filter:
if command_filter := self._find_command_filter(handler):


Explanation
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

  • Reduce the function length by extracting pieces of functionality out into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.

command_handlers.append((handler, command_filter))
else:
non_command_handlers.append(handler)

is_wake = True
event.is_wake = True
self._detect_command_conflicts(command_handlers)

activated_handlers.append(handler)
# 指令
command_matched = False
try:
best_command_handler = await self._find_best_command_handlers(
command_handlers, event
)
if best_command_handler:
activated_handlers.append(best_command_handler)
if "parsed_params" in event.get_extra():
handlers_parsed_params[handler.handler_full_name] = event.get_extra(
"parsed_params"
handlers_parsed_params[best_command_handler.handler_full_name] = (
event.get_extra("parsed_params")
)
command_matched = True
is_wake = True
event.is_wake = True
except PermissionError:
event.stop_event()
return

event._extras.pop("parsed_params", None)

# 非指令
if not command_matched:
for handler in non_command_handlers:
result = await self._check_handler_filters(handler, event)
if result == "activate":
activated_handlers.append(handler)
if "parsed_params" in event.get_extra():
handlers_parsed_params[handler.handler_full_name] = (
event.get_extra("parsed_params")
)
is_wake = True
event.is_wake = True
elif result == "permission_error":
event.stop_event()
return

event._extras.pop("parsed_params", None)
event._extras.pop("parsed_params", None)

# 根据会话配置过滤插件处理器
activated_handlers = SessionPluginManager.filter_handlers_by_session(
Expand Down
7 changes: 6 additions & 1 deletion astrbot/core/star/filter/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .custom_filter import CustomFilter
from ..star_handler import StarHandlerMetadata


class GreedyStr(str):
"""标记指令完成其他参数接收后的所有剩余文本。"""

Expand Down Expand Up @@ -136,6 +137,10 @@ def validate_and_convert_params(
return result

def filter(self, event: AstrMessageEvent, cfg: AstrBotConfig) -> bool:
"""向后兼容

注意:实际的指令匹配逻辑已经移到 WakingCheckStage 中处理
"""
if not event.is_at_or_wake_command:
return False

Expand All @@ -159,7 +164,7 @@ def filter(self, event: AstrMessageEvent, cfg: AstrBotConfig) -> bool:
break
elif message_str.startswith(_full):
# 命令名后面无论是空格还是直接连参数都可以
message_str = message_str[len(_full):].lstrip()
message_str = message_str[len(_full) :].lstrip()
ok = True
break

Expand Down
Loading