diff --git a/packages/components/nodes/documentloaders/Github/Github.ts b/packages/components/nodes/documentloaders/Github/Github.ts
index 3edef63f342..85744e477e6 100644
--- a/packages/components/nodes/documentloaders/Github/Github.ts
+++ b/packages/components/nodes/documentloaders/Github/Github.ts
@@ -2,7 +2,10 @@ import { omit } from 'lodash'
 import { ICommonObject, IDocument, INode, INodeData, INodeParams } from '../../../src/Interface'
 import { TextSplitter } from 'langchain/text_splitter'
 import { GithubRepoLoader, GithubRepoLoaderParams } from '@langchain/community/document_loaders/web/github'
-import { getCredentialData, getCredentialParam, handleEscapeCharacters, INodeOutputsValue } from '../../../src'
+import { getCredentialData, getCredentialParam, handleEscapeCharacters, IDatabaseEntity, INodeOutputsValue } from '../../../src'
+import { v4 as uuidv4 } from 'uuid'
+
+import { DataSource } from 'typeorm'
 
 class Github_DocumentLoaders implements INode {
     label: string
@@ -61,6 +64,14 @@ class Github_DocumentLoaders implements INode {
                 optional: true,
                 additionalParams: true
             },
+            {
+                label: 'Stream',
+                name: 'stream',
+                type: 'boolean',
+                optional: true,
+                default: false,
+                description: 'Process documents as they are loaded'
+            },
             {
                 label: 'Github Base URL',
                 name: 'githubBaseUrl',
@@ -154,6 +165,7 @@ class Github_DocumentLoaders implements INode {
         const output = nodeData.outputs?.output as string
         const githubInstanceApi = nodeData.inputs?.githubInstanceApi as string
         const githubBaseUrl = nodeData.inputs?.githubBaseUrl as string
+        const streamMode = nodeData.inputs?.stream as boolean
 
         let omitMetadataKeys: string[] = []
         if (_omitMetadataKeys) {
@@ -181,9 +193,60 @@ class Github_DocumentLoaders implements INode {
         }
 
         const loader = new GithubRepoLoader(repoLink, githubOptions)
 
         let docs: IDocument[] = []
+
+        if (streamMode) {
+            // Load the external entities as stream
+            const dataSource: DataSource = options.appDataSource
+            const databaseEntities = options.databaseEntities as IDatabaseEntity
+            const loaderId = options.loaderId as string
+            const storeId = options.storeId as string
+
+            let seq = 0
+            let totalChars = 0
+
+            for await (const fileDoc of (loader as GithubRepoLoader).loadAsStream()) {
+                const doc = textSplitter ? await (textSplitter as TextSplitter).splitDocuments([fileDoc]) : [fileDoc]
+                if (options.preview) {
+                    // As we are in preview mode, just return the data don't save
+                    docs.push(...doc)
+                    return docs
+                }
+
+                for (const chunk of doc) {
+                    seq += 1
+                    const entity = dataSource.getRepository(databaseEntities['DocumentStoreFileChunk']).create({
+                        docId: loaderId as string,
+                        storeId: storeId as string,
+                        id: uuidv4(),
+                        chunkNo: seq,
+                        pageContent: this.sanitizeChunkContent(chunk.pageContent ?? ''),
+                        metadata: JSON.stringify(chunk.metadata)
+                    })
+                    try {
+                        await dataSource.getRepository(databaseEntities['DocumentStoreFileChunk']).save(entity)
+                    } catch (err) {
+                        options.logger.error(`Error streaming chunk to DB: ${err instanceof Error ? err.message : String(err)}`)
+                        throw new Error(`Error streaming chunk to DB: ${err instanceof Error ? err.message : String(err)}`)
+                    }
+
+                    totalChars += entity.pageContent.length
+                }
+            }
+
+            options.logger.info(`Streaming loader ${loaderId} total chunks: ${seq} total characters: ${totalChars}`)
+
+            return [
+                {
+                    metadata: {
+                        totalChunks: seq,
+                        totalChars
+                    }
+                }
+            ]
+        }
 
         if (textSplitter) {
             docs = await loader.load()
             docs = await textSplitter.splitDocuments(docs)
@@ -233,6 +296,12 @@ class Github_DocumentLoaders implements INode {
             return handleEscapeCharacters(finaltext, false)
         }
     }
+
+    // remove null bytes from chunk content
+    sanitizeChunkContent = (content: string) => {
+        // eslint-disable-next-line no-control-regex
+        return content.replaceAll(/\u0000/g, '')
+    }
 }
 
 module.exports = { nodeClass: Github_DocumentLoaders }
diff --git a/packages/server/src/services/documentstore/index.ts b/packages/server/src/services/documentstore/index.ts
index a9a12fd47ce..8f6b6c5e32b 100644
--- a/packages/server/src/services/documentstore/index.ts
+++ b/packages/server/src/services/documentstore/index.ts
@@ -573,7 +573,10 @@ const _splitIntoChunks = async (appDataSource: DataSource, componentNodes: IComp
         chatflowid: uuidv4(),
         appDataSource,
         databaseEntities,
-        logger
+        logger,
+        preview: data.preview || false,
+        storeId: data.storeId,
+        loaderId: data.id
     }
     const docNodeInstance = new nodeModule.nodeClass()
     let docs: IDocument[] = await docNodeInstance.init(nodeData, '', options)
@@ -821,17 +824,33 @@ export const processLoader = async ({
             throw new Error('Unauthorized access')
         }
     }
-    await _saveChunksToStorage(
-        appDataSource,
-        componentNodes,
-        data,
-        entity,
-        docLoaderId,
-        orgId,
-        workspaceId,
-        subscriptionId,
-        usageCacheManager
-    )
+
+    // A loader already saved with loaderConfig.stream enabled is processed in
+    // streaming mode; every other loader keeps the standard persistence path.
+    let isStreamLoader = false
+    if (entity.loaders && entity.loaders.length > 0) {
+        const existingLoaders = JSON.parse(entity.loaders)
+        const existingLoader = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === docLoaderId)
+        isStreamLoader = !!existingLoader?.loaderConfig?.stream
+    }
+
+    if (isStreamLoader) {
+        // Handle streaming
+        await _streamChunksToStorage(appDataSource, componentNodes, data, entity, docLoaderId)
+    } else {
+        await _saveChunksToStorage(
+            appDataSource,
+            componentNodes,
+            data,
+            entity,
+            docLoaderId,
+            orgId,
+            workspaceId,
+            subscriptionId,
+            usageCacheManager
+        )
+    }
+
     return getDocumentStoreFileChunks(appDataSource, data.storeId as string, docLoaderId)
 }
 
@@ -1046,6 +1065,70 @@
+const _streamChunksToStorage = async (
+    appDataSource: DataSource,
+    componentNodes: IComponentNodes,
+    data: IDocumentStoreLoaderForPreview,
+    entity: DocumentStore,
+    newLoaderId: string
+) => {
+    try {
+        // Step 1: remove all previous chunks for the loader
+        await appDataSource.getRepository(DocumentStoreFileChunk).delete({ docId: newLoaderId })
+
+        // Step 2: set the loader to SYNCING status
+        await updateDocumentStoreLoaderStatus(entity, newLoaderId, DocumentStoreStatus.SYNCING)
+
+        // Step 3: process the loaders data in streaming mode
+        const response = await _splitIntoChunks(appDataSource, componentNodes, data)
+
+        // Step 4: update the loaders with the new loaderConfig
+        const totalChunks = response[0].metadata?.totalChunks || 0
+        const totalChars = response[0].metadata?.totalChars || 0
+        await updateDocumentStoreLoaderStatus(entity, newLoaderId, DocumentStoreStatus.SYNC, totalChunks, totalChars)
+
+        return
+    } catch (error) {
+        throw new InternalFlowiseError(
+            StatusCodes.INTERNAL_SERVER_ERROR,
+            `Error: documentStoreServices._streamChunksToStorage - ${getErrorMessage(error)}`
+        )
+    }
+}
+
+const updateDocumentStoreLoaderStatus = async (
+    entity: DocumentStore,
+    loaderId: string,
+    status: DocumentStoreStatus,
+    totalChunks = 0,
+    totalChars = 0
+) => {
+    try {
+        const existingLoaders = JSON.parse(entity.loaders)
+        const loader = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === loaderId)
+        if (!loader) {
+            throw new InternalFlowiseError(
+                StatusCodes.NOT_FOUND,
+                `Error: documentStoreServices.updateDocumentStoreLoaderStatus - Loader ${loaderId} not found`
+            )
+        }
+        loader.status = status
+        if (totalChunks) {
+            loader.totalChunks = totalChunks
+        }
+        if (totalChars) {
+            loader.totalChars = totalChars
+        }
+        entity.loaders = JSON.stringify(existingLoaders)
+        await getRunningExpressApp().AppDataSource.getRepository(DocumentStore).save(entity)
+    } catch (error) {
+        throw new InternalFlowiseError(
+            StatusCodes.INTERNAL_SERVER_ERROR,
+            `Error: documentStoreServices.updateDocumentStoreLoaderStatus - ${getErrorMessage(error)}`
+        )
+    }
+}
+
 // remove null bytes from chunk content
 const sanitizeChunkContent = (content: string) => {
     // eslint-disable-next-line no-control-regex