Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions astrbot/core/platform/astr_message_event.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
import asyncio
import copy
import hashlib
import re
import uuid
Expand Down Expand Up @@ -29,6 +30,9 @@


class AstrMessageEvent(abc.ABC):
# extras 中可安全清理的瞬态字段清单;子类可按需扩展
TRANSIENT_EXTRA_KEYS: set[str] = set()
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The class variable name TRANSIENT_EXTRA_KEYS could be more descriptive. Consider renaming to TRANSIENT_EXTRAS_TO_CLEAR or NON_PERSISTENT_EXTRA_KEYS to better convey that these keys will be removed during cloning.

# Keys in extras that should not persist across event cloning
TRANSIENT_EXTRAS_TO_CLEAR: set[str] = set()
Suggested change
TRANSIENT_EXTRA_KEYS: set[str] = set()
TRANSIENT_EXTRAS_TO_CLEAR: set[str] = set()

Copilot uses AI. Check for mistakes.

def __init__(
self,
message_str: str,
Expand Down Expand Up @@ -71,6 +75,8 @@ def __init__(

# back_compability
self.platform = platform_meta
# 可选的绕过标记,避免被 SessionWaiter 再次截获
self._bypass_session_waiter = False

def get_platform_name(self):
"""获取这个事件所属的平台的类型(如 aiocqhttp, slack, discord 等)。
Expand Down Expand Up @@ -278,6 +284,22 @@ def clear_result(self):
"""清除消息事件的结果。"""
self._result = None

def clone_for_llm(self) -> "AstrMessageEvent":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议通过把“extras 清理”和“LLM pipeline 状态重置”逻辑提取到独立的辅助方法中,来重构 clone_for_llm,使其职责更清晰、更聚焦。

你可以保持当前新增行为不变,但通过拆分职责、显式化意图,减少 clone_for_llm 目前略显“杂物箱”的感觉。

1. 抽取对瞬时 extras 的处理

目前只有 clone_for_llm 知道 TRANSIENT_EXTRA_KEYS 的存在。把这部分逻辑移到一个小 helper 中,可以让行为更清晰并方便复用:

class AstrMessageEvent(abc.ABC):
    TRANSIENT_EXTRA_KEYS: set[str] = set()

    # ...

    def _copy_extras_without_transient(self) -> dict[str, Any]:
        """Return a shallow copy of extras without transient keys."""
        new_extras = self._extras.copy()
        for key in self.TRANSIENT_EXTRA_KEYS:
            new_extras.pop(key, None)
        return new_extras

    def clear_transient_extras(self) -> None:
        """In-place removal of transient extras."""
        for key in self.TRANSIENT_EXTRA_KEYS:
            self._extras.pop(key, None)

这样 clone_for_llm 就不需要直接去操作 _extras

    def clone_for_llm(self) -> "AstrMessageEvent":
        """浅拷贝并重置状态,以便重新走默认 LLM 流程。"""
        new_event: AstrMessageEvent = copy.copy(self)
        new_event.clear_result()
        new_event._extras = self._copy_extras_without_transient()
        self._reset_llm_pipeline_state(new_event)
        return new_event

2. 把状态重置逻辑集中到专用 helper

那段一次性重置多个标志位的代码可以提取成一个语义更明确的 helper,这样未来如果 LLM pipeline 状态有新增字段,只需要在这一处更新,而不用在 clone_for_llm 内联维护:

class AstrMessageEvent(abc.ABC):
    # ...

    def _reset_llm_pipeline_state(self) -> None:
        """Reset state so this event can re-enter the default LLM pipeline."""
        self._has_send_oper = False
        self.call_llm = False
        self.is_wake = False
        self.is_at_or_wake_command = False
        self.plugins_name = None
        self._bypass_session_waiter = False

这样 clone_for_llm 就可以写成:

    def clone_for_llm(self) -> "AstrMessageEvent":
        """浅拷贝并重置状态,以便重新走默认 LLM 流程。"""
        new_event: AstrMessageEvent = copy.copy(self)
        new_event.clear_result()
        new_event._extras = self._copy_extras_without_transient()
        new_event._reset_llm_pipeline_state()
        return new_event

这样的拆分在保留当前行为的同时,让下面两类操作:

  • “克隆 + extras 清理”,以及
  • “重置 LLM pipeline 状态”

都变得更加显式且可以分别进行测试。如果你之后决定把 _bypass_session_waiter 从核心事件中剥离出去,也会更容易做到这一点。

Original comment in English

issue (complexity): Consider refactoring clone_for_llm by extracting extras-cleanup and LLM-pipeline state-reset logic into dedicated helper methods to make its responsibilities clearer and more focused.

You can keep the new behavior but reduce the “grab‑bag” feel of clone_for_llm by splitting responsibilities and making the intent explicit.

1. Factor out transient extras handling

Right now clone_for_llm is the only place that knows about TRANSIENT_EXTRA_KEYS. Moving this into a small helper makes the behavior clearer and reusable:

class AstrMessageEvent(abc.ABC):
    TRANSIENT_EXTRA_KEYS: set[str] = set()

    # ...

    def _copy_extras_without_transient(self) -> dict[str, Any]:
        """Return a shallow copy of extras without transient keys."""
        new_extras = self._extras.copy()
        for key in self.TRANSIENT_EXTRA_KEYS:
            new_extras.pop(key, None)
        return new_extras

    def clear_transient_extras(self) -> None:
        """In-place removal of transient extras."""
        for key in self.TRANSIENT_EXTRA_KEYS:
            self._extras.pop(key, None)

Then clone_for_llm doesn’t need to manually fiddle with _extras:

    def clone_for_llm(self) -> "AstrMessageEvent":
        """浅拷贝并重置状态,以便重新走默认 LLM 流程。"""
        new_event: AstrMessageEvent = copy.copy(self)
        new_event.clear_result()
        new_event._extras = self._copy_extras_without_transient()
        self._reset_llm_pipeline_state(new_event)
        return new_event

2. Factor state resets into a dedicated helper

The block that resets multiple flags can be made intention‑revealing and centralized, so future state additions only need to be updated in one “LLM pipeline” helper instead of inline in clone_for_llm:

class AstrMessageEvent(abc.ABC):
    # ...

    def _reset_llm_pipeline_state(self) -> None:
        """Reset state so this event can re-enter the default LLM pipeline."""
        self._has_send_oper = False
        self.call_llm = False
        self.is_wake = False
        self.is_at_or_wake_command = False
        self.plugins_name = None
        self._bypass_session_waiter = False

Then clone_for_llm becomes:

    def clone_for_llm(self) -> "AstrMessageEvent":
        """浅拷贝并重置状态,以便重新走默认 LLM 流程。"""
        new_event: AstrMessageEvent = copy.copy(self)
        new_event.clear_result()
        new_event._extras = self._copy_extras_without_transient()
        new_event._reset_llm_pipeline_state()
        return new_event

This preserves all current behavior but makes the operations:

  • “clone + extras cleanup” and
  • “reset LLM pipeline state”

explicit and independently testable. It also makes it easier to later move _bypass_session_waiter handling out of the core event if you decide to decouple that concern.

"""浅拷贝并重置状态,以便重新走默认 LLM 流程。"""
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete docstring: The method does more than described. It should document:

  1. What state is reset (result, flags like call_llm, is_wake, etc.)
  2. How extras are handled (non-transient keys are preserved)
  3. The return value

Suggested improvement:

"""浅拷贝并重置状态,以便重新走默认 LLM 流程。
    
清除结果、发送操作标记、唤醒状态等,同时保留非瞬态的 extras 字段。
    
Returns:
    AstrMessageEvent: 重置后的事件副本,可安全地重新进入 LLM 处理流程
"""
Suggested change
"""浅拷贝并重置状态,以便重新走默认 LLM 流程。"""
"""
浅拷贝并重置状态以便重新走默认 LLM 流程
- 清除事件结果_result
- 重置发送操作标记_has_send_oper)、LLM 调用标记call_llm)、唤醒状态is_wake)、命令唤醒标记is_at_or_wake_command)、插件名plugins_name)、会话等待绕过标记_bypass_session_waiter
- 保留非瞬态的 extras 字段_extras),移除瞬态字段TRANSIENT_EXTRA_KEYS以避免跨管线上下文丢失
Returns:
AstrMessageEvent: 重置后的事件副本可安全地重新进入 LLM 处理流程
"""

Copilot uses AI. Check for mistakes.
new_event: AstrMessageEvent = copy.copy(self)
Comment on lines +288 to +289
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shallow copy with copy.copy() may cause issues with mutable attributes. The message_obj.message list is shared between the original and cloned event. If the clone modifies the message chain (like inserting an At component at line 95-97 in packages/session_controller/main.py), it affects the original event.

Consider using copy.deepcopy() instead, or at least document this limitation:

new_event: AstrMessageEvent = copy.deepcopy(self)

Or add a warning comment:

# Warning: Shallow copy means message_obj and its contents are shared
new_event: AstrMessageEvent = copy.copy(self)
Suggested change
"""浅拷贝并重置状态,以便重新走默认 LLM 流程。"""
new_event: AstrMessageEvent = copy.copy(self)
"""深拷贝并重置状态,以便重新走默认 LLM 流程。"""
new_event: AstrMessageEvent = copy.deepcopy(self)

Copilot uses AI. Check for mistakes.
new_event.clear_result()
# 保留非瞬态 extras,避免跨管线上下文丢失
new_event._extras = self._extras.copy()
for key in self.TRANSIENT_EXTRA_KEYS:
new_event._extras.pop(key, None)
new_event._has_send_oper = False
new_event.call_llm = False
new_event.is_wake = False
new_event.is_at_or_wake_command = False
new_event.plugins_name = None
new_event._bypass_session_waiter = False
return new_event
Comment on lines +287 to +301
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new clone_for_llm method lacks test coverage. This method handles state reset logic for event re-processing, with several fields that must be correctly cleared or preserved.

Consider adding tests to verify:

  1. Result is cleared
  2. Event flags are reset (_has_send_oper, call_llm, is_wake, etc.)
  3. Non-transient extras are preserved
  4. Transient extras (if any in TRANSIENT_EXTRA_KEYS) are removed
  5. _bypass_session_waiter is reset to False
  6. Cloned event is independent (mutations don't affect original)

Copilot uses AI. Check for mistakes.

"""消息链相关"""

def make_result(self) -> MessageEventResult:
Expand Down
40 changes: 37 additions & 3 deletions astrbot/core/utils/session_waiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Any

import astrbot.core.message.components as Comp
from astrbot import logger
from astrbot.core.platform import AstrMessageEvent

USER_SESSIONS: dict[str, "SessionWaiter"] = {} # 存储 SessionWaiter 实例
Expand All @@ -29,6 +30,33 @@ def __init__(self):

self.history_chains: list[list[Comp.BaseMessageComponent]] = []

def fallback_to_llm(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议对 fallback_to_llm 以及相关 API 进行重构,以便更好地分离职责、明确路由行为,并稍微简化 trigger 的控制流,让逻辑更容易被理解。

主要的复杂度来自 SessionController.fallback_to_llm 同时承担了多种职责,并依赖一个私有标志位。你可以通过拆分出更聚焦的辅助函数并让路由意图更加显式,在不改变现有行为的前提下降低复杂度。

1. 拆分 fallback_to_llm 内部的职责

保留当前的对外 API,但把各种副作用的编排拆分到更小的方法中。这样更容易推理,并且如果未来有需要,也能为调用方提供更低层级的入口。

class SessionController:
    ...

    def _prepare_llm_fallback_event(
        self,
        event: AstrMessageEvent,
        *,
        stop_session: bool,
    ) -> AstrMessageEvent:
        new_event = event.clone_for_llm()
        # preserve behavior
        new_event._bypass_session_waiter = not stop_session
        return new_event

    def _apply_llm_fallback_side_effects(
        self,
        event_queue: asyncio.Queue,
        original_event: AstrMessageEvent,
        new_event: AstrMessageEvent,
        *,
        stop_session: bool,
    ) -> None:
        event_queue.put_nowait(new_event)
        original_event.stop_event()
        if stop_session:
            self.stop()

    def fallback_to_llm(
        self,
        event_queue: asyncio.Queue,
        event: AstrMessageEvent,
        *,
        stop_session: bool = True,
    ) -> AstrMessageEvent:
        if not stop_session:
            logger.warning(
                "fallback_to_llm(stop_session=False) 会保留当前会话,默认会话拦截可能导致兜底无效,"
                "建议谨慎使用或在后续输入中自行终止会话。",
            )
        new_event = self._prepare_llm_fallback_event(event, stop_session=stop_session)
        self._apply_llm_fallback_side_effects(
            event_queue,
            event,
            new_event,
            stop_session=stop_session,
        )
        return new_event

这样的重构保留了所有当前行为(克隆、标记、入队、停止事件/会话),但把“创建什么”和“如何应用”清晰地分离开来。

2. 用比私有标志更显式的方式表达路由意图

如果你可以修改 AstrMessageEvent,建议把 _bypass_session_waiter 包到一个命名更清晰的 API 或结构化的元数据里。这样能保持当前行为的同时,让相关逻辑更容易被发现和理解。

示例 1:一个小的包装方法(最小改动):

class AstrMessageEvent:
    ...

    def mark_to_bypass_session_waiter(self, bypass: bool = True) -> None:
        # still uses the same private field under the hood
        self._bypass_session_waiter = bypass

在 helper 中使用它:

def _prepare_llm_fallback_event(...):
    new_event = event.clone_for_llm()
    new_event.mark_to_bypass_session_waiter(bypass=not stop_session)
    return new_event

示例 2(如果你未来希望有更明确的路由结构):一个专门的 routing 字段:

class AstrMessageEvent:
    ...
    routing: dict[str, Any] = attr.Factory(dict)

# usage
new_event.routing["bypass_session_waiter"] = not stop_session

如果有其他代码依赖旧字段,你也可以在内部把这个字段映射回 _bypass_session_waiter 以保持向后兼容。

3. 明确当前一个 stop_session 标志承载的两种语义

目前同一个标志同时控制两类截然不同的行为。通过增加更明确的包装方法,你可以保留现有方法不变,同时给调用方提供更具自说明性的入口:

class SessionController:
    ...

    def fallback_to_llm_and_stop_session(
        self,
        event_queue: asyncio.Queue,
        event: AstrMessageEvent,
    ) -> AstrMessageEvent:
        return self.fallback_to_llm(
            event_queue=event_queue,
            event=event,
            stop_session=True,
        )

    def fallback_to_llm_only_for_current_input(
        self,
        event_queue: asyncio.Queue,
        event: AstrMessageEvent,
    ) -> AstrMessageEvent:
        return self.fallback_to_llm(
            event_queue=event_queue,
            event=event,
            stop_session=False,
        )

你可以保留 fallback_to_llm(..., stop_session=...) 以兼容现有调用方,同时逐步将调用迁移到这些语义更清晰的入口上。

4. 略微扁平化 trigger 的控制流

你已经通过让 trigger 返回 bool 提升了可读性。你还可以通过把“已完成”的检查提前,并在加锁块最后统一返回一次,让分支逻辑更易追踪:

@classmethod
async def trigger(cls, session_id: str, event: AstrMessageEvent) -> bool:
    session = USER_SESSIONS.get(session_id)
    if not session:
        return False

    # fast-path: session already done
    if session.session_controller.future.done():
        return False

    async with session._lock:
        if session.session_controller.future.done():
            return False

        if session.record_history_chains:
            session.session_controller.history_chains.append(
                [copy.deepcopy(comp) for comp in event.get_messages()],
            )

        try:
            await session.handler(session.session_controller, event)
        except Exception as e:
            session.session_controller.stop(e)
            # handler considered triggered even if it failed
        return True

这样既保留了所有语义(包括在锁内再次检查 .future.done(),以及只要 handler 被调用就返回 True),又能避免在关键路径中出现过多的嵌套 return。

Original comment in English

issue (complexity): Consider refactoring fallback_to_llm and related APIs to separate concerns, clarify routing behavior, and slightly simplify trigger’s control flow for easier reasoning.

The main complexity comes from SessionController.fallback_to_llm bundling multiple concerns and relying on a private flag. You can reduce this without changing behavior by extracting focused helpers and making the routing intent more explicit.

1. Split responsibilities inside fallback_to_llm

Keep the public API, but move the side‑effect orchestration into smaller methods. That makes it easier to reason about and gives future callers a lower‑level entry point if needed.

class SessionController:
    ...

    def _prepare_llm_fallback_event(
        self,
        event: AstrMessageEvent,
        *,
        stop_session: bool,
    ) -> AstrMessageEvent:
        new_event = event.clone_for_llm()
        # preserve behavior
        new_event._bypass_session_waiter = not stop_session
        return new_event

    def _apply_llm_fallback_side_effects(
        self,
        event_queue: asyncio.Queue,
        original_event: AstrMessageEvent,
        new_event: AstrMessageEvent,
        *,
        stop_session: bool,
    ) -> None:
        event_queue.put_nowait(new_event)
        original_event.stop_event()
        if stop_session:
            self.stop()

    def fallback_to_llm(
        self,
        event_queue: asyncio.Queue,
        event: AstrMessageEvent,
        *,
        stop_session: bool = True,
    ) -> AstrMessageEvent:
        if not stop_session:
            logger.warning(
                "fallback_to_llm(stop_session=False) 会保留当前会话,默认会话拦截可能导致兜底无效,"
                "建议谨慎使用或在后续输入中自行终止会话。",
            )
        new_event = self._prepare_llm_fallback_event(event, stop_session=stop_session)
        self._apply_llm_fallback_side_effects(
            event_queue,
            event,
            new_event,
            stop_session=stop_session,
        )
        return new_event

This keeps all behavior (clone, flag, enqueue, stop event/session) but clearly separates what is created from how it is applied.

2. Make the routing intent more explicit than a private flag

If you can touch AstrMessageEvent, consider wrapping _bypass_session_waiter behind a clearly named API or structured metadata. That keeps the logic discoverable while preserving existing behavior.

Example 1: a small wrapper method (minimal change):

class AstrMessageEvent:
    ...

    def mark_to_bypass_session_waiter(self, bypass: bool = True) -> None:
        # still uses the same private field under the hood
        self._bypass_session_waiter = bypass

Use it in your helper:

def _prepare_llm_fallback_event(...):
    new_event = event.clone_for_llm()
    new_event.mark_to_bypass_session_waiter(bypass=not stop_session)
    return new_event

Example 2 (if you want something more explicit later): a dedicated routing field:

class AstrMessageEvent:
    ...
    routing: dict[str, Any] = attr.Factory(dict)

# usage
new_event.routing["bypass_session_waiter"] = not stop_session

You can internally map this to the old _bypass_session_waiter for backward compatibility if other code depends on it.

3. Clarify the two different stop_session semantics

Right now one flag controls two quite different behaviors. Adding explicit wrappers keeps the existing method but gives call sites more self‑documenting options:

class SessionController:
    ...

    def fallback_to_llm_and_stop_session(
        self,
        event_queue: asyncio.Queue,
        event: AstrMessageEvent,
    ) -> AstrMessageEvent:
        return self.fallback_to_llm(
            event_queue=event_queue,
            event=event,
            stop_session=True,
        )

    def fallback_to_llm_only_for_current_input(
        self,
        event_queue: asyncio.Queue,
        event: AstrMessageEvent,
    ) -> AstrMessageEvent:
        return self.fallback_to_llm(
            event_queue=event_queue,
            event=event,
            stop_session=False,
        )

You keep fallback_to_llm(..., stop_session=...) for compatibility but can migrate callers to these clearer entry points over time.

4. Slightly flatten trigger control flow

You’ve already improved trigger by returning a bool. You can make the branching a bit easier to follow by returning once at the end of the lock block and moving the “done” check earlier:

@classmethod
async def trigger(cls, session_id: str, event: AstrMessageEvent) -> bool:
    session = USER_SESSIONS.get(session_id)
    if not session:
        return False

    # fast-path: session already done
    if session.session_controller.future.done():
        return False

    async with session._lock:
        if session.session_controller.future.done():
            return False

        if session.record_history_chains:
            session.session_controller.history_chains.append(
                [copy.deepcopy(comp) for comp in event.get_messages()],
            )

        try:
            await session.handler(session.session_controller, event)
        except Exception as e:
            session.session_controller.stop(e)
            # handler considered triggered even if it failed
        return True

This keeps all semantics (including double‑checking .future.done() under the lock and returning True as long as the handler was invoked) but avoids nested returns in the critical path.

self,
event_queue: asyncio.Queue,
event: AstrMessageEvent,
*,
stop_session: bool = True,
) -> AstrMessageEvent:
"""将当前事件重新入队,由默认 LLM 流程处理,适用于非预期输入的兜底。

Args:
Comment on lines +33 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): call_llm 标志被重置为 False,这似乎和“回退到默认 LLM 流程”的意图相矛盾。

从命名和 docstring(fallback_to_llm / _clone_event_for_llm)来看,这个分支应该是为了重新进入默认的 LLM 流水线。将 new_event.call_llm = False 很可能会阻止这一点,具体取决于 AstrMessageEvent 在其他地方是如何使用的。请确认这里是否应该改为 True,或者保持原值不变,以便回退路径确实会触发默认的 LLM 流程。

Original comment in English

issue (bug_risk): The call_llm flag is being reset to False, which seems to contradict the intent of falling back to the default LLM flow.

Given the naming and docstring (fallback_to_llm / _clone_event_for_llm), this path appears intended to re-enter the default LLM pipeline. Setting new_event.call_llm = False likely prevents that, depending on how AstrMessageEvent is used elsewhere. Please confirm whether this should instead be True or left unchanged so the fallback actually triggers the default LLM flow.

Comment on lines +33 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 建议在使用事件队列的 put_nowait 时考虑处理潜在的队列背压问题。

put_nowait 假设队列永远不会满。对于有边界的队列或在高负载情况下,这可能会抛出 QueueFull 并悄然丢弃兜底事件。可以考虑将该方法改为异步并使用 await event_queue.put(new_event),或者显式捕获 QueueFull 并进行处理/日志记录,以确保在有背压时兜底行为依然可靠。

建议实现:

from typing import Any
import asyncio

import astrbot.core.message.components as Comp
from astrbot import logger
from astrbot.core.platform import AstrMessageEvent
    async def fallback_to_llm(
        self,
        event_queue: asyncio.Queue,
        event: AstrMessageEvent,
        *,
        stop_session: bool = True,
    ) -> AstrMessageEvent:
        new_event = event.clone_for_llm()
        new_event._bypass_session_waiter = not stop_session
        await event_queue.put(new_event)
        event.stop_event()

由于 fallback_to_llm 现在变为 async,所有调用点都需要更新为 await session_waiter.fallback_to_llm(...)(或在持有实例上进行等价调用),并且这些调用方本身必须运行在异步上下文中。如果某些调用方无法改为异步,可以使用诸如 asyncio.create_task(self.fallback_to_llm(...)) 之类的小工具函数代替,但这会改变调用语义,需要结合上下文进行评估。

Original comment in English

suggestion (bug_risk): Consider handling potential queue backpressure instead of using put_nowait on the event queue.

put_nowait assumes the queue is never full. With a bounded queue or under load, this can raise QueueFull and silently drop the fallback event. Consider making this method async and await event_queue.put(new_event), or explicitly catch QueueFull and handle/log it so fallback behavior remains reliable under backpressure.

Suggested implementation:

from typing import Any
import asyncio

import astrbot.core.message.components as Comp
from astrbot import logger
from astrbot.core.platform import AstrMessageEvent
    async def fallback_to_llm(
        self,
        event_queue: asyncio.Queue,
        event: AstrMessageEvent,
        *,
        stop_session: bool = True,
    ) -> AstrMessageEvent:
        new_event = event.clone_for_llm()
        new_event._bypass_session_waiter = not stop_session
        await event_queue.put(new_event)
        event.stop_event()

Because fallback_to_llm is now async, all call sites must be updated to await session_waiter.fallback_to_llm(...) (or equivalent on the owning instance), and the callers must themselves be running in an async context. If some callers cannot be made async, a small helper like asyncio.create_task(self.fallback_to_llm(...)) may be used instead, but that changes the call semantics and should be reviewed in context.

event_queue: 事件队列
event: 当前事件
stop_session: 是否结束当前 SessionWaiter。False 时仅兜底当前输入,继续等待后续输入。
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing return value documentation. The docstring should document that this method returns the cloned AstrMessageEvent that was enqueued, as shown by the -> AstrMessageEvent return type annotation and the return new_event statement at line 58.

Add to docstring:

Returns:
    AstrMessageEvent: 重新入队的事件副本
Suggested change
stop_session: 是否结束当前 SessionWaiterFalse 时仅兜底当前输入继续等待后续输入
stop_session: 是否结束当前 SessionWaiterFalse 时仅兜底当前输入继续等待后续输入
Returns:
AstrMessageEvent: 重新入队的事件副本

Copilot uses AI. Check for mistakes.
"""
if not stop_session:
logger.warning(
"fallback_to_llm(stop_session=False) 会保留当前会话,默认会话拦截可能导致兜底无效,"
"建议谨慎使用或在后续输入中自行终止会话。",
)
new_event = event.clone_for_llm()
new_event._bypass_session_waiter = not stop_session
event_queue.put_nowait(new_event)
event.stop_event()
if stop_session:
self.stop()
Comment on lines +52 to +57
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition: When stop_session=True, there's a window between queueing the new event (line 54) and stopping the session (line 57) where the new event could be processed and intercepted by the still-active session again, defeating the fallback purpose.

The logic should be:

new_event._bypass_session_waiter = True  # Always bypass for fallback events

This ensures the fallback event always reaches the LLM pipeline regardless of session state or timing.

Copilot uses AI. Check for mistakes.
return new_event
Comment on lines +33 to +58
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new fallback_to_llm method lacks test coverage. This is a critical feature that handles session fallback logic with multiple code paths (stop_session=True vs False) and interactions with event queuing, event stopping, and session cleanup.

Consider adding tests to verify:

  1. Event is correctly cloned and enqueued
  2. _bypass_session_waiter flag is set appropriately
  3. Original event is stopped
  4. Session is stopped when stop_session=True
  5. Session continues when stop_session=False
  6. Warning is logged when stop_session=False

Copilot uses AI. Check for mistakes.

def stop(self, error: Exception = None):
"""立即结束这个会话"""
if not self.future.done():
Expand Down Expand Up @@ -147,11 +175,15 @@ def _cleanup(self, error: Exception = None):
self.session_controller.stop(error)

@classmethod
async def trigger(cls, session_id: str, event: AstrMessageEvent):
"""外部输入触发会话处理"""
async def trigger(cls, session_id: str, event: AstrMessageEvent) -> bool:
"""外部输入触发会话处理

Returns:
bool: 是否成功触发处理。False 表示会话不存在或已结束。
"""
session = USER_SESSIONS.get(session_id)
if not session or session.session_controller.future.done():
return
return False

async with session._lock:
if not session.session_controller.future.done():
Expand All @@ -164,6 +196,8 @@ async def trigger(cls, session_id: str, event: AstrMessageEvent):
await session.handler(session.session_controller, event)
except Exception as e:
session.session_controller.stop(e)
return True
return False


def session_waiter(timeout: int = 30, record_history_chains: bool = False):
Expand Down
17 changes: 8 additions & 9 deletions packages/session_controller/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import copy
from sys import maxsize

import astrbot.api.message_components as Comp
Expand All @@ -7,7 +6,6 @@
from astrbot.api.star import Context, Star
from astrbot.core.utils.session_waiter import (
FILTERS,
USER_SESSIONS,
SessionController,
SessionWaiter,
session_waiter,
Expand All @@ -23,10 +21,12 @@ def __init__(self, context: Context):
@filter.event_message_type(filter.EventMessageType.ALL, priority=maxsize)
async def handle_session_control_agent(self, event: AstrMessageEvent):
"""会话控制代理"""
if getattr(event, "_bypass_session_waiter", False):
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider using hasattr() instead of getattr() with a default for better readability when checking attribute existence:

if hasattr(event, "_bypass_session_waiter") and event._bypass_session_waiter:
    return

This makes it clearer that you're checking for the presence of the attribute, not retrieving a value.

Suggested change
if getattr(event, "_bypass_session_waiter", False):
if hasattr(event, "_bypass_session_waiter") and event._bypass_session_waiter:

Copilot uses AI. Check for mistakes.
return
for session_filter in FILTERS:
session_id = session_filter.filter(event)
if session_id in USER_SESSIONS:
await SessionWaiter.trigger(session_id, event)
handled = await SessionWaiter.trigger(session_id, event)
if handled:
event.stop_event()

@filter.event_message_type(filter.EventMessageType.ALL, priority=maxsize - 1)
Expand Down Expand Up @@ -96,11 +96,10 @@ async def empty_mention_waiter(
0,
Comp.At(qq=event.get_self_id(), name=event.get_self_id()),
)
new_event = copy.copy(event)
# 重新推入事件队列
self.context.get_event_queue().put_nowait(new_event)
event.stop_event()
controller.stop()
controller.fallback_to_llm(
self.context.get_event_queue(),
event,
)

try:
await empty_mention_waiter(event)
Comment on lines 96 to 105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): 合并嵌套的 if 条件(merge-nested-ifs

Suggested change
0,
Comp.At(qq=event.get_self_id(), name=event.get_self_id()),
)
new_event = copy.copy(event)
# 重新推入事件队列
self.context.get_event_queue().put_nowait(new_event)
event.stop_event()
controller.stop()
controller.fallback_to_llm(
self.context.get_event_queue(),
event,
)
try:
await empty_mention_waiter(event)
if len(messages) == 1 and ((
isinstance(messages[0], Comp.At)
and str(messages[0].qq) == str(event.get_self_id())
and p_settings.get("empty_mention_waiting", True)
) or (
isinstance(messages[0], Comp.Plain)
and messages[0].text.strip() in wake_prefix
)):
if p_settings.get("empty_mention_waiting_need_reply", True):
try:
# 尝试使用 LLM 生成更生动的回复
func_tools_mgr = self.context.get_llm_tool_manager()
# 获取用户当前的对话信息
curr_cid = await self.context.conversation_manager.get_curr_conversation_id(
event.unified_msg_origin,
)
conversation = None
if curr_cid:
conversation = await self.context.conversation_manager.get_conversation(
event.unified_msg_origin,
curr_cid,
)
else:
# 创建新对话
curr_cid = await self.context.conversation_manager.new_conversation(
event.unified_msg_origin,
platform_id=event.get_platform_id(),
)
# 使用 LLM 生成回复
yield event.request_llm(
prompt=(
"注意,你正在社交媒体上中与用户进行聊天,用户只是通过@来唤醒你,但并未在这条消息中输入内容,他可能会在接下来一条发送他想发送的内容。"
"你友好地询问用户想要聊些什么或者需要什么帮助,回复要符合人设,不要太过机械化。"
"请注意,你仅需要输出要回复用户的内容,不要输出其他任何东西"
),
func_tool_manager=func_tools_mgr,
session_id=curr_cid,
contexts=[],
system_prompt="",
conversation=conversation,
)
except Exception as e:
logger.error(f"LLM response failed: {e!s}")
# LLM 回复失败,使用原始预设回复
yield event.plain_result("想要问什么呢?😄")
@session_waiter(60)
async def empty_mention_waiter(
controller: SessionController,
event: AstrMessageEvent,
):
event.message_obj.message.insert(
0,
Comp.At(qq=event.get_self_id(), name=event.get_self_id()),
)
controller.fallback_to_llm(
self.context.get_event_queue(),
event,
)
try:
await empty_mention_waiter(event)
except TimeoutError as _:
pass
except Exception as e:
yield event.plain_result("发生错误,请联系管理员: " + str(e))
finally:
event.stop_event()


说明过多的嵌套会使代码难以理解,这在 Python 中尤为明显,因为没有花括号来帮助区分不同的嵌套层级。

阅读嵌套很深的代码时会很混乱,因为你必须时刻记住各个条件对应的是哪一层。我们因此会尽量在可能的地方减少嵌套,而当两个 if 条件可以通过 and 合并时,就是一个很容易的优化点。

Original comment in English

suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)

Suggested change
0,
Comp.At(qq=event.get_self_id(), name=event.get_self_id()),
)
new_event = copy.copy(event)
# 重新推入事件队列
self.context.get_event_queue().put_nowait(new_event)
event.stop_event()
controller.stop()
controller.fallback_to_llm(
self.context.get_event_queue(),
event,
)
try:
await empty_mention_waiter(event)
if len(messages) == 1 and ((
isinstance(messages[0], Comp.At)
and str(messages[0].qq) == str(event.get_self_id())
and p_settings.get("empty_mention_waiting", True)
) or (
isinstance(messages[0], Comp.Plain)
and messages[0].text.strip() in wake_prefix
)):
if p_settings.get("empty_mention_waiting_need_reply", True):
try:
# 尝试使用 LLM 生成更生动的回复
func_tools_mgr = self.context.get_llm_tool_manager()
# 获取用户当前的对话信息
curr_cid = await self.context.conversation_manager.get_curr_conversation_id(
event.unified_msg_origin,
)
conversation = None
if curr_cid:
conversation = await self.context.conversation_manager.get_conversation(
event.unified_msg_origin,
curr_cid,
)
else:
# 创建新对话
curr_cid = await self.context.conversation_manager.new_conversation(
event.unified_msg_origin,
platform_id=event.get_platform_id(),
)
# 使用 LLM 生成回复
yield event.request_llm(
prompt=(
"注意,你正在社交媒体上中与用户进行聊天,用户只是通过@来唤醒你,但并未在这条消息中输入内容,他可能会在接下来一条发送他想发送的内容。"
"你友好地询问用户想要聊些什么或者需要什么帮助,回复要符合人设,不要太过机械化。"
"请注意,你仅需要输出要回复用户的内容,不要输出其他任何东西"
),
func_tool_manager=func_tools_mgr,
session_id=curr_cid,
contexts=[],
system_prompt="",
conversation=conversation,
)
except Exception as e:
logger.error(f"LLM response failed: {e!s}")
# LLM 回复失败,使用原始预设回复
yield event.plain_result("想要问什么呢?😄")
@session_waiter(60)
async def empty_mention_waiter(
controller: SessionController,
event: AstrMessageEvent,
):
event.message_obj.message.insert(
0,
Comp.At(qq=event.get_self_id(), name=event.get_self_id()),
)
controller.fallback_to_llm(
self.context.get_event_queue(),
event,
)
try:
await empty_mention_waiter(event)
except TimeoutError as _:
pass
except Exception as e:
yield event.plain_result("发生错误,请联系管理员: " + str(e))
finally:
event.stop_event()


ExplanationToo much nesting can make code difficult to understand, and this is especially
true in Python, where there are no brackets to help out with the delineation of
different nesting levels.

Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.

Comment on lines 96 to 105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): 合并嵌套的 if 条件(merge-nested-ifs

Suggested change
0,
Comp.At(qq=event.get_self_id(), name=event.get_self_id()),
)
new_event = copy.copy(event)
# 重新推入事件队列
self.context.get_event_queue().put_nowait(new_event)
event.stop_event()
controller.stop()
controller.fallback_to_llm(
self.context.get_event_queue(),
event,
)
try:
await empty_mention_waiter(event)
if len(messages) == 1 and ((
isinstance(messages[0], Comp.At)
and str(messages[0].qq) == str(event.get_self_id())
and p_settings.get("empty_mention_waiting", True)
) or (
isinstance(messages[0], Comp.Plain)
and messages[0].text.strip() in wake_prefix
)):
if p_settings.get("empty_mention_waiting_need_reply", True):
try:
# 尝试使用 LLM 生成更生动的回复
func_tools_mgr = self.context.get_llm_tool_manager()
# 获取用户当前的对话信息
curr_cid = await self.context.conversation_manager.get_curr_conversation_id(
event.unified_msg_origin,
)
conversation = None
if curr_cid:
conversation = await self.context.conversation_manager.get_conversation(
event.unified_msg_origin,
curr_cid,
)
else:
# 创建新对话
curr_cid = await self.context.conversation_manager.new_conversation(
event.unified_msg_origin,
platform_id=event.get_platform_id(),
)
# 使用 LLM 生成回复
yield event.request_llm(
prompt=(
"注意,你正在社交媒体上中与用户进行聊天,用户只是通过@来唤醒你,但并未在这条消息中输入内容,他可能会在接下来一条发送他想发送的内容。"
"你友好地询问用户想要聊些什么或者需要什么帮助,回复要符合人设,不要太过机械化。"
"请注意,你仅需要输出要回复用户的内容,不要输出其他任何东西"
),
func_tool_manager=func_tools_mgr,
session_id=curr_cid,
contexts=[],
system_prompt="",
conversation=conversation,
)
except Exception as e:
logger.error(f"LLM response failed: {e!s}")
# LLM 回复失败,使用原始预设回复
yield event.plain_result("想要问什么呢?😄")
@session_waiter(60)
async def empty_mention_waiter(
controller: SessionController,
event: AstrMessageEvent,
):
event.message_obj.message.insert(
0,
Comp.At(qq=event.get_self_id(), name=event.get_self_id()),
)
controller.fallback_to_llm(
self.context.get_event_queue(),
event,
)
try:
await empty_mention_waiter(event)
except TimeoutError as _:
pass
except Exception as e:
yield event.plain_result("发生错误,请联系管理员: " + str(e))
finally:
event.stop_event()


说明过多的嵌套会让代码难以理解,特别是在没有花括号来标示不同嵌套层级的 Python 中,这一点尤为明显。

阅读嵌套很深的代码时,你需要时刻记住每个条件属于哪一层逻辑,这会让理解变得很吃力。因此我们会尽量在可能的情况下减少嵌套,而把两个 if 条件用 and 合并起来,就是一个简单直接的改进点。

Original comment in English

suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)

Suggested change
0,
Comp.At(qq=event.get_self_id(), name=event.get_self_id()),
)
new_event = copy.copy(event)
# 重新推入事件队列
self.context.get_event_queue().put_nowait(new_event)
event.stop_event()
controller.stop()
controller.fallback_to_llm(
self.context.get_event_queue(),
event,
)
try:
await empty_mention_waiter(event)
if len(messages) == 1 and ((
isinstance(messages[0], Comp.At)
and str(messages[0].qq) == str(event.get_self_id())
and p_settings.get("empty_mention_waiting", True)
) or (
isinstance(messages[0], Comp.Plain)
and messages[0].text.strip() in wake_prefix
)):
if p_settings.get("empty_mention_waiting_need_reply", True):
try:
# 尝试使用 LLM 生成更生动的回复
func_tools_mgr = self.context.get_llm_tool_manager()
# 获取用户当前的对话信息
curr_cid = await self.context.conversation_manager.get_curr_conversation_id(
event.unified_msg_origin,
)
conversation = None
if curr_cid:
conversation = await self.context.conversation_manager.get_conversation(
event.unified_msg_origin,
curr_cid,
)
else:
# 创建新对话
curr_cid = await self.context.conversation_manager.new_conversation(
event.unified_msg_origin,
platform_id=event.get_platform_id(),
)
# 使用 LLM 生成回复
yield event.request_llm(
prompt=(
"注意,你正在社交媒体上中与用户进行聊天,用户只是通过@来唤醒你,但并未在这条消息中输入内容,他可能会在接下来一条发送他想发送的内容。"
"你友好地询问用户想要聊些什么或者需要什么帮助,回复要符合人设,不要太过机械化。"
"请注意,你仅需要输出要回复用户的内容,不要输出其他任何东西"
),
func_tool_manager=func_tools_mgr,
session_id=curr_cid,
contexts=[],
system_prompt="",
conversation=conversation,
)
except Exception as e:
logger.error(f"LLM response failed: {e!s}")
# LLM 回复失败,使用原始预设回复
yield event.plain_result("想要问什么呢?😄")
@session_waiter(60)
async def empty_mention_waiter(
controller: SessionController,
event: AstrMessageEvent,
):
event.message_obj.message.insert(
0,
Comp.At(qq=event.get_self_id(), name=event.get_self_id()),
)
controller.fallback_to_llm(
self.context.get_event_queue(),
event,
)
try:
await empty_mention_waiter(event)
except TimeoutError as _:
pass
except Exception as e:
yield event.plain_result("发生错误,请联系管理员: " + str(e))
finally:
event.stop_event()


ExplanationToo much nesting can make code difficult to understand, and this is especially
true in Python, where there are no brackets to help out with the delineation of
different nesting levels.

Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.

Expand Down