-
Notifications
You must be signed in to change notification settings - Fork 113
Expand file tree
/
Copy pathchatapp_common.py
More file actions
186 lines (154 loc) · 7.49 KB
/
chatapp_common.py
File metadata and controls
186 lines (154 loc) · 7.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import asyncio, glob, os, queue as Q, re, socket, sys, time
HELP_TEXT = "📖 命令列表:\n/help - 显示帮助\n/status - 查看状态\n/stop - 停止当前任务\n/new - 清空当前上下文\n/restore - 恢复上次对话历史\n/llm [n] - 查看或切换模型"
FILE_HINT = "If you need to show files to user, use [FILE:filepath] in your response."
TAG_PATS = [r"<" + t + r">.*?</" + t + r">" for t in ("thinking", "summary", "tool_use", "file_content")]
def clean_reply(text):
for pat in TAG_PATS:
text = re.sub(pat, "", text or "", flags=re.DOTALL)
return re.sub(r"\n{3,}", "\n\n", text).strip() or "..."
def extract_files(text):
return re.findall(r"\[FILE:([^\]]+)\]", text or "")
def strip_files(text):
return re.sub(r"\[FILE:[^\]]+\]", "", text or "").strip()
def split_text(text, limit):
text, parts = (text or "").strip() or "...", []
while len(text) > limit:
cut = text.rfind("\n", 0, limit)
if cut < limit * 0.6:
cut = limit
parts.append(text[:cut].rstrip())
text = text[cut:].lstrip()
return parts + ([text] if text else []) or ["..."]
def format_restore():
files = glob.glob("./temp/model_responses_*.txt")
if not files:
return None, "❌ 没有找到历史记录"
latest = max(files, key=os.path.getmtime)
with open(latest, "r", encoding="utf-8") as f:
content = f.read()
users = re.findall(r"=== USER ===\n(.+?)(?==== |$)", content, re.DOTALL)
resps = re.findall(r"=== Response ===.*?\n(.+?)(?==== Prompt|$)", content, re.DOTALL)
restored = []
for u, r in zip(users, resps):
u, r = u.strip(), r.strip()[:500]
if u and r:
restored.extend([f"[USER]: {u}", f"[Agent] {r}"])
if not restored:
return None, "❌ 历史记录里没有可恢复内容"
return (restored, os.path.basename(latest), len(restored) // 2), None
def build_done_text(raw_text):
files = [p for p in extract_files(raw_text) if os.path.exists(p)]
body = strip_files(clean_reply(raw_text))
if files:
body = (body + "\n\n" if body else "") + "\n".join(f"生成文件: {p}" for p in files)
return body or "..."
def public_access(allowed):
return not allowed or "*" in allowed
def to_allowed_set(value):
if value is None:
return set()
if isinstance(value, str):
value = [value]
return {str(x).strip() for x in value if str(x).strip()}
def allowed_label(allowed):
return "public" if public_access(allowed) else sorted(allowed)
def ensure_single_instance(port, label):
try:
lock_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lock_sock.bind(("127.0.0.1", port))
return lock_sock
except OSError:
print(f"[{label}] Another instance is already running, skipping...")
sys.exit(1)
def require_runtime(agent, label, **required):
missing = [k for k, v in required.items() if not v]
if missing:
print(f"[{label}] ERROR: please set {', '.join(missing)} in mykey.py or mykey.json")
sys.exit(1)
if agent.llmclient is None:
print(f"[{label}] ERROR: no usable LLM backend found in mykey.py or mykey.json")
sys.exit(1)
def redirect_log(script_file, log_name, label, allowed):
log_dir = os.path.join(os.path.dirname(script_file), "temp")
os.makedirs(log_dir, exist_ok=True)
logf = open(os.path.join(log_dir, log_name), "a", encoding="utf-8", buffering=1)
sys.stdout = sys.stderr = logf
print(f"[NEW] {label} process starting, the above are history infos ...")
print(f"[{label}] allow list: {allowed_label(allowed)}")
class AgentChatMixin:
label = "Chat"
source = "chat"
split_limit = 1500
ping_interval = 20
def __init__(self, agent, user_tasks):
self.agent, self.user_tasks = agent, user_tasks
async def send_text(self, chat_id, content, **ctx):
raise NotImplementedError
async def send_done(self, chat_id, raw_text, **ctx):
await self.send_text(chat_id, build_done_text(raw_text), **ctx)
async def handle_command(self, chat_id, cmd, **ctx):
parts = (cmd or "").split()
op = (parts[0] if parts else "").lower()
if op == "/stop":
state = self.user_tasks.get(chat_id)
if state:
state["running"] = False
self.agent.abort()
return await self.send_text(chat_id, "⏹️ 正在停止...", **ctx)
if op == "/status":
llm = self.agent.get_llm_name() if self.agent.llmclient else "未配置"
return await self.send_text(chat_id, f"状态: {'🔴 运行中' if self.agent.is_running else '🟢 空闲'}\nLLM: [{self.agent.llm_no}] {llm}", **ctx)
if op == "/llm":
if not self.agent.llmclient:
return await self.send_text(chat_id, "❌ 当前没有可用的 LLM 配置", **ctx)
if len(parts) > 1:
try:
self.agent.next_llm(int(parts[1]))
return await self.send_text(chat_id, f"✅ 已切换到 [{self.agent.llm_no}] {self.agent.get_llm_name()}", **ctx)
except Exception:
return await self.send_text(chat_id, f"用法: /llm <0-{len(self.agent.list_llms()) - 1}>", **ctx)
lines = [f"{'→' if cur else ' '} [{i}] {name}" for i, name, cur in self.agent.list_llms()]
return await self.send_text(chat_id, "LLMs:\n" + "\n".join(lines), **ctx)
if op == "/restore":
try:
restored_info, err = format_restore()
if err:
return await self.send_text(chat_id, err, **ctx)
restored, fname, count = restored_info
self.agent.abort()
self.agent.history.extend(restored)
return await self.send_text(chat_id, f"✅ 已恢复 {count} 轮对话\n来源: {fname}\n(仅恢复上下文,请输入新问题继续)", **ctx)
except Exception as e:
return await self.send_text(chat_id, f"❌ 恢复失败: {e}", **ctx)
if op == "/new":
self.agent.abort()
self.agent.history = []
return await self.send_text(chat_id, "🆕 已清空当前共享上下文", **ctx)
return await self.send_text(chat_id, HELP_TEXT, **ctx)
async def run_agent(self, chat_id, text, **ctx):
state = {"running": True}
self.user_tasks[chat_id] = state
try:
await self.send_text(chat_id, "思考中...", **ctx)
dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source)
last_ping = time.time()
while state["running"]:
try:
item = await asyncio.to_thread(dq.get, True, 3)
except Q.Empty:
if self.agent.is_running and time.time() - last_ping > self.ping_interval:
await self.send_text(chat_id, "⏳ 还在处理中,请稍等...", **ctx)
last_ping = time.time()
continue
if "done" in item:
await self.send_done(chat_id, item.get("done", ""), **ctx)
break
if not state["running"]:
await self.send_text(chat_id, "⏹️ 已停止", **ctx)
except Exception as e:
import traceback
print(f"[{self.label}] run_agent error: {e}")
traceback.print_exc()
await self.send_text(chat_id, f"❌ 错误: {e}", **ctx)
finally:
self.user_tasks.pop(chat_id, None)