@@ -4,7 +4,7 @@ import { In } from 'typeorm'
4
4
import { ChatflowType , IReactFlowObject } from '../../Interface'
5
5
import { FLOWISE_COUNTER_STATUS , FLOWISE_METRIC_COUNTERS } from '../../Interface.Metrics'
6
6
import { UsageCacheManager } from '../../UsageCacheManager'
7
- import { ChatFlow } from '../../database/entities/ChatFlow'
7
+ import { ChatFlow , EnumChatflowType } from '../../database/entities/ChatFlow'
8
8
import { ChatMessage } from '../../database/entities/ChatMessage'
9
9
import { ChatMessageFeedback } from '../../database/entities/ChatMessageFeedback'
10
10
import { UpsertHistory } from '../../database/entities/UpsertHistory'
@@ -20,6 +20,15 @@ import { utilGetUploadsConfig } from '../../utils/getUploadsConfig'
20
20
import logger from '../../utils/logger'
21
21
import { updateStorageUsage } from '../../utils/quotaUsage'
22
22
23
+ export const enum ChatflowErrorMessage {
24
+ INVALID_CHATFLOW_TYPE = 'Invalid Chatflow Type'
25
+ }
26
+
27
+ export function validateChatflowType ( type : ChatflowType | undefined ) {
28
+ if ( ! Object . values ( EnumChatflowType ) . includes ( type as EnumChatflowType ) )
29
+ throw new InternalFlowiseError ( StatusCodes . BAD_REQUEST , ChatflowErrorMessage . INVALID_CHATFLOW_TYPE )
30
+ }
31
+
23
32
// Check if chatflow valid for streaming
24
33
const checkIfChatflowIsValidForStreaming = async ( chatflowId : string ) : Promise < any > => {
25
34
try {
@@ -254,57 +263,51 @@ const saveChatflow = async (
254
263
subscriptionId : string ,
255
264
usageCacheManager : UsageCacheManager
256
265
) : Promise < any > => {
257
- try {
258
- const appServer = getRunningExpressApp ( )
259
-
260
- let dbResponse : ChatFlow
261
- if ( containsBase64File ( newChatFlow ) ) {
262
- // we need a 2-step process, as we need to save the chatflow first and then update the file paths
263
- // this is because we need the chatflow id to create the file paths
264
-
265
- // step 1 - save with empty flowData
266
- const incomingFlowData = newChatFlow . flowData
267
- newChatFlow . flowData = JSON . stringify ( { } )
268
- const chatflow = appServer . AppDataSource . getRepository ( ChatFlow ) . create ( newChatFlow )
269
- const step1Results = await appServer . AppDataSource . getRepository ( ChatFlow ) . save ( chatflow )
270
-
271
- // step 2 - convert base64 to file paths and update the chatflow
272
- step1Results . flowData = await updateFlowDataWithFilePaths (
273
- step1Results . id ,
274
- incomingFlowData ,
275
- orgId ,
276
- workspaceId ,
277
- subscriptionId ,
278
- usageCacheManager
279
- )
280
- await _checkAndUpdateDocumentStoreUsage ( step1Results , newChatFlow . workspaceId )
281
- dbResponse = await appServer . AppDataSource . getRepository ( ChatFlow ) . save ( step1Results )
282
- } else {
283
- const chatflow = appServer . AppDataSource . getRepository ( ChatFlow ) . create ( newChatFlow )
284
- dbResponse = await appServer . AppDataSource . getRepository ( ChatFlow ) . save ( chatflow )
285
- }
286
- await appServer . telemetry . sendTelemetry (
287
- 'chatflow_created' ,
288
- {
289
- version : await getAppVersion ( ) ,
290
- chatflowId : dbResponse . id ,
291
- flowGraph : getTelemetryFlowObj ( JSON . parse ( dbResponse . flowData ) ?. nodes , JSON . parse ( dbResponse . flowData ) ?. edges )
292
- } ,
293
- orgId
294
- )
295
-
296
- appServer . metricsProvider ?. incrementCounter (
297
- dbResponse ?. type === 'MULTIAGENT' ? FLOWISE_METRIC_COUNTERS . AGENTFLOW_CREATED : FLOWISE_METRIC_COUNTERS . CHATFLOW_CREATED ,
298
- { status : FLOWISE_COUNTER_STATUS . SUCCESS }
299
- )
300
-
301
- return dbResponse
302
- } catch ( error ) {
303
- throw new InternalFlowiseError (
304
- StatusCodes . INTERNAL_SERVER_ERROR ,
305
- `Error: chatflowsService.saveChatflow - ${ getErrorMessage ( error ) } `
266
+ validateChatflowType ( newChatFlow . type )
267
+ const appServer = getRunningExpressApp ( )
268
+
269
+ let dbResponse : ChatFlow
270
+ if ( containsBase64File ( newChatFlow ) ) {
271
+ // we need a 2-step process, as we need to save the chatflow first and then update the file paths
272
+ // this is because we need the chatflow id to create the file paths
273
+
274
+ // step 1 - save with empty flowData
275
+ const incomingFlowData = newChatFlow . flowData
276
+ newChatFlow . flowData = JSON . stringify ( { } )
277
+ const chatflow = appServer . AppDataSource . getRepository ( ChatFlow ) . create ( newChatFlow )
278
+ const step1Results = await appServer . AppDataSource . getRepository ( ChatFlow ) . save ( chatflow )
279
+
280
+ // step 2 - convert base64 to file paths and update the chatflow
281
+ step1Results . flowData = await updateFlowDataWithFilePaths (
282
+ step1Results . id ,
283
+ incomingFlowData ,
284
+ orgId ,
285
+ workspaceId ,
286
+ subscriptionId ,
287
+ usageCacheManager
306
288
)
289
+ await _checkAndUpdateDocumentStoreUsage ( step1Results , newChatFlow . workspaceId )
290
+ dbResponse = await appServer . AppDataSource . getRepository ( ChatFlow ) . save ( step1Results )
291
+ } else {
292
+ const chatflow = appServer . AppDataSource . getRepository ( ChatFlow ) . create ( newChatFlow )
293
+ dbResponse = await appServer . AppDataSource . getRepository ( ChatFlow ) . save ( chatflow )
307
294
}
295
+ await appServer . telemetry . sendTelemetry (
296
+ 'chatflow_created' ,
297
+ {
298
+ version : await getAppVersion ( ) ,
299
+ chatflowId : dbResponse . id ,
300
+ flowGraph : getTelemetryFlowObj ( JSON . parse ( dbResponse . flowData ) ?. nodes , JSON . parse ( dbResponse . flowData ) ?. edges )
301
+ } ,
302
+ orgId
303
+ )
304
+
305
+ appServer . metricsProvider ?. incrementCounter (
306
+ dbResponse ?. type === 'MULTIAGENT' ? FLOWISE_METRIC_COUNTERS . AGENTFLOW_CREATED : FLOWISE_METRIC_COUNTERS . CHATFLOW_CREATED ,
307
+ { status : FLOWISE_COUNTER_STATUS . SUCCESS }
308
+ )
309
+
310
+ return dbResponse
308
311
}
309
312
310
313
const updateChatflow = async (
@@ -314,29 +317,27 @@ const updateChatflow = async (
314
317
workspaceId : string ,
315
318
subscriptionId : string
316
319
) : Promise < any > => {
317
- try {
318
- const appServer = getRunningExpressApp ( )
319
- if ( updateChatFlow . flowData && containsBase64File ( updateChatFlow ) ) {
320
- updateChatFlow . flowData = await updateFlowDataWithFilePaths (
321
- chatflow . id ,
322
- updateChatFlow . flowData ,
323
- orgId ,
324
- workspaceId ,
325
- subscriptionId ,
326
- appServer . usageCacheManager
327
- )
328
- }
329
- const newDbChatflow = appServer . AppDataSource . getRepository ( ChatFlow ) . merge ( chatflow , updateChatFlow )
330
- await _checkAndUpdateDocumentStoreUsage ( newDbChatflow , chatflow . workspaceId )
331
- const dbResponse = await appServer . AppDataSource . getRepository ( ChatFlow ) . save ( newDbChatflow )
332
-
333
- return dbResponse
334
- } catch ( error ) {
335
- throw new InternalFlowiseError (
336
- StatusCodes . INTERNAL_SERVER_ERROR ,
337
- `Error: chatflowsService.updateChatflow - ${ getErrorMessage ( error ) } `
320
+ const appServer = getRunningExpressApp ( )
321
+ if ( updateChatFlow . flowData && containsBase64File ( updateChatFlow ) ) {
322
+ updateChatFlow . flowData = await updateFlowDataWithFilePaths (
323
+ chatflow . id ,
324
+ updateChatFlow . flowData ,
325
+ orgId ,
326
+ workspaceId ,
327
+ subscriptionId ,
328
+ appServer . usageCacheManager
338
329
)
339
330
}
331
+ if ( updateChatFlow . type || updateChatFlow . type === '' ) {
332
+ validateChatflowType ( updateChatFlow . type )
333
+ } else {
334
+ updateChatFlow . type = chatflow . type
335
+ }
336
+ const newDbChatflow = appServer . AppDataSource . getRepository ( ChatFlow ) . merge ( chatflow , updateChatFlow )
337
+ await _checkAndUpdateDocumentStoreUsage ( newDbChatflow , chatflow . workspaceId )
338
+ const dbResponse = await appServer . AppDataSource . getRepository ( ChatFlow ) . save ( newDbChatflow )
339
+
340
+ return dbResponse
340
341
}
341
342
342
343
// Get specific chatflow chatbotConfig via id (PUBLIC endpoint, used to retrieve config for embedded chat)
0 commit comments