本文档详细说明如何将 zion-local-tools 从当前的单进程阻塞架构改造为支持多 Worker + Redis 共享状态的架构,以实现真正的高并发能力。
改造目标:
- ✅ 支持多个 Worker 进程同时运行
- ✅ 解决 Session 状态跨 Worker 共享问题
- ✅ 解决阻塞调用导致的事件循环阻塞问题
- ✅ 支持水平扩展,提升并发 QPS 10-50 倍
- ✅ 保持向后兼容性,不破坏现有 API
当前架构(单进程阻塞)
┌─────────────┐ ┌─────────────┐
│ FastAPI │ │ Sessions │
│ App │ │ Manager │
│ │ │ (内存Dict) │
└──────┬──────┘ └──────┬──────┘
│ │
│ ┌────────────┴───────┐
│ │ 所有服务都是同步的 │
│ │ ❌ subprocess.run │
│ │ ❌ paramiko 阻塞 │
│ └────────────────────┘
│
▼
┌─────────────┐
│ 单事件循环 │ ❌ 一个请求阻塞全部
└─────────────┘
改造后架构(多 Worker + Redis)
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker N │
│ FastAPI │ │ FastAPI │ │ FastAPI │
│ 无状态 │ │ 无状态 │ │ 无状态 │
│ + 线程池 │ │ + 线程池 │ │ + 线程池 │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└─────────────────┼─────────────────┘
│
▼
┌──────────────────┐
│ Redis 共享存储 │
│ Session 配置 │
│ 分布式锁 │
└──────────────────┘
│
▼
┌──────────────────┐
│ SSH 连接池 │
│ (进程级缓存) │
└──────────────────┘
| 文件路径 | 改动类型 | 优先级 |
|---|---|---|
pyproject.toml |
依赖添加 | P0 |
requirements.txt |
依赖添加 | P0 |
src/zion_local_tools/config/settings.py |
配置扩展 | P0 |
src/zion_local_tools/main.py |
应用配置 | P0 |
run.py |
启动脚本 | P0 |
src/zion_local_tools/core/__init__.py |
导出扩展 | P1 |
src/zion_local_tools/sessions/service.py |
核心改造 | P0 |
src/zion_local_tools/sessions/router.py |
路由改造 | P0 |
src/zion_local_tools/bash/service.py |
异步改造 | P0 |
src/zion_local_tools/files/service.py |
异步改造 | P1 |
| 文件路径 | 说明 | 优先级 |
|---|---|---|
src/zion_local_tools/core/redis_client.py |
Redis 客户端封装 | P0 |
src/zion_local_tools/core/shared_state.py |
共享状态管理器 | P0 |
src/zion_local_tools/core/async_executor.py |
异步线程池封装 | P0 |
src/zion_local_tools/core/ssh_pool.py |
SSH 连接池 | P0 |
src/zion_local_tools/core/distributed_lock.py |
Redis 分布式锁 | P0 |
src/zion_local_tools/sessions/websocket_helper.py |
WebSocket 辅助 | P1 |
改动位置:[project.dependencies] 节点
改动内容:
# 新增 Redis 相关依赖
redis = {version = "^5.0.0", extras = ["hiredis"]}
aioredis = "^2.0.1"改动内容:在文件末尾添加
# Redis 支持
redis>=5.0.0
hiredis>=2.2.0
aioredis>=2.0.1改动位置 1:Settings.__init__() 方法,在 self.MCP_NPM_CACHE_DIR = ... 之后添加
# Redis 配置
self.REDIS_ENABLED = os.environ.get("ZION_REDIS_ENABLED", "1") == "1"
self.REDIS_URL = os.environ.get("ZION_REDIS_URL", "redis://localhost:6379/0")
self.REDIS_PREFIX = "zion:localtools:"
self.REDIS_SESSION_TTL = 86400 # 24小时
self.REDIS_LOCK_TTL = 300 # 5分钟
# 线程池配置
self.THREAD_POOL_MAX_WORKERS = int(os.environ.get("ZION_THREAD_POOL_MAX_WORKERS", str(os.cpu_count() * 2)))
self.THREAD_POOL_MIN_WORKERS = 4
# Worker 配置
self.WORKER_COUNT = int(os.environ.get("ZION_WORKER_COUNT", "4"))
self.WORKER_LIMIT_CONCURRENCY = int(os.environ.get("ZION_WORKER_LIMIT_CONCURRENCY", "100"))
# WebSocket 粘性路由配置
self.WEBSOCKET_STICKY_COOKIE = "zion_worker_id"
self.WEBSOCKET_COOKIE_MAX_AGE = 3600改动位置 2:Settings.to_dict() 方法,添加新配置的导出
# 在 return 字典中添加
"redis_enabled": self.REDIS_ENABLED,
"redis_url": self.REDIS_URL,
"thread_pool_max_workers": self.THREAD_POOL_MAX_WORKERS,
"worker_count": self.WORKER_COUNT,完整文件内容,提供统一的 Redis 访问接口。
由于完整代码较长,以下是核心文件的改造要点:
- 实现
RedisClient单例类 - 提供基础操作:get/set/delete/exists/expire
- 提供 Hash 操作:hget/hset/hgetall/hdel/hkeys
- 提供 JSON 操作:json_get/json_set
- 提供 List 操作:lpush/rpush/lpop/rpop/lrange/llen
- 自动处理 key 前缀
- 实现
DistributedLock类,基于 Redis SET NX EX - 实现
AsyncLockContext上下文管理器 - 使用 Lua 脚本保证释放锁的原子性
- Redis 不可用时降级为 asyncio.Lock
- 实现
AsyncExecutor单例类 - 封装 ThreadPoolExecutor
- 提供
run()方法执行同步函数 - 提供
run_with_retry()方法支持重试 - 提供
@async_run装饰器
- 实现
SSHConnection类封装单个连接 - 实现
SSHConnectionPool进程级单例 - 支持连接保活、健康检查、自动重连
- 提供统计信息接口
- 实现
SharedSessionManager类 - Session CRUD 操作
- CWD 管理操作
- 健康状态管理
- 与 Redis 交互
改动位置 1:在 import 区域添加
from .core.redis_client import redis_client
from .core.ssh_pool import ssh_pool改动位置 2:添加 lifespan 事件处理,修改 FastAPI 创建
from contextlib import asynccontextmanager
def create_app() -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI):
await redis_client.connect()
await ssh_pool.start()
yield
await redis_client.close()
await ssh_pool.stop()
app = FastAPI(
title=settings.APP_TITLE,
version=settings.APP_VERSION,
description="Zion本地工具服务...",
lifespan=lifespan
)改动位置 3:修改健康检查端点
@app.get("/health")
async def health():
return {
"ok": True,
"redis": redis_client.is_enabled,
"ssh_pool": ssh_pool.get_stats() if redis_client.is_enabled else None
}需要添加 workers 和 limit_concurrency 参数配置。
改造要点:
- 导入 shared_session_manager, ssh_pool, distributed_lock
- 将所有方法改为 async def
- 使用 Redis 存储 session 状态
- SSH 连接使用进程级连接池
- 使用分布式锁保护 SSH 执行
改造要点:
- 所有路由改为 async def
- 调用改造后的 async service 方法
- WebSocket 保持兼容,但建议使用粘性路由
改造要点:
- 添加 async_execute_async() 方法
- 使用 async_executor.run() 在线程池执行
- 保持原 execute() 方法兼容(标记为 deprecated)
- 路由改为调用 async 版本
改造要点:
- glob_search 改为异步
- grep_search 改为异步
- read_file/write_file/edit_file 改为异步
- 使用 async_executor 执行文件操作
添加导出:
from .redis_client import redis_client
from .shared_state import shared_session_manager
from .async_executor import async_executor
from .ssh_pool import ssh_pool
from .distributed_lock import acquire_lock, DistributedLock如有问题或疑问,请联系技术团队。
文档版本: v1.0
最后更新: $(date)
作者: Zion Tools Team