Skip to content

Commit 19f2b68

Browse files
committed
fix: resolve 12 bugs in v0.2 codebase
Bug fixes included:

Compose Module:
- fix(compose): correct typo in Next.input field description (#1)
- fix(workflow): defer next_pointer cleanup until first step succeeds (#2)
- fix(team): enhance task group exception handling (#11)

Core Module:
- fix(react-engine): notify user on streaming loop exit reason (#3)
- fix(memory): use lazy initialization for SummaryTokenMemory lock (#4)
- fix(session): add debounce mechanism for auto-save (#5)
- fix(toolbox): ensure thread-safe stats iteration (#6)
- fix(structure): add length limits to JSON extraction (#10)
- fix(events): use id() for robust handler deduplication (#12)

Plugins Module:
- fix(storage): add missing @register_storage to SQLiteStorage (#7)
- fix(models): improve tokenizer preloading for non-OpenAI models (#8)
- fix(knowledge): use Pydantic PrivateAttr for RetrievalTool (#9)

Breaking Changes: None
Migration Required: None
Tested on: Python 3.10, 3.11, 3.12
1 parent 6932ab1 commit 19f2b68

File tree

19 files changed

+1520
-148
lines changed

19 files changed

+1520
-148
lines changed

docs/system design document.md

Lines changed: 1048 additions & 0 deletions
Large diffs are not rendered by default.

gecko/compose/nodes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def check_score(score: int):
3535
"""
3636
node: str = Field(..., description="下一个节点的名称")
3737
input: Optional[Any] = Field(default=None,
38-
iption="传递给下一个节点的输入数据。如果为 None,则保持上下文中的 last_output 不变。"
38+
description="传递给下一个节点的输入数据。如果为 None,则保持上下文中的 last_output 不变。"
3939
) # type: ignore
4040
# [New] 允许在跳转时更新 Context.state # type: ignore
4141
update_state: Dict[str, Any] = Field(

gecko/compose/team.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,17 @@ async def run(self, context_or_input: Any) -> List[MemberResult]:
105105
)
106106

107107
# 2. 初始化容器
108-
# 使用 MemberResult 占位,初始状态设为失败,防止未执行的情况
108+
# 注释: results[idx] 的写入是索引隔离的,每个 worker 只写自己的索引
109+
# 这种模式在 Python 中是并发安全的,因为列表元素的赋值是原子操作
109110
results: List[Optional[MemberResult]] = [None] * member_count
110111

111112
# 3. 准备并发控制
112113
semaphore = anyio.Semaphore(self.max_concurrent) if self.max_concurrent > 0 else None
113114

115+
# 修复: 追踪是否有任务失败(用于日志)
116+
failed_indices: List[int] = []
117+
failed_lock = anyio.Lock() # 用于保护 failed_indices 的并发写入
118+
114119
# 4. 定义 Worker
115120
async def _worker(idx: int, member: Any):
116121
if semaphore:
@@ -139,15 +144,30 @@ async def _worker(idx: int, member: Any):
139144
error=str(e),
140145
is_success=False
141146
)
147+
# 修复: 安全地记录失败索引
148+
async with failed_lock:
149+
failed_indices.append(idx)
142150
finally:
143151
if semaphore:
144152
semaphore.release()
145153

146154
# 5. 启动并发任务组
147-
async with anyio.create_task_group() as tg:
148-
for idx, member in enumerate(self.members):
149-
tg.start_soon(_worker, idx, member)
150-
155+
# 注释: anyio.create_task_group 会等待所有任务完成
156+
# 如果任何任务抛出未捕获的异常,会取消其他任务
157+
# 但我们在 _worker 中已经捕获了所有异常,所以不会发生取消
158+
try:
159+
async with anyio.create_task_group() as tg:
160+
for idx, member in enumerate(self.members):
161+
tg.start_soon(_worker, idx, member)
162+
except ExceptionGroup as eg:
163+
# 修复: Python 3.11+ 的 ExceptionGroup 处理
164+
logger.error(
165+
"Team execution encountered exceptions",
166+
team=self.name,
167+
exception_count=len(eg.exceptions)
168+
)
169+
# 异常已在 worker 中处理并记录到 results,这里只记录日志
170+
151171
# 6. 结果整理
152172
# 理论上 task_group 结束时所有 results 都已被赋值,这里做一次非空断言过滤
153173
final_results = [r for r in results if r is not None]

gecko/compose/workflow.py

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,8 @@ async def execute(self, input_data: Any, session_id: Optional[str] = None) -> An
409409
async def resume(self, session_id: str) -> Any:
410410
"""
411411
从存储中恢复执行
412+
413+
修复: 延迟清除 next_pointer,确保首步执行成功后再清除
412414
"""
413415
if not self.storage:
414416
raise ValueError("Cannot resume: Storage not configured")
@@ -431,8 +433,9 @@ async def resume(self, session_id: str) -> Any:
431433

432434
# 3. 确定下一步
433435
next_node = None
436+
# ✅ 修复: 使用标记位追踪是否从 next_pointer 恢复,而不是立即清除
437+
resumed_from_pointer = False
434438

435-
# [优化] 优先检查是否存在动态跳转指针
436439
if context.next_pointer:
437440
logger.info("Resuming from dynamic Next pointer", target=context.next_pointer.get("target_node"))
438441
next_node = context.next_pointer.get("target_node")
@@ -441,17 +444,15 @@ async def resume(self, session_id: str) -> Any:
441444
if context.next_pointer.get("input"):
442445
context.state["_next_input"] = context.next_pointer["input"]
443446

444-
# 消费指针 (已使用,清除以避免重复)
445-
context.clear_next_pointer()
447+
# ✅ 修复: 标记而不是立即清除,让 _execute_loop 在成功执行后清除
448+
resumed_from_pointer = True
446449

447450
elif last_node:
448-
# 只有在没有动态指针时,才回退到基于静态图的推导
449451
next_node = await self._find_next_node(last_node, context)
450452
if not next_node:
451453
logger.info("Workflow already completed (no next node)", session_id=session_id)
452454
return context.get_last_output()
453455
else:
454-
# 这是一个全新的会话(或者刚初始化未执行)
455456
next_node = self._entry_point
456457

457458
# 4. 继续执行循环
@@ -460,7 +461,8 @@ async def resume(self, session_id: str) -> Any:
460461
context,
461462
session_id,
462463
start_node=next_node,
463-
start_step=current_step
464+
start_step=current_step,
465+
clear_pointer_after_first_step=resumed_from_pointer # ✅ 新增参数
464466
)
465467

466468
# 最终保存
@@ -473,39 +475,39 @@ async def resume(self, session_id: str) -> Any:
473475
logger.exception("Resume execution failed")
474476
raise
475477

476-
async def _execute_loop(self,
477-
context: WorkflowContext,
478-
session_id: Optional[str],
479-
start_node: Optional[str],
480-
start_step: int):
478+
async def _execute_loop(
479+
self,
480+
context: WorkflowContext,
481+
session_id: Optional[str],
482+
start_node: Optional[str],
483+
start_step: int,
484+
clear_pointer_after_first_step: bool = False # ✅ 新增参数
485+
):
481486
"""核心执行循环"""
482487
current_node = start_node
483488
steps = start_step
489+
is_first_step = True # ✅ 追踪是否为首步
484490

485491
while current_node and steps < self.max_steps:
486492
steps += 1
487493

488-
# 1. 如果是从 next_pointer 恢复的(Resume 场景),跳过执行,直接流转
489-
# 但这里逻辑比较绕,更清晰的是:如果 next_pointer 存在,说明上一步是 Next 指令,
490-
# 且已经持久化了,我们应该直接使用 next_pointer 指向的节点作为 current_node。
491-
# 这在 resume() 方法中处理更合适,这里保持循环逻辑。
492-
493494
# 执行节点
494495
logger.debug("Executing step", step=steps, node=current_node)
495-
# 执行节点逻辑
496496
result = await self._execute_node_safe(current_node, context)
497497

498+
# ✅ 修复: 首步成功后清除 pointer
499+
if is_first_step and clear_pointer_after_first_step:
500+
context.clear_next_pointer()
501+
is_first_step = False
502+
498503
# 准备持久化所需的临时变量
499-
# 记录当前节点为“已完成节点”
500504
persist_node = current_node
501505
next_target = None
502506

503507
# 处理流转逻辑
504508
if isinstance(result, Next):
505-
# === 动态跳转处理 ===
506509
next_target = result.node
507510

508-
# 更新 Input / State
509511
if result.input is not None:
510512
normalized = self._normalize_result(result.input)
511513
context.history["last_output"] = normalized
@@ -514,50 +516,26 @@ async def _execute_loop(self,
514516
if result.update_state:
515517
context.state.update(result.update_state)
516518

517-
# [关键优化] 记录动态指针,确保持久化时包含此信息
518519
context.next_pointer = {
519520
"target_node": next_target,
520521
"input": context.state.get("_next_input")
521522
}
522523

523-
# 即使是跳转,也需要在 history 中留痕,证明此节点已执行完毕
524-
# 这里记录一个特殊的标识,方便调试
525524
context.history[current_node] = f"<Next -> {next_target}>"
526525

527526
else:
528-
# === 静态流转处理 ===
529527
normalized = self._normalize_result(result)
530528
context.history[current_node] = normalized
531529
context.history["last_output"] = normalized
532530

533-
# 既然走了静态流程,确保清除之前的指针(防御性编程)
534531
context.clear_next_pointer()
535532

536-
# 基于静态图寻找下一跳
537533
next_target = await self._find_next_node(current_node, context)
538534

539-
# [优化] 立即持久化 (Atomic Checkpoint)
540-
# 此时 context 包含了最新的 history 和 next_pointer
541-
# 即使下一秒 Crash,resume 时也能通过 next_pointer 找到 next_target
535+
# 持久化
542536
if self.storage and session_id:
543537
await self._persist_state(session_id, steps, persist_node, context)
544538

545-
# [新增] 关键修正:状态推进
546-
# 一旦完成了持久化,next_pointer 的使命(防Crash)在当前步已完成。
547-
# 进入下一步前,如果那是基于 next_pointer 的跳转,理论上应在内存中清除,
548-
# 以免在 B 的 Pre-Commit 中还带着 "A->B" 的指针。
549-
# 但是,如果我们在 B 执行前 Crash,Resume 时加载的是 A 执行后的状态(含指针),这是对的。
550-
# 如果我们在 B 执行中 Crash,Resume 加载的是 B 的 Pre-Commit 状态。
551-
# B 的 Pre-Commit 状态如果包含 "A->B" 指针,Resume 会再次尝试跳转到 B。
552-
# 此时 last_node="B", next_pointer={"target":"B"}。
553-
# Resume 逻辑:优先 next_pointer -> target="B"。结果一样。
554-
555-
# 问题的根源是测试用例的查找逻辑太宽泛了。它只要找到包含 next_pointer 的记录就认为那是 A 的记录。
556-
# 实际上 B 的 Pre-Commit 记录也包含了它。
557-
558-
# 让我们在测试用例中更精确地定位。
559-
560-
# 推进到下一个节点
561539
current_node = next_target
562540

563541
if steps >= self.max_steps:

gecko/core/engine/react.py

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -385,14 +385,15 @@ async def _run_reasoning_loop(
385385
tool_calls=last_msg.tool_calls or [],
386386
)
387387

388-
# [修改方法] 重构流式循环,使用 _process_turn_results
388+
389389
async def _run_streaming_loop(
390390
self, context: ExecutionContext, llm_params: Dict[str, Any]
391391
) -> AsyncIterator[str]:
392392
"""
393393
ReAct 流式循环 (递归模式)
394+
395+
修复: 在检测到死循环或错误退出时,向用户输出提示信息
394396
"""
395-
# 循环控制:只要 turn 未达上限,且 should_continue 为 True,就一直循环
396397
while context.turn < self.max_turns:
397398
context.turn += 1
398399

@@ -402,30 +403,23 @@ async def _run_streaming_loop(
402403

403404
messages_payload = [m.to_openai_format() for m in context.messages]
404405

405-
# 状态累积器 (每轮开始前重置)
406406
collected_content = []
407407
tool_calls_data: List[Dict[str, Any]] = []
408408

409-
# 1. 消费流 (Inner Loop: Streaming Consumer)
410-
# 负责将 LLM 的 Token 实时透传给用户,并累积工具调用信息
411409
async for chunk in self.model.astream(messages=messages_payload, **llm_params): # type: ignore
412410
delta = self._extract_delta(chunk)
413411

414-
# A. 文本内容:实时 Yield
415412
content = delta.get("content")
416413
if content:
417414
collected_content.append(content)
418415
yield content
419416

420-
# B. 工具调用:后台累积
421417
if delta.get("tool_calls"):
422418
self._accumulate_tool_chunks(tool_calls_data, delta["tool_calls"])
423419

424-
# 2. 组装完整消息 (Turn Completion)
425420
final_text = "".join(collected_content)
426421
assistant_msg = Message.assistant(content=final_text)
427422

428-
# 清洗并组装工具调用
429423
if tool_calls_data:
430424
valid_calls = [
431425
tc for tc in tool_calls_data
@@ -434,20 +428,47 @@ async def _run_streaming_loop(
434428
if valid_calls:
435429
assistant_msg.tool_calls = valid_calls
436430

437-
# 3. 处理回合逻辑 (Decision Making)
438-
# 复用基类的 _process_turn_results 方法
439-
# 返回 True 表示 "工具已执行完毕,状态已更新,请继续下一轮 LLM 推理"
440-
# 返回 False 表示 "任务完成" 或 "检测到死循环/无需工具",应退出循环
441431
should_continue = await self._process_turn_results(
442432
context, assistant_msg, response_model=None
443433
)
444434

445-
# 如果不需要继续,跳出 while 循环,结束流式生成
446435
if not should_continue:
436+
# ✅ 修复: 检查退出原因并通知用户
437+
exit_reason = self._get_exit_reason(context, assistant_msg)
438+
if exit_reason:
439+
yield f"\n\n[System: {exit_reason}]"
447440
break
448-
449-
# 如果 should_continue 为 True,while 循环会自动进入下一轮
450-
# context.turn 增加,context.messages 已包含工具结果
441+
442+
def _get_exit_reason(self, context: ExecutionContext, last_message: Message) -> Optional[str]:
    """Return a human-readable reason why the streaming loop stopped.

    Inspects the final assistant message and the execution context to
    distinguish an abnormal exit (infinite tool loop, repeated tool
    errors) from a normal completion.

    Args:
        context: Execution state for the current run; only
            ``last_tool_calls_hash`` and ``consecutive_tool_error_count``
            are read here.
        last_message: The assistant message produced in the final turn.

    Returns:
        A short sentence describing the abnormal exit, or ``None`` when
        the loop ended normally (no notice should be shown to the user).
    """
    # Infinite-loop check: re-derive the fingerprint of the final turn's
    # tool calls and compare it against the hash recorded for the
    # previous turn. The fingerprint construction (json.dumps with
    # sort_keys over name/args pairs, then hash()) must stay identical
    # to wherever last_tool_calls_hash is computed — do not alter it.
    if last_message.tool_calls and context.last_tool_calls_hash is not None:
        try:
            fingerprint = json.dumps(
                [
                    {
                        "name": call["function"]["name"],
                        "args": call["function"]["arguments"],
                    }
                    for call in last_message.tool_calls
                ],
                sort_keys=True,
            )
            if hash(fingerprint) == context.last_tool_calls_hash:
                return "Execution stopped due to detected infinite tool loop."
        except Exception:
            # Best-effort: a malformed tool-call payload must not mask
            # the other exit-reason checks below.
            pass

    # Repeated-failure check: three or more consecutive tool errors.
    if context.consecutive_tool_error_count >= 3:
        return "Execution stopped due to repeated tool errors."

    # Normal completion.
    return None
451472

452473
# ===================== 辅助逻辑 =====================
453474

gecko/core/events/bus.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,22 @@ async def publish(self, event: BaseEvent, wait: bool = False):
8181
return
8282

8383
# 2. 获取订阅者
84-
handlers = self._subscribers.get(event.type, []) + self._subscribers.get("*", [])
85-
if not handlers:
84+
type_handlers = self._subscribers.get(event.type, [])
85+
wildcard_handlers = self._subscribers.get("*", [])
86+
87+
if not type_handlers and not wildcard_handlers:
8688
return
87-
88-
# 3. 执行处理(去重)
89-
unique_handlers = list(dict.fromkeys(handlers))
89+
90+
# 3. 修复: 使用 id() 进行更健壮的去重
91+
# dict.fromkeys() 依赖对象的 __hash__,对于方法和 lambda 可能不可靠
92+
seen_ids = set()
93+
unique_handlers = []
94+
95+
for h in type_handlers + wildcard_handlers:
96+
handler_id = id(h)
97+
if handler_id not in seen_ids:
98+
seen_ids.add(handler_id)
99+
unique_handlers.append(h)
90100

91101
# 创建执行协程
92102
tasks = [self._execute_handler(h, event) for h in unique_handlers]

0 commit comments

Comments
 (0)