Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/service/common/mongo/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { addLog } from '../system/log';
import type { Mongoose } from 'mongoose';

const maxConnecting = Math.max(30, Number(process.env.DB_MAX_LINK || 20));

/**
* connect MongoDB and init data
*/
Expand Down
63 changes: 51 additions & 12 deletions packages/service/core/ai/config/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,58 @@ export const getSystemModelConfig = async (model: string): Promise<SystemModelIt
};

export const watchSystemModelUpdate = () => {
const changeStream = MongoSystemModel.watch();
let changeStream: any;

const createWatch = () => {
try {
changeStream = MongoSystemModel.watch();

changeStream.on(
'change',
debounce(async () => {
try {
// Main node will reload twice
await loadSystemModels(true);
// All node reaload buffer
await reloadFastGPTConfigBuffer();
} catch (error) {}
}, 500)
);
changeStream.on(
'change',
debounce(async () => {
try {
// Main node will reload twice
await loadSystemModels(true);
// All node reaload buffer
await reloadFastGPTConfigBuffer();
} catch (error) {
console.error('System model update error:', error);
}
}, 500)
);

// 添加错误处理和重连机制
changeStream.on('error', (error) => {
console.error('System model change stream error:', error);
setTimeout(() => {
console.log('Attempting to reconnect system model change stream...');
createWatch();
}, 5000);
});

changeStream.on('close', () => {
console.log('System model change stream closed, attempting to reconnect...');
setTimeout(() => {
createWatch();
}, 1000);
});

console.log('System model change stream created');
} catch (error) {
console.error('Failed to create system model change stream:', error);
setTimeout(() => {
createWatch();
}, 5000);
}
};

createWatch();

return () => {
if (changeStream) {
changeStream.close();
}
};
};

// 更新完模型后,需要重载缓存
Expand Down
32 changes: 31 additions & 1 deletion projects/app/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { exit } from 'process';
/*
Init system
*/
// 全局变量保存清理函数
let mongoWatchCleanup: (() => void) | null = null;

export async function register() {
try {
if (process.env.NEXT_RUNTIME === 'nodejs') {
Expand Down Expand Up @@ -60,15 +63,42 @@ export async function register() {
initAppTemplateTypes()
]);

startMongoWatch();
// 启动 MongoDB Change Streams 并保存清理函数
mongoWatchCleanup = await startMongoWatch();
startCron();
startTrainingQueue(true);
trackTimerProcess();

// 注册优雅关闭处理
setupGracefulShutdown();

console.log('Init system success');
}
} catch (error) {
console.log('Init system error', error);
exit(1);
}
}

// 优雅关闭处理
function setupGracefulShutdown() {
const gracefulShutdown = (signal: string) => {
console.log(`Received ${signal}, starting graceful shutdown...`);

if (mongoWatchCleanup) {
try {
mongoWatchCleanup();
console.log('MongoDB Change Streams closed successfully');
} catch (error) {
console.error('Error closing MongoDB Change Streams:', error);
}
}

process.exit(0);
};

// 监听各种关闭信号
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
process.on('SIGUSR2', () => gracefulShutdown('SIGUSR2')); // nodemon restart
}
152 changes: 124 additions & 28 deletions projects/app/src/service/common/system/volumnMongoWatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,138 @@ import { watchSystemModelUpdate } from '@fastgpt/service/core/ai/config/utils';
import { SystemConfigsTypeEnum } from '@fastgpt/global/common/system/config/constants';

export const startMongoWatch = async () => {
reloadConfigWatch();
createDatasetTrainingMongoWatch();
refetchAppTemplates();
watchSystemModelUpdate();
const cleanupFunctions: (() => void)[] = [];

// 启动所有 Change Stream 监听器并收集清理函数
cleanupFunctions.push(reloadConfigWatch());
cleanupFunctions.push(createDatasetTrainingMongoWatch());
cleanupFunctions.push(refetchAppTemplates());
cleanupFunctions.push(watchSystemModelUpdate());

console.log('All MongoDB change streams started');

// 返回清理函数,用于关闭所有 Change Stream
return () => {
console.log('Closing all MongoDB change streams...');
cleanupFunctions.forEach(cleanup => {
try {
cleanup();
} catch (error) {
console.error('Error closing change stream:', error);
}
});
console.log('All MongoDB change streams closed');
};
};

const reloadConfigWatch = () => {
const changeStream = MongoSystemConfigs.watch();

changeStream.on('change', async (change) => {
let changeStream: any;
const createWatch = () => {
try {
if (
change.operationType === 'update' ||
(change.operationType === 'insert' &&
[SystemConfigsTypeEnum.fastgptPro, SystemConfigsTypeEnum.license].includes(
change.fullDocument.type
))
) {
await initSystemConfig();
console.log('refresh system config');
}
} catch (error) {}
});
changeStream = MongoSystemConfigs.watch();

changeStream.on('change', async (change) => {
try {
if (
change.operationType === 'update' ||
(change.operationType === 'insert' &&
[SystemConfigsTypeEnum.fastgptPro, SystemConfigsTypeEnum.license].includes(
change.fullDocument.type
))
) {
await initSystemConfig();
console.log('refresh system config');
}
} catch (error) {
console.error('System config change processing error:', error);
}
});

// 添加错误处理和重连机制
changeStream.on('error', (error) => {
console.error('System config change stream error:', error);
setTimeout(() => {
console.log('Attempting to reconnect system config change stream...');
createWatch();
}, 5000);
});

changeStream.on('close', () => {
console.log('System config change stream closed, attempting to reconnect...');
setTimeout(() => {
createWatch();
}, 1000);
});

console.log('System config change stream created');
} catch (error) {
console.error('Failed to create system config change stream:', error);
setTimeout(() => {
createWatch();
}, 5000);
}
};

createWatch();

return () => {
if (changeStream) {
changeStream.close();
}
};
};

const refetchAppTemplates = () => {
const changeStream = MongoAppTemplate.watch();
let changeStream: any;

const createWatch = () => {
try {
changeStream = MongoAppTemplate.watch();

changeStream.on(
'change',
debounce(async (change) => {
setTimeout(() => {
try {
getAppTemplatesAndLoadThem(true);
} catch (error) {
console.error('App templates reload error:', error);
}
}, 5000);
}, 500)
);

changeStream.on(
'change',
debounce(async (change) => {
// 添加错误处理和重连机制
changeStream.on('error', (error) => {
console.error('App templates change stream error:', error);
setTimeout(() => {
console.log('Attempting to reconnect app templates change stream...');
createWatch();
}, 5000);
});

changeStream.on('close', () => {
console.log('App templates change stream closed, attempting to reconnect...');
setTimeout(() => {
createWatch();
}, 1000);
});

console.log('App templates change stream created');
} catch (error) {
console.error('Failed to create app templates change stream:', error);
setTimeout(() => {
try {
getAppTemplatesAndLoadThem(true);
} catch (error) {}
createWatch();
}, 5000);
}, 500)
);
}
};

createWatch();

return () => {
if (changeStream) {
changeStream.close();
}
};
};
70 changes: 55 additions & 15 deletions projects/app/src/service/core/dataset/training/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,63 @@ import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/sch
import { datasetParseQueue } from '../queues/datasetParse';

export const createDatasetTrainingMongoWatch = () => {
const changeStream = MongoDatasetTraining.watch();

changeStream.on('change', async (change) => {
let changeStream: any;
const createWatch = () => {
try {
if (change.operationType === 'insert') {
const fullDocument = change.fullDocument as DatasetTrainingSchemaType;
const { mode } = fullDocument;
if (mode === TrainingModeEnum.qa) {
generateQA();
} else if (mode === TrainingModeEnum.chunk) {
generateVector();
} else if (mode === TrainingModeEnum.parse) {
datasetParseQueue();
changeStream = MongoDatasetTraining.watch();

changeStream.on('change', async (change) => {
try {
if (change.operationType === 'insert') {
const fullDocument = change.fullDocument as DatasetTrainingSchemaType;
const { mode } = fullDocument;
if (mode === TrainingModeEnum.qa) {
generateQA();
} else if (mode === TrainingModeEnum.chunk) {
generateVector();
} else if (mode === TrainingModeEnum.parse) {
datasetParseQueue();
}
}
} catch (error) {
console.error('Change stream processing error:', error);
}
}
} catch (error) {}
});
});

// 添加错误处理和重连机制
changeStream.on('error', (error) => {
console.error('Change stream error:', error);
setTimeout(() => {
console.log('Attempting to reconnect change stream...');
createWatch();
}, 5000); // 5秒后重连
});

changeStream.on('close', () => {
console.log('Change stream closed, attempting to reconnect...');
setTimeout(() => {
createWatch();
}, 1000); // 1秒后重连
});

console.log('Dataset training change stream created');
} catch (error) {
console.error('Failed to create change stream:', error);
setTimeout(() => {
createWatch();
}, 5000);
}
};

createWatch();

// 返回清理函数
return () => {
if (changeStream) {
changeStream.close();
}
};
};

export const startTrainingQueue = (fast?: boolean) => {
Expand Down
Loading