Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"friend_message_needs_wake_prefix": False,
"ignore_bot_self_message": False,
"ignore_at_all": False,
"request_queue": False,
},
"provider": [],
"provider_settings": {
Expand Down Expand Up @@ -468,6 +469,10 @@
"type": "bool",
"hint": "启用后,机器人会忽略 @ 全体成员 的消息事件。",
},
"request_queue": {
"type": "bool",
"hint": "启用请求队列功能。当同一会话中同时收到多个请求时,会将这些请求排队依次处理,确保上下文连续性,避免并发处理导致的混乱。",
},
"segmented_reply": {
"type": "object",
"items": {
Expand Down Expand Up @@ -1903,31 +1908,17 @@
"_special": "select_provider",
"hint": "留空代表不使用。可用于不支持视觉模态的聊天模型。",
},
"provider_stt_settings.enable": {
"description": "默认启用语音转文本",
"type": "bool",
},
"provider_stt_settings.provider_id": {
"description": "语音转文本模型",
"type": "string",
"hint": "留空代表不使用。",
"_special": "select_provider_stt",
"condition": {
"provider_stt_settings.enable": True,
},
},
"provider_tts_settings.enable": {
"description": "默认启用文本转语音",
"type": "bool",
},
"provider_tts_settings.provider_id": {
"description": "文本转语音模型",
"type": "string",
"hint": "留空代表不使用。",
"_special": "select_provider_tts",
"condition": {
"provider_tts_settings.enable": True,
},
},
"provider_settings.image_caption_prompt": {
"description": "图片转述提示词",
Expand Down Expand Up @@ -2216,6 +2207,11 @@
"description": "用户权限不足时是否回复",
"type": "bool",
},
"platform_settings.request_queue": {
"description": "启用请求队列功能",
"type": "bool",
"hint": "当同一会话中同时收到多个请求时,会将这些请求排队依次处理,确保上下文连续性,避免并发处理导致的混乱。",
},
},
},
},
Expand Down
26 changes: 26 additions & 0 deletions astrbot/core/pipeline/process_stage/method/llm_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import asyncio
from collections import defaultdict
import copy
import json
import traceback
Expand Down Expand Up @@ -286,6 +287,10 @@ async def run_agent(
)


# 用于将并行请求转化为队列的锁
user_llm_locks = defaultdict(asyncio.Lock)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 考虑通过使用每个实例锁和 'async with' 块来重构锁管理,以简化请求队列处理。

你可以通过以下方式消除分散的 `lock.acquire()`/`lock.release()` 调用(以及全局 `_unlock`):

1. 将锁映射移动到阶段实例中。
2. 将请求逻辑包装在 `async with lock:` 块中。
3. 将“实际工作”提取到辅助方法中,这样就不必重复提前返回。

例如:

```python
from collections import defaultdict

class LLMRequestSubStage(Stage):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # now per‐stage, not global
        self.user_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def process(self, event: AstrMessageEvent, _nested: bool = False):
        # ... your preamble: build req, conversation, etc. ...
        cid = req.conversation.cid

        if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
            lock = self.user_locks[cid]
            async with lock:
                return await self._process_with_llm(event, req)
        else:
            return await self._process_with_llm(event, req)

    async def _process_with_llm(self, event: AstrMessageEvent, req: ProviderRequest):
        # all the code that used to be between acquire()/release()
        # including hooks, max‐length checks, agent runner, history save, webchat, etc.
        if await call_event_hook(event, EventType.OnLLMRequestEvent, req):
            return
        # ... rest of your logic ...

然后:

  • 删除模块级别的旧 user_llm_locks = defaultdict(...)
  • 删除 _unlock 方法和所有手动 .release() 调用。
  • 所有功能保持不变,但你不再有容易出错的手动锁/释放路径。
Original comment in English

issue (complexity): Consider refactoring lock management by using per-instance locks and 'async with' blocks to simplify request queue handling.

You can eliminate the scattered `lock.acquire()`/`lock.release()` calls (and the global `_unlock`) by:

1. Moving your lock‐map into the stage instance.
2. Wrapping your request logic in an `async with lock:` block.
3. Extracting the “real work” into a helper method so you don’t have to repeat early returns.

For example:

```python
from collections import defaultdict

class LLMRequestSubStage(Stage):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # now per‐stage, not global
        self.user_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def process(self, event: AstrMessageEvent, _nested: bool = False):
        # ... your preamble: build req, conversation, etc. ...
        cid = req.conversation.cid

        if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
            lock = self.user_locks[cid]
            async with lock:
                return await self._process_with_llm(event, req)
        else:
            return await self._process_with_llm(event, req)

    async def _process_with_llm(self, event: AstrMessageEvent, req: ProviderRequest):
        # all the code that used to be between acquire()/release()
        # including hooks, max‐length checks, agent runner, history save, webchat, etc.
        if await call_event_hook(event, EventType.OnLLMRequestEvent, req):
            return
        # ... rest of your logic ...

Then:

  • Remove the old user_llm_locks = defaultdict(...) at module level.
  • Delete the _unlock method and all manual .release() calls.
  • Everything remains functionally identical, but you no longer have error‐prone manual lock/release paths.



class LLMRequestSubStage(Stage):
async def initialize(self, ctx: PipelineContext) -> None:
self.ctx = ctx
Expand Down Expand Up @@ -339,6 +344,12 @@ async def _get_session_conv(self, event: AstrMessageEvent):
conversation = await conv_mgr.get_conversation(umo, cid)
return conversation

def _unlock(self, cid: str):
    """Release the per-conversation request-queue lock for *cid*, if it is held.

    NOTE(review): releasing by key rather than by owner means a request that
    never acquired the lock could release it — confirm callers are serialized.
    """
    lock = user_llm_locks.get(cid)
    if lock is not None and lock.locked():
        lock.release()
        logger.info(f"用户(cid: {cid}) 的请求已完成,锁已释放。")
Comment on lines +347 to +351
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): 检查释放锁时的竞态条件。

多个并发请求可能导致锁被未获取它的请求释放。为防止这种情况,请考虑跟踪锁所有权或实现排队机制。

Original comment in English

issue (bug_risk): Check for race conditions when releasing locks.

Multiple concurrent requests could cause a lock to be released by a request that did not acquire it. To prevent this, consider tracking lock ownership or implementing a queueing mechanism.


async def process(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): 在 LLMRequestSubStage.process 中发现低代码质量 - 10% (low-code-quality)


解释此函数的质量得分低于 25% 的质量阈值。
此得分是方法长度、认知复杂度和工作内存的组合。

如何解决这个问题?

重构此函数以使其更短、更具可读性可能是有益的。

  • 通过将部分功能提取到自己的函数中来减少函数长度。这是你能做的最重要的事情——理想情况下,一个函数应该少于 10 行。
  • 减少嵌套,也许可以通过引入守卫子句来提前返回。
  • 确保变量的作用域紧密,以便使用相关概念的代码在函数中坐在一起,而不是分散开来。
Original comment in English

issue (code-quality): Low code quality found in LLMRequestSubStage.process - 10% (low-code-quality)


ExplanationThe quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

  • Reduce the function length by extracting pieces of functionality out into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.

self, event: AstrMessageEvent, _nested: bool = False
) -> Union[None, AsyncGenerator[None, None]]:
Expand Down Expand Up @@ -390,8 +401,21 @@ async def process(
if not req.prompt and not req.image_urls:
return

# 控制请求队列
if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
cid = req.conversation.cid
lock = user_llm_locks[cid]
if lock.locked():
logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
await lock.acquire()
# 更新到最新的上下文
conversation = await self._get_session_conv(event)
req.conversation = conversation
req.contexts = json.loads(conversation.history)
Comment on lines +404 to +414
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 考虑对锁获取和上下文更新进行异常处理。

如果在锁获取或上下文更新期间发生错误,锁可能不会释放,从而导致请求挂起。使用 try/finally 块来确保锁被释放。

Suggested change
# 控制请求队列
if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
cid = req.conversation.cid
lock = user_llm_locks[cid]
if lock.locked():
logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
await lock.acquire()
# 更新到最新的上下文
conversation = await self._get_session_conv(event)
req.conversation = conversation
req.contexts = json.loads(conversation.history)
# 控制请求队列
if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
cid = req.conversation.cid
lock = user_llm_locks[cid]
if lock.locked():
logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
await lock.acquire()
try:
# 更新到最新的上下文
conversation = await self._get_session_conv(event)
req.conversation = conversation
req.contexts = json.loads(conversation.history)
except Exception as e:
logger.error(f"请求队列处理时发生异常: {e}")
raise
finally:
if lock.locked():
lock.release()
Original comment in English

suggestion (bug_risk): Consider exception handling for lock acquisition and context update.

If an error occurs during lock acquisition or context update, the lock may not be released, leading to hanging requests. Use a try/finally block to guarantee the lock is released.

Suggested change
# 控制请求队列
if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
cid = req.conversation.cid
lock = user_llm_locks[cid]
if lock.locked():
logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
await lock.acquire()
# 更新到最新的上下文
conversation = await self._get_session_conv(event)
req.conversation = conversation
req.contexts = json.loads(conversation.history)
# 控制请求队列
if self.ctx.astrbot_config["platform_settings"]["request_queue"]:
cid = req.conversation.cid
lock = user_llm_locks[cid]
if lock.locked():
logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
await lock.acquire()
try:
# 更新到最新的上下文
conversation = await self._get_session_conv(event)
req.conversation = conversation
req.contexts = json.loads(conversation.history)
except Exception as e:
logger.error(f"请求队列处理时发生异常: {e}")
raise
finally:
if lock.locked():
lock.release()


# 执行请求 LLM 前事件钩子。
if await call_event_hook(event, EventType.OnLLMRequestEvent, req):
self._unlock(req.conversation.cid)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 在返回之前解锁可能无法覆盖所有提前返回的情况。

其他提前返回或异常可能会阻止锁被释放。使用上下文管理器或 try/finally 来确保锁始终被释放。

建议的实现:

            if lock.locked():
                logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
            await lock.acquire()
            try:
                # 更新到最新的上下文
                conversation = await self._get_session_conv(event)
                req.conversation = conversation
                req.contexts = json.loads(conversation.history)

                # 执行请求 LLM 前事件钩子。
                if await call_event_hook(event, EventType.OnLLMRequestEvent, req):
                    return

                # ... 这里继续后续处理逻辑 ...

            finally:
                lock.release()

如果函数其余部分还有其他提前返回或异常,它们现在将由 try/finally 块覆盖。如果额外的逻辑是锁定部分的一部分,你可能需要将其移到 try 块内。

Original comment in English

suggestion (bug_risk): Unlocking before returning may not cover all early returns.

Other early returns or exceptions could prevent the lock from being released. Use a context manager or try/finally to ensure the lock is always released.

Suggested implementation:

            if lock.locked():
                logger.info(f"用户(cid: {cid}) 的新请求正在等待上一次请求完成...")
            await lock.acquire()
            try:
                # 更新到最新的上下文
                conversation = await self._get_session_conv(event)
                req.conversation = conversation
                req.contexts = json.loads(conversation.history)

                # 执行请求 LLM 前事件钩子。
                if await call_event_hook(event, EventType.OnLLMRequestEvent, req):
                    return

                # ... 这里继续后续处理逻辑 ...

            finally:
                lock.release()

If there are other early returns or exceptions in the rest of the function, they will now be covered by the try/finally block. You may need to move additional logic inside the try block if it is part of the locked section.

return

if isinstance(req.contexts, str):
Expand Down Expand Up @@ -505,6 +529,8 @@ async def process(
if event.get_platform_name() == "webchat":
asyncio.create_task(self._handle_webchat(event, req, provider))

self._unlock(req.conversation.cid)

async def _handle_webchat(
self, event: AstrMessageEvent, req: ProviderRequest, prov: Provider
):
Expand Down