diff --git a/backend/core/registrar.py b/backend/core/registrar.py index 3b3a0a94..90ea6017 100644 --- a/backend/core/registrar.py +++ b/backend/core/registrar.py @@ -40,8 +40,6 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]: """ # 创建数据库表 await create_table() - # 连接 redis - await redis_client.open() # 初始化 limiter await FastAPILimiter.init( redis=redis_client, diff --git a/backend/plugin/tools.py b/backend/plugin/tools.py index 2daf57ab..566f3883 100644 --- a/backend/plugin/tools.py +++ b/backend/plugin/tools.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import asyncio import inspect import json import os @@ -11,7 +10,6 @@ from functools import lru_cache from typing import Any -import nest_asyncio import rtoml from fastapi import APIRouter, Depends, Request @@ -22,8 +20,9 @@ from backend.common.log import log from backend.core.conf import settings from backend.core.path_conf import PLUGIN_DIR -from backend.database.redis import redis_client +from backend.database.redis import RedisCli, redis_client from backend.plugin.errors import PluginConfigError, PluginInjectError +from backend.utils._asyncio import run_await from backend.utils.import_parse import import_module_cached @@ -84,20 +83,18 @@ def parse_plugin_config() -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: extra_plugins = [] app_plugins = [] - # 事件循环嵌套: https://pypi.org/project/nest-asyncio/ - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = asyncio.get_event_loop() - else: - nest_asyncio.apply() - - loop.create_task(redis_client.delete_prefix(f'{settings.PLUGIN_REDIS_PREFIX}:info')) - plugin_status = loop.run_until_complete(redis_client.hgetall(f'{settings.PLUGIN_REDIS_PREFIX}:status')) # type: ignore + plugins = get_plugins() + + # 使用独立单例,避免与主线程冲突 + current_redis_client = RedisCli() + run_await(current_redis_client.open)() + + run_await(current_redis_client.delete_prefix)(f'{settings.PLUGIN_REDIS_PREFIX}:info', exclude=plugins) + plugin_status = run_await(current_redis_client.hgetall)(f'{settings.PLUGIN_REDIS_PREFIX}:status') if not plugin_status: plugin_status = {} - for plugin in get_plugins(): + for plugin in plugins: data = load_plugin_config(plugin) plugin_info = data.get('plugin') @@ -123,13 +120,13 @@ def parse_plugin_config() -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: data['plugin']['name'] = plugin # 缓存插件信息 - loop.create_task( - redis_client.set(f'{settings.PLUGIN_REDIS_PREFIX}:info:{plugin}', json.dumps(data, ensure_ascii=False)) + run_await(current_redis_client.set)( + f'{settings.PLUGIN_REDIS_PREFIX}:info:{plugin}', json.dumps(data, ensure_ascii=False) ) # 缓存插件状态 - loop.create_task(redis_client.hset(f'{settings.PLUGIN_REDIS_PREFIX}:status', mapping=plugin_status)) # type: ignore - loop.create_task(redis_client.delete(f'{settings.PLUGIN_REDIS_PREFIX}:changed')) + run_await(current_redis_client.hset)(f'{settings.PLUGIN_REDIS_PREFIX}:status', mapping=plugin_status) + run_await(current_redis_client.delete)(f'{settings.PLUGIN_REDIS_PREFIX}:changed') return extra_plugins, app_plugins diff --git a/backend/utils/_asyncio.py b/backend/utils/_asyncio.py new file mode 100644 index 00000000..ae61d976 --- /dev/null +++ b/backend/utils/_asyncio.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import asyncio +import atexit +import threading +import weakref + +from typing import Awaitable, Callable, TypeVar + +T = TypeVar('T') + + +class _TaskRunner: + """A task runner that runs an asyncio event loop on a background thread.""" + + def __init__(self): + self.__loop: asyncio.AbstractEventLoop | None = None + self.__thread: threading.Thread | None = None + self.__lock = threading.Lock() + atexit.register(self.close) + + def close(self): + """关闭事件循环""" + if self.__loop: + self.__loop.stop() + + def _target(self): + """后台线程目标""" + loop = self.__loop + try: + loop.run_forever() + finally: + loop.close() + + def run(self, coro): + """在后台线程上同步运行协程""" + with self.__lock: + name = f'{threading.current_thread().name} - runner' + if self.__loop is None: + self.__loop = asyncio.new_event_loop() + self.__thread = threading.Thread(target=self._target, daemon=True, name=name) + self.__thread.start() + fut = asyncio.run_coroutine_threadsafe(coro, self.__loop) + return fut.result(None) + + +_runner_map = weakref.WeakValueDictionary() + + +def run_await(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: + """将协程包装在一个函数中,该函数会阻塞,直到它执行完为止""" + + def wrapped(*args, **kwargs): + name = threading.current_thread().name + inner = coro(*args, **kwargs) + try: + # 如果当前此线程中正在运行循环 + # 使用任务运行程序 + asyncio.get_running_loop() + if name not in _runner_map: + _runner_map[name] = _TaskRunner() + return _runner_map[name].run(inner) + except RuntimeError: + # 如果没有,请创建一个新的事件循环 + loop = asyncio.get_event_loop() + return loop.run_until_complete(inner) + + wrapped.__doc__ = coro.__doc__ + return wrapped diff --git a/pyproject.toml b/pyproject.toml index eb7962b4..1935878d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,6 @@ dependencies = [ "jinja2>=3.1.4", "loguru>=0.7.3", "msgspec>=0.19.0", - "nest-asyncio>=1.6.0", "path==17.0.0", "psutil>=6.0.0", "pwdlib>=0.2.1", diff --git a/requirements.txt b/requirements.txt index e455e3d6..312ab722 100644 --- a/requirements.txt +++ b/requirements.txt @@ -161,8 +161,6 @@ mdurl==0.1.2 # via markdown-it-py msgspec==0.19.0 # via fastapi-best-architecture -nest-asyncio==1.6.0 - # via fastapi-best-architecture nodeenv==1.9.1 # via pre-commit packaging==24.2 diff --git a/uv.lock b/uv.lock index 20ed6824..3955b0fb 100644 --- a/uv.lock +++ b/uv.lock @@ -612,7 +612,6 @@ dependencies = [ { name = "jinja2" }, { name = "loguru" }, { name = "msgspec" }, - { name = "nest-asyncio" }, { name = "path" }, { name = "psutil" }, { name = "pwdlib" }, @@ -667,7 +666,6 @@ requires-dist = [ { name = "jinja2", specifier = ">=3.1.4" }, { name = "loguru", specifier = ">=0.7.3" }, { name = "msgspec", specifier = ">=0.19.0" }, - { name = "nest-asyncio", specifier = ">=1.6.0" }, { name = "path", specifier = "==17.0.0" }, { name = "psutil", specifier = ">=6.0.0" }, { name = "pwdlib", specifier = ">=0.2.1" }, @@ -1237,15 +1235,6 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/9c/fd/b247aec6add5601956d440488b7f23151d8343747e82c038af37b28d6098/multidict-6.2.0-py3-none-any.whl", hash = "sha256:5d26547423e5e71dcc562c4acdc134b900640a39abd9066d7326a7cc2324c530" }, ] -[[package]] -name = "nest-asyncio" -version = "1.6.0" -source = { registry = "https://mirrors.aliyun.com/pypi/simple" } -sdist = { url = "https://mirrors.aliyun.com/pypi/packages/83/f8/51569ac65d696c8ecbee95938f89d4abf00f47d58d48f6fbabfe8f0baefe/nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe" } -wheels = [ - { url = "https://mirrors.aliyun.com/pypi/packages/a0/c4/c2971a3ba4c6103a3d10c4b0f24f461ddc027f0f09763220cf35ca1401b3/nest_asyncio-1.6.0-py3-none-any.whl", hash = "sha256:87af6efd6b5e897c81050477ef65c62e2b2f35d51703cae01aff2905b1852e1c" }, -] - [[package]] name = "nodeenv" version = "1.9.1"