Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 84 additions & 86 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/agent/acp/AcpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ export class AcpConnection {
return new Promise((resolve, reject) => {
// Use longer timeout for session/prompt requests as they involve LLM processing
// Complex tasks like document processing may need significantly more time
const timeoutDuration = method === 'session/prompt' ? 300000 : 60000; // 5 minutes for prompts, 1 minute for others
// 优化超时配置:LLM 请求 2 分钟,其他请求 30 秒,加快错误恢复
const timeoutDuration = method === 'session/prompt' ? 120000 : 30000; // 2 minutes for prompts, 30 seconds for others
const startTime = Date.now();

const createTimeoutHandler = () => {
Expand Down
8 changes: 4 additions & 4 deletions src/agent/acp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ export class AcpAgent {

let connectTimeoutId: NodeJS.Timeout | null = null;
const connectTimeoutPromise = new Promise<never>((_, reject) => {
connectTimeoutId = setTimeout(() => reject(new Error('Connection timeout after 70 seconds')), 70000);
connectTimeoutId = setTimeout(() => reject(new Error('Connection timeout after 30 seconds')), 30000);
});

try {
Expand Down Expand Up @@ -217,7 +217,7 @@ export class AcpAgent {
await this.connection.setSessionMode(sessionMode);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
throw new Error(`[ACP] Failed to enable ${this.extra.backend} YOLO mode (${sessionMode}): ${errorMessage}`);
throw new Error(`[ACP] Failed to enable ${this.extra.backend.charAt(0).toUpperCase() + this.extra.backend.slice(1)} YOLO mode (${sessionMode}): ${errorMessage}`);
}
}
}
Expand Down Expand Up @@ -677,7 +677,7 @@ export class AcpAgent {
this.pendingPermissions.delete(requestId);
reject(new Error('Permission request timed out'));
}
}, 70000);
}, 30000);
});
}

Expand Down Expand Up @@ -994,7 +994,7 @@ export class AcpAgent {

const loginProcess = spawn(command, args, {
stdio: 'pipe', // 避免干扰用户界面
timeout: 70000,
timeout: 30000,
});

await new Promise<void>((resolve, reject) => {
Expand Down
2 changes: 1 addition & 1 deletion src/agent/gemini/cli/streamResilience.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export const DEFAULT_STREAM_RESILIENCE_CONFIG: StreamResilienceConfig = {
maxRetries: 3,
initialRetryDelayMs: 1000,
maxRetryDelayMs: 10000,
heartbeatTimeoutMs: 90000, // 90 seconds without data considered disconnected / 90秒无数据则认为断开
heartbeatTimeoutMs: 30000, // 30 seconds without data considered disconnected / 30秒无数据则认为断开(优化:加快错误恢复)
requestTimeoutMs: 120000, // 2 minutes request timeout / 2分钟请求超时
enableAutoReconnect: true,
};
Expand Down
31 changes: 31 additions & 0 deletions src/process/WorkerManage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,36 @@ const kill = (id: string) => {
taskList.splice(index, 1);
};

/**
* 清理对话相关的所有资源
* 在对话关闭或删除时调用,防止内存泄漏
*
* @param conversationId - 对话 ID
*/
const cleanupConversation = (conversationId: string) => {
console.log(`[WorkerManage] Cleaning up resources for conversation: ${conversationId}`);

// 1. 清理 Worker 任务
kill(conversationId);

// 2. 清理消息管理缓存(如果存在)
// 通过访问 message.ts 的 Cache 来清理
try {
// 动态导入以避免循环依赖
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { ConversationManageWithDB } = require('./message');
const cache = (ConversationManageWithDB as any).Cache;
if (cache && cache.has(conversationId)) {
cache.delete(conversationId);
console.log(`[WorkerManage] Cleaned up message cache for conversation: ${conversationId}`);
}
} catch (error) {
console.warn(`[WorkerManage] Failed to cleanup message cache:`, error);
}

console.log(`[WorkerManage] Cleanup completed for conversation: ${conversationId}`);
};

const clear = () => {
taskList.forEach((item) => {
item.task.kill();
Expand Down Expand Up @@ -181,6 +211,7 @@ const WorkerManage = {
listTasks,
kill,
clear,
cleanupConversation,
};

export default WorkerManage;
3 changes: 3 additions & 0 deletions src/process/bridge/conversationBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ export function initConversationBridge(): void {
return false;
}

// 清理对话相关的 Worker 任务和缓存,防止内存泄漏
WorkerManage.cleanupConversation(id);

return true;
} catch (error) {
console.error('[conversationBridge] Failed to remove conversation:', error);
Expand Down
4 changes: 2 additions & 2 deletions src/process/bridge/databaseBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { migrateConversationToDatabase } from './migrationUtils';

export function initDatabaseBridge(): void {
// Get conversation messages from database
ipcBridge.database.getConversationMessages.provider(({ conversation_id, page = 0, pageSize = 10000 }) => {
ipcBridge.database.getConversationMessages.provider(({ conversation_id, page = 0, pageSize = 50 }) => {
try {
const db = getDatabase();
const result = db.getConversationMessages(conversation_id, page, pageSize);
Expand All @@ -24,7 +24,7 @@ export function initDatabaseBridge(): void {
});

// Get user conversations from database with lazy migration from file storage
ipcBridge.database.getUserConversations.provider(async ({ page = 0, pageSize = 10000 }) => {
ipcBridge.database.getUserConversations.provider(async ({ page = 0, pageSize = 50 }) => {
try {
const db = getDatabase();
const result = db.getUserConversations(undefined, page, pageSize);
Expand Down
93 changes: 56 additions & 37 deletions src/process/bridge/fsBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -443,48 +443,67 @@ export function initFsBridge(): void {
// 确保工作空间目录存在 / Ensure workspace directory exists
await fs.mkdir(workspace, { recursive: true });

for (const filePath of filePaths) {
try {
let targetPath: string;
// 批量并发处理文件复制,提升性能
const CONCURRENCY = 5; // 并发数,避免同时打开过多文件句柄
const chunks: (typeof filePaths)[] = [];
for (let i = 0; i < filePaths.length; i += CONCURRENCY) {
chunks.push(filePaths.slice(i, i + CONCURRENCY));
}

if (sourceRoot) {
// Preserve directory structure / 保留目录结构
const relativePath = path.relative(sourceRoot, filePath);
targetPath = path.join(workspace, relativePath);
for (const chunk of chunks) {
const chunkPromises = chunk.map(async (filePath) => {
try {
let targetPath: string;

if (sourceRoot) {
// Preserve directory structure / 保留目录结构
const relativePath = path.relative(sourceRoot, filePath);
targetPath = path.join(workspace, relativePath);

// Ensure parent directory exists / 确保父目录存在
await fs.mkdir(path.dirname(targetPath), { recursive: true });
} else {
// Flatten to root (legacy behavior) / 扁平化到根目录(旧行为)
const fileName = path.basename(filePath);
targetPath = path.join(workspace, fileName);
}

// Ensure parent directory exists / 确保父目录存在
await fs.mkdir(path.dirname(targetPath), { recursive: true });
} else {
// Flatten to root (legacy behavior) / 扁平化到根目录(旧行为)
const fileName = path.basename(filePath);
targetPath = path.join(workspace, fileName);
}
// 检查目标文件是否已存在
const exists = await fs
.access(targetPath)
.then(() => true)
.catch(() => false);

let finalTargetPath = targetPath;
if (exists) {
// 如果文件已存在,添加时间戳后缀 / Append timestamp when target file already exists
const timestamp = Date.now();
const ext = path.extname(targetPath);
const name = path.basename(targetPath, ext);
// Construct new path in the same directory / 在同一目录下构建新路径
const dir = path.dirname(targetPath);
const newFileName = `${name}${AIONUI_TIMESTAMP_SEPARATOR}${timestamp}${ext}`;
finalTargetPath = path.join(dir, newFileName);
}

// 检查目标文件是否已存在
const exists = await fs
.access(targetPath)
.then(() => true)
.catch(() => false);

let finalTargetPath = targetPath;
if (exists) {
// 如果文件已存在,添加时间戳后缀 / Append timestamp when target file already exists
const timestamp = Date.now();
const ext = path.extname(targetPath);
const name = path.basename(targetPath, ext);
// Construct new path in the same directory / 在同一目录下构建新路径
const dir = path.dirname(targetPath);
const newFileName = `${name}${AIONUI_TIMESTAMP_SEPARATOR}${timestamp}${ext}`;
finalTargetPath = path.join(dir, newFileName);
await fs.copyFile(filePath, finalTargetPath);
return { success: true, path: finalTargetPath };
} catch (error) {
// 记录失败的文件路径与错误信息,前端可以用来提示用户 / Record failed file info so UI can warn user
const message = error instanceof Error ? error.message : String(error);
return { success: false, path: filePath, error: message };
}
});

const results = await Promise.all(chunkPromises);

await fs.copyFile(filePath, finalTargetPath);
copiedFiles.push(finalTargetPath);
} catch (error) {
// 记录失败的文件路径与错误信息,前端可以用来提示用户 / Record failed file info so UI can warn user
const message = error instanceof Error ? error.message : String(error);
console.error(`Failed to copy file ${filePath}:`, message);
failedFiles.push({ path: filePath, error: message });
// 处理结果
for (const result of results) {
if (result.success) {
copiedFiles.push(result.path);
} else {
failedFiles.push({ path: result.path, error: result.error || 'Unknown error' });
}
}
}

Expand Down
28 changes: 20 additions & 8 deletions src/process/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { composeMessage } from '@/common/chatLib';
import type { AcpBackend } from '@/types/acpTypes';
import { getDatabase } from './database/export';
import { ProcessChat } from './initStorage';
import { streamingBuffer } from './database/StreamingMessageBuffer';

const Cache = new Map<string, ConversationManageWithDB>();

Expand All @@ -30,30 +31,41 @@ class ConversationManageWithDB {
return manage;
}
sync(type: 'insert' | 'accumulate', message: TMessage) {
this.stack.push([type, message]);
clearTimeout(this.timer);
if (type === 'insert') {
this.save2DataBase();
// 对于 accumulate 类型,直接使用 StreamingMessageBuffer 进行批量更新
// 这可以显著减少数据库 I/O 操作(从每个 chunk 一次查询减少到每 300ms 或 20 个 chunk 一次)
if (type === 'accumulate') {
const msgId = message.msg_id || message.id;
// 只处理包含 content.content 属性的消息类型
let content = '';
if (message.type === 'text' || message.type === 'tips') {
content = message.content.content || '';
}
streamingBuffer.append(message.id, msgId, this.conversation_id, content, 'accumulate');
// 仍然保存到 stack,以便在需要时处理其他逻辑
this.stack.push([type, message]);
return;
}
this.timer = setTimeout(() => {
this.save2DataBase();
}, 2000);

// 对于 insert 类型,使用原有的批量处理机制
this.stack.push([type, message]);
clearTimeout(this.timer);
this.save2DataBase();
}

private save2DataBase() {
this.savePromise = this.savePromise
.then(() => {
const stack = this.stack.slice();
this.stack = [];
const messages = this.db.getConversationMessages(this.conversation_id, 0, 50, 'DESC'); //
const messages = this.db.getConversationMessages(this.conversation_id, 0, 50, 'DESC');
let messageList = messages.data.reverse();
let updateMessage = stack.shift();
while (updateMessage) {
if (updateMessage[0] === 'insert') {
this.db.insertMessage(updateMessage[1]);
messageList.push(updateMessage[1]);
} else {
// accumulate 类型已经由 StreamingMessageBuffer 处理,这里只处理需要 compose 的情况
messageList = composeMessage(updateMessage[1], messageList, (type, message) => {
if (type === 'insert') this.db.insertMessage(message);
if (type === 'update') {
Expand Down
Loading