Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 70 additions & 2 deletions packages/components/nodes/documentloaders/Github/Github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import { omit } from 'lodash'
import { ICommonObject, IDocument, INode, INodeData, INodeParams } from '../../../src/Interface'
import { TextSplitter } from 'langchain/text_splitter'
import { GithubRepoLoader, GithubRepoLoaderParams } from '@langchain/community/document_loaders/web/github'
import { getCredentialData, getCredentialParam, handleEscapeCharacters, INodeOutputsValue } from '../../../src'
import { getCredentialData, getCredentialParam, handleEscapeCharacters, IDatabaseEntity, INodeOutputsValue } from '../../../src'
import { v4 as uuidv4 } from 'uuid'

import { DataSource } from 'typeorm'

class Github_DocumentLoaders implements INode {
label: string
Expand Down Expand Up @@ -61,6 +64,14 @@ class Github_DocumentLoaders implements INode {
optional: true,
additionalParams: true
},
{
label: 'Stream',
name: 'stream',
type: 'boolean',
optional: true,
default: false,
description: 'Process documents as they are loaded'
},
{
label: 'Github Base URL',
name: 'githubBaseUrl',
Expand Down Expand Up @@ -154,6 +165,7 @@ class Github_DocumentLoaders implements INode {
const output = nodeData.outputs?.output as string
const githubInstanceApi = nodeData.inputs?.githubInstanceApi as string
const githubBaseUrl = nodeData.inputs?.githubBaseUrl as string
const streamMode = nodeData.inputs?.stream as boolean

let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
Expand Down Expand Up @@ -181,9 +193,59 @@ class Github_DocumentLoaders implements INode {
}

const loader = new GithubRepoLoader(repoLink, githubOptions)

let docs: IDocument[] = []

if (streamMode) {
// Load the external entities as stream
const dataSource: DataSource = options.appDataSource
const databaseEntities = options.databaseEntities as IDatabaseEntity
const loaderId = options.loaderId as string
const storeId = options.storeId as string

let seq = 0
let totalChars = 0

for await (const fileDoc of (loader as GithubRepoLoader).loadAsStream()) {
const doc = textSplitter ? await (textSplitter as TextSplitter).splitDocuments([fileDoc]) : [fileDoc]
if (options.preview) {
// As we are in preview mode, just return the data don't save
docs.push(...doc)
return docs
}

for (const chunk of doc) {
seq += 1
const entity = dataSource.getRepository(databaseEntities['DocumentStoreFileChunk']).create({
docId: loaderId as string,
storeId: storeId as string,
id: uuidv4(),
chunkNo: seq,
pageContent: this.sanitizeChunkContent(chunk.pageContent ?? ''),
metadata: JSON.stringify(chunk.metadata)
})
try {
await dataSource.getRepository(databaseEntities['DocumentStoreFileChunk']).save(entity)
} catch (err) {
options.logger.error(`Error streaming chunk to DB: ${err instanceof Error ? err.message : String(err)}`)
throw new Error(`Error streaming chunk to DB: ${err instanceof Error ? err.message : String(err)}`)
}

totalChars += entity.pageContent.length
}
}

options.logger.info(`Streaming loader ${loaderId} total chunks: ${seq} total characters: ${totalChars}`)

return [
{
metadata: {
totalChunks: seq,
totalChars
}
}
]
}

if (textSplitter) {
docs = await loader.load()
docs = await textSplitter.splitDocuments(docs)
Expand Down Expand Up @@ -233,6 +295,12 @@ class Github_DocumentLoaders implements INode {
return handleEscapeCharacters(finaltext, false)
}
}

// Remove NUL (\u0000) characters from chunk content before it is stored
sanitizeChunkContent = (content: string) => {
    // eslint-disable-next-line no-control-regex
    const nullBytePattern = /\u0000/g
    return content.replace(nullBytePattern, '')
}
}

module.exports = { nodeClass: Github_DocumentLoaders }
105 changes: 93 additions & 12 deletions packages/server/src/services/documentstore/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,10 @@ const _splitIntoChunks = async (appDataSource: DataSource, componentNodes: IComp
chatflowid: uuidv4(),
appDataSource,
databaseEntities,
logger
logger,
preview: data.preview || false,
storeId: data.storeId,
loaderId: data.id
}
const docNodeInstance = new nodeModule.nodeClass()
let docs: IDocument[] = await docNodeInstance.init(nodeData, '', options)
Expand Down Expand Up @@ -821,17 +824,31 @@ export const processLoader = async ({
throw new Error('Unauthorized access')
}
}
await _saveChunksToStorage(
appDataSource,
componentNodes,
data,
entity,
docLoaderId,
orgId,
workspaceId,
subscriptionId,
usageCacheManager
)

// Check for existing loader and if stream is enabled
if (entity.loaders && entity.loaders.length > 0) {
const existingLoaders = JSON.parse(entity.loaders)
const existingLoader = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === docLoaderId)

// Check if the loader is configured for streaming
if (existingLoader && existingLoader.loaderConfig?.stream) {
// Handle streaming
await _streamChunksToStorage(appDataSource, componentNodes, data, entity, docLoaderId)
}
} else {
await _saveChunksToStorage(
appDataSource,
componentNodes,
data,
entity,
docLoaderId,
orgId,
workspaceId,
subscriptionId,
usageCacheManager
)
}

return getDocumentStoreFileChunks(appDataSource, data.storeId as string, docLoaderId)
}

Expand Down Expand Up @@ -1046,6 +1063,70 @@ const _saveChunksToStorage = async (
}
}

/**
 * Process a loader's documents in streaming mode: chunks are persisted to the
 * DocumentStoreFileChunk table as they are produced (inside _splitIntoChunks)
 * instead of being accumulated in memory and saved in one pass.
 *
 * @param appDataSource - TypeORM data source used for chunk persistence
 * @param componentNodes - registry used by _splitIntoChunks to instantiate the loader node
 * @param data - loader configuration for this run
 * @param entity - parent DocumentStore entity (its `loaders` JSON is updated)
 * @param newLoaderId - id of the loader whose chunks are being (re)generated
 * @throws InternalFlowiseError (INTERNAL_SERVER_ERROR) wrapping any failure
 */
const _streamChunksToStorage = async (
    appDataSource: DataSource,
    componentNodes: IComponentNodes,
    data: IDocumentStoreLoaderForPreview,
    entity: DocumentStore,
    newLoaderId: string
) => {
    try {
        // Step 1: remove all previous chunks for the loader so a re-sync starts clean
        await appDataSource.getRepository(DocumentStoreFileChunk).delete({ docId: newLoaderId })

        // Step 2: set the loader to SYNCING status while chunks stream in
        // NOTE(review): if _splitIntoChunks throws below, the loader is left stuck
        // in SYNCING — consider resetting the status in the catch block.
        await updateDocumentStoreLoaderStatus(entity, newLoaderId, DocumentStoreStatus.SYNCING)

        // Step 3: process the loader's data in streaming mode; in stream mode the
        // loader returns a single summary document carrying totalChunks/totalChars
        // in its metadata (see the Github loader's stream branch)
        const response = await _splitIntoChunks(appDataSource, componentNodes, data)

        // Step 4: record the final counts and flip the loader to SYNC.
        // Optional-chain response[0] so an empty result array doesn't throw here.
        const totalChunks = response[0]?.metadata?.totalChunks ?? 0
        const totalChars = response[0]?.metadata?.totalChars ?? 0
        await updateDocumentStoreLoaderStatus(entity, newLoaderId, DocumentStoreStatus.SYNC, totalChunks, totalChars)
    } catch (error) {
        throw new InternalFlowiseError(
            StatusCodes.INTERNAL_SERVER_ERROR,
            `Error: documentStoreServices._streamChunksToStorage - ${getErrorMessage(error)}`
        )
    }
}

/**
 * Update the status (and optionally the chunk/char totals) of a single loader
 * entry inside the DocumentStore entity's `loaders` JSON column, then persist
 * the entity via the application's data source.
 *
 * @param entity - DocumentStore whose `loaders` JSON contains the target loader
 * @param loaderId - id of the loader entry to update
 * @param status - new DocumentStoreStatus to record on the loader
 * @param totalChunks - when non-zero, overwrites the loader's totalChunks
 * @param totalChars - when non-zero, overwrites the loader's totalChars
 * @throws InternalFlowiseError NOT_FOUND when the loader id is absent,
 *         INTERNAL_SERVER_ERROR for any other failure
 */
const updateDocumentStoreLoaderStatus = async (
    entity: DocumentStore,
    loaderId: string,
    status: DocumentStoreStatus,
    totalChunks = 0,
    totalChars = 0
) => {
    try {
        const existingLoaders = JSON.parse(entity.loaders)
        const loader = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === loaderId)
        if (!loader) {
            throw new InternalFlowiseError(
                StatusCodes.NOT_FOUND,
                `Error: documentStoreServices.updateDocumentStoreLoaderStatus - Loader ${loaderId} not found`
            )
        }
        loader.status = status
        // Only overwrite totals when a non-zero value is supplied, so status-only
        // updates (e.g. SYNCING) don't clobber existing counts
        if (totalChunks) {
            loader.totalChunks = totalChunks
        }
        if (totalChars) {
            loader.totalChars = totalChars
        }
        entity.loaders = JSON.stringify(existingLoaders)
        await getRunningExpressApp().AppDataSource.getRepository(DocumentStore).save(entity)
    } catch (error) {
        // Bug fix: rethrow deliberate InternalFlowiseErrors (the NOT_FOUND above)
        // unchanged instead of re-wrapping them as INTERNAL_SERVER_ERROR, which
        // destroyed the status code callers see
        if (error instanceof InternalFlowiseError) {
            throw error
        }
        throw new InternalFlowiseError(
            StatusCodes.INTERNAL_SERVER_ERROR,
            `Error: documentStoreServices.updateDocumentStoreLoaderStatus - ${getErrorMessage(error)}`
        )
    }
}

// remove null bytes from chunk content
const sanitizeChunkContent = (content: string) => {
// eslint-disable-next-line no-control-regex
Expand Down