-
-
Notifications
You must be signed in to change notification settings - Fork 23.5k
Add support for azure blob storage #5604
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -8,6 +8,7 @@ import { | |||||||||||
| S3ClientConfig | ||||||||||||
| } from '@aws-sdk/client-s3' | ||||||||||||
| import { Storage } from '@google-cloud/storage' | ||||||||||||
| import { BlobServiceClient, StorageSharedKeyCredential } from '@azure/storage-blob' | ||||||||||||
| import fs from 'fs' | ||||||||||||
| import { Readable } from 'node:stream' | ||||||||||||
| import path from 'path' | ||||||||||||
|
|
@@ -96,6 +97,26 @@ export const addBase64FilesToStorage = async ( | |||||||||||
| fileNames.push(sanitizedFilename) | ||||||||||||
| const totalSize = await getGCSStorageSize(orgId) | ||||||||||||
|
|
||||||||||||
| return { path: 'FILE-STORAGE::' + JSON.stringify(fileNames), totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| const splitDataURI = fileBase64.split(',') | ||||||||||||
| const filename = splitDataURI.pop()?.split(':')[1] ?? '' | ||||||||||||
| const bf = Buffer.from(splitDataURI.pop() || '', 'base64') | ||||||||||||
| const mime = splitDataURI[0].split(':')[1].split(';')[0] | ||||||||||||
|
|
||||||||||||
| const sanitizedFilename = _sanitizeFilename(filename) | ||||||||||||
| const blobName = `${orgId}/${chatflowid}/${sanitizedFilename}` | ||||||||||||
|
|
||||||||||||
| const blockBlobClient = containerClient.getBlockBlobClient(blobName) | ||||||||||||
| await blockBlobClient.upload(bf, bf.length, { | ||||||||||||
| blobHTTPHeaders: { blobContentType: mime } | ||||||||||||
| }) | ||||||||||||
|
|
||||||||||||
| fileNames.push(sanitizedFilename) | ||||||||||||
| const totalSize = await getAzureBlobStorageSize(orgId) | ||||||||||||
|
|
||||||||||||
| return { path: 'FILE-STORAGE::' + JSON.stringify(fileNames), totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else { | ||||||||||||
| const dir = path.join(getStoragePath(), orgId, chatflowid) | ||||||||||||
|
|
@@ -165,6 +186,23 @@ export const addArrayFilesToStorage = async ( | |||||||||||
|
|
||||||||||||
| const totalSize = await getGCSStorageSize(paths[0]) | ||||||||||||
|
|
||||||||||||
| return { path: 'FILE-STORAGE::' + JSON.stringify(fileNames), totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| let blobName = paths.reduce((acc, cur) => acc + '/' + cur, '') + '/' + sanitizedFilename | ||||||||||||
| if (blobName.startsWith('/')) { | ||||||||||||
| blobName = blobName.substring(1) | ||||||||||||
| } | ||||||||||||
|
Comment on lines
+193
to
+196
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

This path construction logic is duplicated across several functions. Using `paths.join('/')` instead of a `reduce` followed by stripping the leading `/` would be simpler and avoid the duplication.
Suggested change
|
||||||||||||
|
|
||||||||||||
| const blockBlobClient = containerClient.getBlockBlobClient(blobName) | ||||||||||||
| await blockBlobClient.upload(bf, bf.length, { | ||||||||||||
| blobHTTPHeaders: { blobContentType: mime } | ||||||||||||
| }) | ||||||||||||
| fileNames.push(sanitizedFilename) | ||||||||||||
|
|
||||||||||||
| const totalSize = await getAzureBlobStorageSize(paths[0]) | ||||||||||||
|
|
||||||||||||
| return { path: 'FILE-STORAGE::' + JSON.stringify(fileNames), totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else { | ||||||||||||
| const dir = path.join(getStoragePath(), ...paths.map(_sanitizeFilename)) | ||||||||||||
|
|
@@ -225,6 +263,22 @@ export const addSingleFileToStorage = async ( | |||||||||||
|
|
||||||||||||
| const totalSize = await getGCSStorageSize(paths[0]) | ||||||||||||
|
|
||||||||||||
| return { path: 'FILE-STORAGE::' + sanitizedFilename, totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| let blobName = paths.reduce((acc, cur) => acc + '/' + cur, '') + '/' + sanitizedFilename | ||||||||||||
| if (blobName.startsWith('/')) { | ||||||||||||
| blobName = blobName.substring(1) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| const blockBlobClient = containerClient.getBlockBlobClient(blobName) | ||||||||||||
| await blockBlobClient.upload(bf, bf.length, { | ||||||||||||
| blobHTTPHeaders: { blobContentType: mime } | ||||||||||||
| }) | ||||||||||||
|
|
||||||||||||
| const totalSize = await getAzureBlobStorageSize(paths[0]) | ||||||||||||
|
|
||||||||||||
| return { path: 'FILE-STORAGE::' + sanitizedFilename, totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else { | ||||||||||||
| const dir = path.join(getStoragePath(), ...paths.map(_sanitizeFilename)) | ||||||||||||
|
|
@@ -270,6 +324,18 @@ export const getFileFromUpload = async (filePath: string): Promise<Buffer> => { | |||||||||||
| const file = bucket.file(filePath) | ||||||||||||
| const [buffer] = await file.download() | ||||||||||||
| return buffer | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| let blobName = filePath | ||||||||||||
| // remove the first '/' if it exists | ||||||||||||
| if (blobName.startsWith('/')) { | ||||||||||||
| blobName = blobName.substring(1) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| const blockBlobClient = containerClient.getBlockBlobClient(blobName) | ||||||||||||
| const downloadResponse = await blockBlobClient.downloadToBuffer() | ||||||||||||
| return downloadResponse | ||||||||||||
| } else { | ||||||||||||
| return fs.readFileSync(filePath) | ||||||||||||
| } | ||||||||||||
|
|
@@ -415,6 +481,47 @@ export const getFileFromStorage = async (file: string, ...paths: string[]): Prom | |||||||||||
| throw error | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| let blobName = paths.reduce((acc, cur) => acc + '/' + cur, '') + '/' + sanitizedFilename | ||||||||||||
| if (blobName.startsWith('/')) { | ||||||||||||
| blobName = blobName.substring(1) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| try { | ||||||||||||
| const blockBlobClient = containerClient.getBlockBlobClient(blobName) | ||||||||||||
| const buffer = await blockBlobClient.downloadToBuffer() | ||||||||||||
| return buffer | ||||||||||||
| } catch (error) { | ||||||||||||
| // Fallback: Check if file exists without the first path element (likely orgId) | ||||||||||||
| if (paths.length > 1) { | ||||||||||||
| const fallbackPaths = paths.slice(1) | ||||||||||||
| let fallbackBlobName = fallbackPaths.reduce((acc, cur) => acc + '/' + cur, '') + '/' + sanitizedFilename | ||||||||||||
| if (fallbackBlobName.startsWith('/')) { | ||||||||||||
| fallbackBlobName = fallbackBlobName.substring(1) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| try { | ||||||||||||
| const fallbackBlobClient = containerClient.getBlockBlobClient(fallbackBlobName) | ||||||||||||
| const buffer = await fallbackBlobClient.downloadToBuffer() | ||||||||||||
|
|
||||||||||||
| // Move to correct location with orgId | ||||||||||||
| const blockBlobClient = containerClient.getBlockBlobClient(blobName) | ||||||||||||
| await blockBlobClient.upload(buffer, buffer.length) | ||||||||||||
|
|
||||||||||||
| // Delete the old file | ||||||||||||
| await fallbackBlobClient.delete() | ||||||||||||
|
|
||||||||||||
| return buffer | ||||||||||||
| } catch (fallbackError) { | ||||||||||||
| // Throw the original error since the fallback also failed | ||||||||||||
| throw error | ||||||||||||
| } | ||||||||||||
| } else { | ||||||||||||
| throw error | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
Comment on lines
+492
to
+524
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

The fallback logic to handle files stored without an `orgId` prefix is duplicated across several functions. Consider refactoring this into a higher-level function, or using a strategy pattern where each storage provider implements a common interface for download, upload, and delete. A simplified abstraction could look like this:

async function getFileWithFallback(primaryPath, fallbackPath, storage) {
try {
return await storage.download(primaryPath);
} catch (error) {
if (fallbackPath) {
try {
const buffer = await storage.download(fallbackPath);
await storage.upload(primaryPath, buffer);
await storage.delete(fallbackPath);
return buffer;
} catch (fallbackError) {
throw error; // Throw original error
}
}
throw error;
}
}

This logic is also duplicated in `streamStorageFile`; extracting a shared helper would keep the two code paths consistent.
||||||||||||
| } else { | ||||||||||||
| try { | ||||||||||||
| const fileInStorage = path.join(getStoragePath(), ...paths.map(_sanitizeFilename), sanitizedFilename) | ||||||||||||
|
|
@@ -480,6 +587,23 @@ export const getFilesListFromStorage = async (...paths: string[]): Promise<Array | |||||||||||
| } else { | ||||||||||||
| return [] | ||||||||||||
| } | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| let prefix = paths.reduce((acc, cur) => acc + '/' + cur, '') | ||||||||||||
| if (prefix.startsWith('/')) { | ||||||||||||
| prefix = prefix.substring(1) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| const filesList: Array<{ name: string; path: string; size: number }> = [] | ||||||||||||
| for await (const blob of containerClient.listBlobsFlat({ prefix })) { | ||||||||||||
| filesList.push({ | ||||||||||||
| name: blob.name.split('/').pop() || '', | ||||||||||||
| path: blob.name, | ||||||||||||
| size: blob.properties.contentLength || 0 | ||||||||||||
| }) | ||||||||||||
| } | ||||||||||||
| return filesList | ||||||||||||
| } else { | ||||||||||||
| const directory = path.join(getStoragePath(), ...paths) | ||||||||||||
| const filesList = getFilePaths(directory) | ||||||||||||
|
|
@@ -568,6 +692,21 @@ export const removeFilesFromStorage = async (...paths: string[]) => { | |||||||||||
|
|
||||||||||||
| const totalSize = await getGCSStorageSize(paths[0]) | ||||||||||||
| return { totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| let prefix = paths.reduce((acc, cur) => acc + '/' + cur, '') | ||||||||||||
| if (prefix.startsWith('/')) { | ||||||||||||
| prefix = prefix.substring(1) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // Delete all blobs with the prefix | ||||||||||||
| for await (const blob of containerClient.listBlobsFlat({ prefix })) { | ||||||||||||
| await containerClient.deleteBlob(blob.name) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| const totalSize = await getAzureBlobStorageSize(paths[0]) | ||||||||||||
| return { totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else { | ||||||||||||
| const directory = path.join(getStoragePath(), ...paths.map(_sanitizeFilename)) | ||||||||||||
| await _deleteLocalFolderRecursive(directory) | ||||||||||||
|
|
@@ -590,6 +729,15 @@ export const removeSpecificFileFromUpload = async (filePath: string) => { | |||||||||||
| } else if (storageType === 'gcs') { | ||||||||||||
| const { bucket } = getGcsClient() | ||||||||||||
| await bucket.file(filePath).delete() | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| let blobName = filePath | ||||||||||||
| if (blobName.startsWith('/')) { | ||||||||||||
| blobName = blobName.substring(1) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| await containerClient.deleteBlob(blobName) | ||||||||||||
| } else { | ||||||||||||
| fs.unlinkSync(filePath) | ||||||||||||
| } | ||||||||||||
|
|
@@ -620,6 +768,22 @@ export const removeSpecificFileFromStorage = async (...paths: string[]) => { | |||||||||||
|
|
||||||||||||
| const totalSize = await getGCSStorageSize(paths[0]) | ||||||||||||
| return { totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| let blobName = paths.reduce((acc, cur) => acc + '/' + cur, '') | ||||||||||||
| if (blobName.startsWith('/')) { | ||||||||||||
| blobName = blobName.substring(1) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // Check if blob exists before deleting | ||||||||||||
| const blockBlobClient = containerClient.getBlockBlobClient(blobName) | ||||||||||||
| if (await blockBlobClient.exists()) { | ||||||||||||
| await blockBlobClient.delete() | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| const totalSize = await getAzureBlobStorageSize(paths[0]) | ||||||||||||
| return { totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else { | ||||||||||||
| const fileName = paths.pop() | ||||||||||||
| if (fileName) { | ||||||||||||
|
|
@@ -659,6 +823,21 @@ export const removeFolderFromStorage = async (...paths: string[]) => { | |||||||||||
|
|
||||||||||||
| const totalSize = await getGCSStorageSize(paths[0]) | ||||||||||||
| return { totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
|
|
||||||||||||
| let prefix = paths.reduce((acc, cur) => acc + '/' + cur, '') | ||||||||||||
| if (prefix.startsWith('/')) { | ||||||||||||
| prefix = prefix.substring(1) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // Delete all blobs with the prefix | ||||||||||||
| for await (const blob of containerClient.listBlobsFlat({ prefix })) { | ||||||||||||
| await containerClient.deleteBlob(blob.name) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| const totalSize = await getAzureBlobStorageSize(paths[0]) | ||||||||||||
| return { totalSize: totalSize / 1024 / 1024 } | ||||||||||||
| } else { | ||||||||||||
|
Comment on lines
+826
to
841
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The implementation for |
||||||||||||
| const directory = path.join(getStoragePath(), ...paths.map(_sanitizeFilename)) | ||||||||||||
| await _deleteLocalFolderRecursive(directory, true) | ||||||||||||
|
|
@@ -866,6 +1045,34 @@ export const streamStorageFile = async ( | |||||||||||
| throw new Error(`File ${fileName} not found`) | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } else if (storageType === 'azure') { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
| const blobName = `${orgId}/${chatflowId}/${chatId}/${sanitizedFilename}` | ||||||||||||
|
|
||||||||||||
| try { | ||||||||||||
| const blockBlobClient = containerClient.getBlockBlobClient(blobName) | ||||||||||||
| const buffer = await blockBlobClient.downloadToBuffer() | ||||||||||||
| return buffer | ||||||||||||
| } catch (error) { | ||||||||||||
| // Fallback: Check if file exists without orgId | ||||||||||||
| const fallbackBlobName = `${chatflowId}/${chatId}/${sanitizedFilename}` | ||||||||||||
| try { | ||||||||||||
| const fallbackBlobClient = containerClient.getBlockBlobClient(fallbackBlobName) | ||||||||||||
| const buffer = await fallbackBlobClient.downloadToBuffer() | ||||||||||||
|
|
||||||||||||
| // Move to correct location with orgId | ||||||||||||
| const blockBlobClient = containerClient.getBlockBlobClient(blobName) | ||||||||||||
| await blockBlobClient.upload(buffer, buffer.length) | ||||||||||||
|
|
||||||||||||
| // Delete the old file | ||||||||||||
| await fallbackBlobClient.delete() | ||||||||||||
|
|
||||||||||||
| return buffer | ||||||||||||
| } catch (fallbackError) { | ||||||||||||
| // File not found in fallback location either | ||||||||||||
| throw new Error(`File ${fileName} not found`) | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } else { | ||||||||||||
| const filePath = path.join(getStoragePath(), orgId, chatflowId, chatId, sanitizedFilename) | ||||||||||||
| //raise error if file path is not absolute | ||||||||||||
|
|
@@ -1098,6 +1305,45 @@ export const getS3Config = () => { | |||||||||||
| return { s3Client, Bucket } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| export const getAzureBlobStorageSize = async (orgId: string): Promise<number> => { | ||||||||||||
| const { containerClient } = getAzureBlobConfig() | ||||||||||||
| let totalSize = 0 | ||||||||||||
|
|
||||||||||||
| for await (const blob of containerClient.listBlobsFlat({ prefix: orgId })) { | ||||||||||||
| totalSize += blob.properties.contentLength || 0 | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| return totalSize | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| export const getAzureBlobConfig = () => { | ||||||||||||
| const connectionString = process.env.AZURE_BLOB_STORAGE_CONNECTION_STRING | ||||||||||||
| const accountName = process.env.AZURE_BLOB_STORAGE_ACCOUNT_NAME | ||||||||||||
| const accountKey = process.env.AZURE_BLOB_STORAGE_ACCOUNT_KEY | ||||||||||||
| const containerName = process.env.AZURE_BLOB_STORAGE_CONTAINER_NAME | ||||||||||||
|
|
||||||||||||
| if (!containerName || containerName.trim() === '') { | ||||||||||||
| throw new Error('AZURE_BLOB_STORAGE_CONTAINER_NAME env variable is required') | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| let blobServiceClient: BlobServiceClient | ||||||||||||
|
|
||||||||||||
| // Authenticate either using connection string or account name and key | ||||||||||||
| if (connectionString && connectionString.trim() !== '') { | ||||||||||||
| blobServiceClient = BlobServiceClient.fromConnectionString(connectionString) | ||||||||||||
| } else if (accountName && accountName.trim() !== '' && accountKey && accountKey.trim() !== '') { | ||||||||||||
| const sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey) | ||||||||||||
| blobServiceClient = new BlobServiceClient(`https://${accountName}.blob.core.windows.net`, sharedKeyCredential) | ||||||||||||
| } else { | ||||||||||||
| throw new Error( | ||||||||||||
| 'Azure Blob Storage configuration is missing. Provide AZURE_BLOB_STORAGE_CONNECTION_STRING or AZURE_BLOB_STORAGE_ACCOUNT_NAME + AZURE_BLOB_STORAGE_ACCOUNT_KEY' | ||||||||||||
| ) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| const containerClient = blobServiceClient.getContainerClient(containerName) | ||||||||||||
| return { blobServiceClient, containerClient, containerName } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| const _sanitizeFilename = (filename: string): string => { | ||||||||||||
| if (filename) { | ||||||||||||
| let sanitizedFilename = sanitize(filename) | ||||||||||||
|
|
||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic for parsing a base64 data URI is duplicated across the `s3`, `gcs`, and now `azure` branches of this function. To improve maintainability and reduce redundancy, this logic should be extracted into a separate helper function — for example, a `_parseBase64DataURI(fileBase64)` helper returning the buffer, filename, and MIME type. This would make `addBase64FilesToStorage` cleaner and easier to read.