11import { getErrText } from '@fastgpt/global/common/error/utils' ;
22import { getNextTimeByCronStringAndTimezone } from '@fastgpt/global/common/string/time' ;
33import { getNanoid } from '@fastgpt/global/common/string/tools' ;
4- import { delay , retryFn } from '@fastgpt/global/common/system/utils' ;
4+ import { batchRun , retryFn } from '@fastgpt/global/common/system/utils' ;
55import {
66 ChatItemValueTypeEnum ,
77 ChatRoleEnum ,
88 ChatSourceEnum
99} from '@fastgpt/global/core/chat/constants' ;
10- import { type UserChatItemValueItemType } from '@fastgpt/global/core/chat/type' ;
10+ import type {
11+ AIChatItemValueItemType ,
12+ UserChatItemValueItemType ,
13+ ChatHistoryItemResType
14+ } from '@fastgpt/global/core/chat/type' ;
1115import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants' ;
1216import {
1317 getWorkflowEntryNodeIds ,
@@ -21,7 +25,6 @@ import { getAppLatestVersion } from '@fastgpt/service/core/app/version/controlle
2125import { saveChat } from '@fastgpt/service/core/chat/saveChat' ;
2226import { WORKFLOW_MAX_RUN_TIMES } from '@fastgpt/service/core/workflow/constants' ;
2327import { dispatchWorkFlow } from '@fastgpt/service/core/workflow/dispatch' ;
24- import { getUserChatInfo } from '@fastgpt/service/support/user/team/utils' ;
2528import { getRunningUserInfoByTmbId } from '@fastgpt/service/support/user/team/utils' ;
2629import { createChatUsageRecord } from '@fastgpt/service/support/wallet/usage/controller' ;
2730
@@ -30,19 +33,28 @@ export const getScheduleTriggerApp = async () => {
3033
3134 // 1. Find all the app
3235 const apps = await retryFn ( ( ) => {
33- return MongoApp . find ( {
34- scheduledTriggerConfig : { $exists : true } ,
35- scheduledTriggerNextTime : { $lte : new Date ( ) }
36- } ) ;
36+ return MongoApp . find (
37+ {
38+ scheduledTriggerConfig : { $exists : true } ,
39+ scheduledTriggerNextTime : { $lte : new Date ( ) }
40+ } ,
41+ {
42+ _id : 1 ,
43+ scheduledTriggerConfig : 1 ,
44+ scheduledTriggerNextTime : 1 ,
45+ name : 1 ,
46+ teamId : 1 ,
47+ tmbId : 1
48+ }
49+ ) . lean ( ) ;
3750 } ) ;
3851
3952 // 2. Run apps
40- await Promise . allSettled (
41- apps . map ( async ( app ) => {
53+ await batchRun (
54+ apps ,
55+ async ( app ) => {
4256 if ( ! app . scheduledTriggerConfig ) return ;
4357 const chatId = getNanoid ( ) ;
44- // random delay 0 ~ 60s
45- await delay ( Math . floor ( Math . random ( ) * 60 * 1000 ) ) ;
4658
4759 // Get app latest version
4860 const { versionId, nodes, edges, chatConfig } = await retryFn ( ( ) =>
@@ -52,7 +64,7 @@ export const getScheduleTriggerApp = async () => {
5264 {
5365 type : ChatItemValueTypeEnum . text ,
5466 text : {
55- content : app . scheduledTriggerConfig ? .defaultPrompt
67+ content : app . scheduledTriggerConfig . defaultPrompt || ''
5668 }
5769 }
5870 ] ;
@@ -67,8 +79,47 @@ export const getScheduleTriggerApp = async () => {
6779 } )
6880 ) ;
6981
82+ const onSave = async ( {
83+ error,
84+ durationSeconds = 0 ,
85+ assistantResponses = [ ] ,
86+ flowResponses = [ ] ,
87+ system_memories
88+ } : {
89+ error ?: any ;
90+ durationSeconds ?: number ;
91+ assistantResponses ?: AIChatItemValueItemType [ ] ;
92+ flowResponses ?: ChatHistoryItemResType [ ] ;
93+ system_memories ?: Record < string , any > ;
94+ } ) =>
95+ saveChat ( {
96+ chatId,
97+ appId : app . _id ,
98+ versionId,
99+ teamId : String ( app . teamId ) ,
100+ tmbId : String ( app . tmbId ) ,
101+ nodes,
102+ appChatConfig : chatConfig ,
103+ variables : { } ,
104+ isUpdateUseTime : false , // owner update use time
105+ newTitle : 'Cron Job' ,
106+ source : ChatSourceEnum . cronJob ,
107+ userContent : {
108+ obj : ChatRoleEnum . Human ,
109+ value : userQuery
110+ } ,
111+ aiContent : {
112+ obj : ChatRoleEnum . AI ,
113+ value : assistantResponses ,
114+ [ DispatchNodeResponseKeyEnum . nodeResponse ] : flowResponses ,
115+ memories : system_memories
116+ } ,
117+ durationSeconds,
118+ errorMsg : getErrText ( error )
119+ } ) ;
120+
70121 try {
71- const { flowUsages , assistantResponses, flowResponses, durationSeconds, system_memories } =
122+ const { assistantResponses, flowResponses, durationSeconds, system_memories } =
72123 await retryFn ( async ( ) => {
73124 return dispatchWorkFlow ( {
74125 chatId,
@@ -96,62 +147,29 @@ export const getScheduleTriggerApp = async () => {
96147 const error = flowResponses [ flowResponses . length - 1 ] ?. error ;
97148
98149 // Save chat
99- await saveChat ( {
100- chatId,
101- appId : app . _id ,
102- versionId,
103- teamId : String ( app . teamId ) ,
104- tmbId : String ( app . tmbId ) ,
105- nodes,
106- appChatConfig : chatConfig ,
107- variables : { } ,
108- isUpdateUseTime : false , // owner update use time
109- newTitle : 'Cron Job' ,
110- source : ChatSourceEnum . cronJob ,
111- userContent : {
112- obj : ChatRoleEnum . Human ,
113- value : userQuery
114- } ,
115- aiContent : {
116- obj : ChatRoleEnum . AI ,
117- value : assistantResponses ,
118- [ DispatchNodeResponseKeyEnum . nodeResponse ] : flowResponses ,
119- memories : system_memories
120- } ,
150+ await onSave ( {
151+ error,
121152 durationSeconds,
122- errorMsg : getErrText ( error )
153+ assistantResponses,
154+ flowResponses,
155+ system_memories
123156 } ) ;
124157 } catch ( error ) {
125- addLog . error ( 'Schedule trigger error' , error ) ;
158+ addLog . error ( '[ Schedule app] run error' , error ) ;
126159
127- await saveChat ( {
128- chatId,
129- appId : app . _id ,
130- teamId : String ( app . teamId ) ,
131- tmbId : String ( app . tmbId ) ,
132- nodes,
133- appChatConfig : chatConfig ,
134- variables : { } ,
135- isUpdateUseTime : false , // owner update use time
136- newTitle : 'Cron Job' ,
137- source : ChatSourceEnum . cronJob ,
138- userContent : {
139- obj : ChatRoleEnum . Human ,
140- value : userQuery
141- } ,
142- aiContent : {
143- obj : ChatRoleEnum . AI ,
144- value : [ ] ,
145- [ DispatchNodeResponseKeyEnum . nodeResponse ] : [ ]
146- } ,
147- durationSeconds : 0 ,
148- errorMsg : getErrText ( error )
160+ await onSave ( {
161+ error
162+ } ) . catch ( ) ;
163+ } finally {
164+ // update next time
165+ const nextTime = getNextTimeByCronStringAndTimezone ( app . scheduledTriggerConfig ) ;
166+ await retryFn ( ( ) =>
167+ MongoApp . updateOne ( { _id : app . _id } , { $set : { scheduledTriggerNextTime : nextTime } } )
168+ ) . catch ( ( err ) => {
169+ addLog . error ( `[Schedule app] error update next time` , err ) ;
149170 } ) ;
150171 }
151-
152- // update next time
153- app . scheduledTriggerNextTime = getNextTimeByCronStringAndTimezone ( app . scheduledTriggerConfig ) ;
154- await app . save ( ) . catch ( ) ;
155- } )
172+ } ,
173+ 50
156174 ) ;
157175} ;
0 commit comments