/*
 * MongoDB change-stream watchers with reconnect handling and shutdown cleanup.
 *
 * The definitions below belong to five different files; each section header
 * names the file a definition lives in. Only statements and definitions that
 * are visible in full are reproduced as code. Definitions that are only partly
 * visible (`register()` in instrumentation.ts) are described in comments
 * instead of being reconstructed.
 *
 * Every watcher follows the same contract:
 *   - it opens a change stream and re-opens it after an error or an
 *     unexpected close;
 *   - it returns a cleanup function; once that function has been called the
 *     stream is closed, any pending reconnect is cancelled, and the watcher
 *     never re-opens the stream again.
 */

/* ===== packages/service/common/mongo/init.ts ===== */

import { addLog } from '../system/log';
import type { Mongoose } from 'mongoose';

const maxConnecting = Math.max(30, Number(process.env.DB_MAX_LINK || 20));

/* ===== packages/service/core/ai/config/utils.ts ===== */

/**
 * Watch the system model collection and reload the model list and the config
 * buffer whenever it changes (debounced by 500 ms).
 *
 * NOTE(review): the declaration line of this function is not visible in the
 * reviewed text; the name and the zero-argument signature are taken from its
 * import and call site in volumnMongoWatch.ts. Confirm against the real file.
 *
 * @returns a cleanup function that permanently stops the watcher.
 */
export const watchSystemModelUpdate = (): (() => void) => {
  // Only `close()` is needed from the stored stream, so no `any` is required.
  let changeStream: { close(): unknown } | undefined;
  let reconnectTimer: ReturnType<typeof setTimeout> | undefined;
  // Set by the cleanup function so that the resulting 'close' event does not
  // trigger a reconnect.
  let stopped = false;

  // Schedules at most one reconnect. A failing stream emits 'error' and then
  // 'close'; without this guard each failure would open two new streams.
  const scheduleReconnect = (delayMs: number) => {
    if (stopped || reconnectTimer !== undefined) return;
    reconnectTimer = setTimeout(() => {
      reconnectTimer = undefined;
      console.log('Attempting to reconnect system model change stream...');
      createWatch();
    }, delayMs);
  };

  const createWatch = () => {
    if (stopped) return;

    try {
      const stream = MongoSystemModel.watch();
      changeStream = stream;

      stream.on(
        'change',
        debounce(async () => {
          try {
            // Main node will reload twice
            await loadSystemModels(true);
            // All nodes reload the config buffer
            await reloadFastGPTConfigBuffer();
          } catch (error) {
            console.error('System model update error:', error);
          }
        }, 500)
      );

      stream.on('error', (error) => {
        console.error('System model change stream error:', error);
        // Ignore events from a stream that has already been replaced.
        if (changeStream === stream) scheduleReconnect(5000);
      });

      stream.on('close', () => {
        if (stopped || changeStream !== stream) return;
        console.log('System model change stream closed, attempting to reconnect...');
        scheduleReconnect(1000);
      });

      console.log('System model change stream created');
    } catch (error) {
      console.error('Failed to create system model change stream:', error);
      scheduleReconnect(5000);
    }
  };

  createWatch();

  return () => {
    stopped = true;
    if (reconnectTimer !== undefined) {
      clearTimeout(reconnectTimer);
      reconnectTimer = undefined;
    }

    const active = changeStream;
    changeStream = undefined;
    if (active) {
      // close() is asynchronous in the driver; report a failed close instead
      // of leaving an unhandled rejection.
      Promise.resolve(active.close()).catch((error) => {
        console.error('Error closing change stream:', error);
      });
    }
  };
};

/* ===== projects/app/src/instrumentation.ts ===== */

import { exit } from 'process';

/* Init system */

// Cleanup function returned by startMongoWatch(); stays null until the
// watchers have been started.
let mongoWatchCleanup: (() => void) | null = null;

/*
 * register() is only partly visible, so it is not reproduced here. The two
 * changes inside it are:
 *   - `startMongoWatch();` becomes `mongoWatchCleanup = await startMongoWatch();`
 *   - `setupGracefulShutdown();` is called right after `trackTimerProcess();`
 */

/**
 * Register handlers for SIGTERM, SIGINT and SIGUSR2 that close the MongoDB
 * change streams (through `mongoWatchCleanup`) and then exit with code 0.
 *
 * NOTE(review): the cleanup only initiates the close; the process exits
 * without waiting for the driver to finish closing the streams.
 */
function setupGracefulShutdown() {
  const gracefulShutdown = (signal: string) => {
    console.log(`Received ${signal}, starting graceful shutdown...`);

    if (mongoWatchCleanup) {
      try {
        mongoWatchCleanup();
        console.log('MongoDB Change Streams closed successfully');
      } catch (error) {
        console.error('Error closing MongoDB Change Streams:', error);
      }
    }

    process.exit(0);
  };

  // Listen for the usual termination signals.
  process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
  process.on('SIGINT', () => gracefulShutdown('SIGINT'));
  process.on('SIGUSR2', () => gracefulShutdown('SIGUSR2')); // nodemon restart
}

/* ===== projects/app/src/service/common/system/volumnMongoWatch.ts ===== */

import { watchSystemModelUpdate } from '@fastgpt/service/core/ai/config/utils';
import { SystemConfigsTypeEnum } from '@fastgpt/global/common/system/config/constants';

/**
 * Start every change-stream watcher.
 *
 * @returns a function that stops all of them; a failure while stopping one
 * watcher is logged and does not prevent the others from being stopped.
 */
export const startMongoWatch = async () => {
  const cleanupFunctions: (() => void)[] = [];

  // Start all change-stream listeners and collect their cleanup functions.
  cleanupFunctions.push(reloadConfigWatch());
  cleanupFunctions.push(createDatasetTrainingMongoWatch());
  cleanupFunctions.push(refetchAppTemplates());
  cleanupFunctions.push(watchSystemModelUpdate());

  console.log('All MongoDB change streams started');

  // Returned cleanup: closes every change stream.
  return () => {
    console.log('Closing all MongoDB change streams...');
    cleanupFunctions.forEach((cleanup) => {
      try {
        cleanup();
      } catch (error) {
        console.error('Error closing change stream:', error);
      }
    });
    console.log('All MongoDB change streams closed');
  };
};

/**
 * Watch the system config collection and re-run `initSystemConfig()` on every
 * update, and on inserts of a `fastgptPro` or `license` config document.
 *
 * @returns a cleanup function that permanently stops the watcher.
 */
const reloadConfigWatch = (): (() => void) => {
  let changeStream: { close(): unknown } | undefined;
  let reconnectTimer: ReturnType<typeof setTimeout> | undefined;
  // Set by the cleanup function so that the resulting 'close' event does not
  // trigger a reconnect.
  let stopped = false;

  // Schedules at most one reconnect, even when 'error' and 'close' both fire.
  const scheduleReconnect = (delayMs: number) => {
    if (stopped || reconnectTimer !== undefined) return;
    reconnectTimer = setTimeout(() => {
      reconnectTimer = undefined;
      console.log('Attempting to reconnect system config change stream...');
      createWatch();
    }, delayMs);
  };

  const createWatch = () => {
    if (stopped) return;

    try {
      const stream = MongoSystemConfigs.watch();
      changeStream = stream;

      stream.on('change', async (change) => {
        try {
          if (
            change.operationType === 'update' ||
            (change.operationType === 'insert' &&
              [SystemConfigsTypeEnum.fastgptPro, SystemConfigsTypeEnum.license].includes(
                change.fullDocument.type
              ))
          ) {
            await initSystemConfig();
            console.log('refresh system config');
          }
        } catch (error) {
          console.error('System config change processing error:', error);
        }
      });

      stream.on('error', (error) => {
        console.error('System config change stream error:', error);
        // Ignore events from a stream that has already been replaced.
        if (changeStream === stream) scheduleReconnect(5000);
      });

      stream.on('close', () => {
        if (stopped || changeStream !== stream) return;
        console.log('System config change stream closed, attempting to reconnect...');
        scheduleReconnect(1000);
      });

      console.log('System config change stream created');
    } catch (error) {
      console.error('Failed to create system config change stream:', error);
      scheduleReconnect(5000);
    }
  };

  createWatch();

  return () => {
    stopped = true;
    if (reconnectTimer !== undefined) {
      clearTimeout(reconnectTimer);
      reconnectTimer = undefined;
    }

    const active = changeStream;
    changeStream = undefined;
    if (active) {
      Promise.resolve(active.close()).catch((error) => {
        console.error('Error closing change stream:', error);
      });
    }
  };
};

/**
 * Watch the app template collection and reload the templates 5 s after a
 * (500 ms debounced) change.
 *
 * @returns a cleanup function that permanently stops the watcher.
 */
const refetchAppTemplates = (): (() => void) => {
  let changeStream: { close(): unknown } | undefined;
  let reconnectTimer: ReturnType<typeof setTimeout> | undefined;
  // Set by the cleanup function so that the resulting 'close' event does not
  // trigger a reconnect.
  let stopped = false;

  // Schedules at most one reconnect, even when 'error' and 'close' both fire.
  const scheduleReconnect = (delayMs: number) => {
    if (stopped || reconnectTimer !== undefined) return;
    reconnectTimer = setTimeout(() => {
      reconnectTimer = undefined;
      console.log('Attempting to reconnect app templates change stream...');
      createWatch();
    }, delayMs);
  };

  const createWatch = () => {
    if (stopped) return;

    try {
      const stream = MongoAppTemplate.watch();
      changeStream = stream;

      stream.on(
        'change',
        debounce(() => {
          setTimeout(async () => {
            try {
              // Awaited so that a rejected reload is caught here as well as a
              // synchronous throw.
              await getAppTemplatesAndLoadThem(true);
            } catch (error) {
              console.error('App templates reload error:', error);
            }
          }, 5000);
        }, 500)
      );

      stream.on('error', (error) => {
        console.error('App templates change stream error:', error);
        // Ignore events from a stream that has already been replaced.
        if (changeStream === stream) scheduleReconnect(5000);
      });

      stream.on('close', () => {
        if (stopped || changeStream !== stream) return;
        console.log('App templates change stream closed, attempting to reconnect...');
        scheduleReconnect(1000);
      });

      console.log('App templates change stream created');
    } catch (error) {
      console.error('Failed to create app templates change stream:', error);
      scheduleReconnect(5000);
    }
  };

  createWatch();

  return () => {
    stopped = true;
    if (reconnectTimer !== undefined) {
      clearTimeout(reconnectTimer);
      reconnectTimer = undefined;
    }

    const active = changeStream;
    changeStream = undefined;
    if (active) {
      Promise.resolve(active.close()).catch((error) => {
        console.error('Error closing change stream:', error);
      });
    }
  };
};

/* ===== projects/app/src/service/core/dataset/training/utils.ts ===== */

import { datasetParseQueue } from '../queues/datasetParse';

/**
 * Watch the dataset training collection and kick the matching worker
 * (`generateQA`, `generateVector` or `datasetParseQueue`) for every inserted
 * training document, depending on its `mode`.
 *
 * @returns a cleanup function that permanently stops the watcher.
 */
export const createDatasetTrainingMongoWatch = (): (() => void) => {
  let changeStream: { close(): unknown } | undefined;
  let reconnectTimer: ReturnType<typeof setTimeout> | undefined;
  // Set by the cleanup function so that the resulting 'close' event does not
  // trigger a reconnect.
  let stopped = false;

  // Schedules at most one reconnect, even when 'error' and 'close' both fire.
  const scheduleReconnect = (delayMs: number) => {
    if (stopped || reconnectTimer !== undefined) return;
    reconnectTimer = setTimeout(() => {
      reconnectTimer = undefined;
      console.log('Attempting to reconnect change stream...');
      createWatch();
    }, delayMs);
  };

  const createWatch = () => {
    if (stopped) return;

    try {
      const stream = MongoDatasetTraining.watch();
      changeStream = stream;

      stream.on('change', async (change) => {
        try {
          if (change.operationType === 'insert') {
            const fullDocument = change.fullDocument as DatasetTrainingSchemaType;
            const { mode } = fullDocument;
            if (mode === TrainingModeEnum.qa) {
              generateQA();
            } else if (mode === TrainingModeEnum.chunk) {
              generateVector();
            } else if (mode === TrainingModeEnum.parse) {
              datasetParseQueue();
            }
          }
        } catch (error) {
          console.error('Change stream processing error:', error);
        }
      });

      stream.on('error', (error) => {
        console.error('Change stream error:', error);
        // Ignore events from a stream that has already been replaced.
        if (changeStream === stream) scheduleReconnect(5000); // retry after 5 s
      });

      stream.on('close', () => {
        if (stopped || changeStream !== stream) return;
        console.log('Change stream closed, attempting to reconnect...');
        scheduleReconnect(1000); // retry after 1 s
      });

      console.log('Dataset training change stream created');
    } catch (error) {
      console.error('Failed to create change stream:', error);
      scheduleReconnect(5000);
    }
  };

  createWatch();

  // Cleanup: stop reconnecting, cancel a pending retry, close the stream.
  return () => {
    stopped = true;
    if (reconnectTimer !== undefined) {
      clearTimeout(reconnectTimer);
      reconnectTimer = undefined;
    }

    const active = changeStream;
    changeStream = undefined;
    if (active) {
      Promise.resolve(active.close()).catch((error) => {
        console.error('Error closing change stream:', error);
      });
    }
  };
};