-
Notifications
You must be signed in to change notification settings - Fork 6.7k
Feat/minio #5748
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Feat/minio #5748
Changes from 6 commits
295e6d2
cfd01e9
7f58786
892b7e1
c7d3cc8
34cdcbd
fe3ce1d
9dbb270
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
import type { TeamCollectionName } from '../../../support/user/team/constant'; | ||
|
||
export type MinioTtlSchemaType = { | ||
_id: string; | ||
bucketName: string; | ||
minioKey: string; | ||
expiredTime?: Date; | ||
xqvvu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
import { MongoMinioTtl } from './schema'; | ||
import { S3BucketManager } from '../../s3/buckets/manager'; | ||
import { addLog } from '../../system/log'; | ||
import { setCron } from '../../system/cron'; | ||
import { checkTimerLock } from '../../system/timerLock/utils'; | ||
import { TimerIdEnum } from '../../system/timerLock/constants'; | ||
|
||
export async function clearExpiredMinioFiles() { | ||
try { | ||
const now = new Date(); | ||
|
||
const expiredFiles = await MongoMinioTtl.find({ | ||
expiredTime: { $exists: true, $ne: null, $lte: now } | ||
}).lean(); | ||
|
||
if (expiredFiles.length === 0) { | ||
addLog.info('No expired minio files to clean'); | ||
return; | ||
} | ||
|
||
addLog.info(`Found ${expiredFiles.length} expired minio files to clean`); | ||
|
||
const s3Manager = S3BucketManager.getInstance(); | ||
let success = 0; | ||
let fail = 0; | ||
|
||
for (const file of expiredFiles) { | ||
try { | ||
const bucket = (() => { | ||
switch (file.bucketName) { | ||
case process.env.S3_PUBLIC_BUCKET: | ||
xqvvu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return s3Manager.getPublicBucket(); | ||
case process.env.S3_PRIVATE_BUCKET: | ||
return s3Manager.getPrivateBucket(); | ||
default: | ||
throw new Error(`Unknown bucket name: ${file.bucketName}`); | ||
} | ||
})(); | ||
|
||
await bucket.delete(file.minioKey); | ||
|
||
await MongoMinioTtl.deleteOne({ _id: file._id }); | ||
|
||
success++; | ||
addLog.info(`Deleted expired minio file: ${file.minioKey} from bucket: ${file.bucketName}`); | ||
} catch (error) { | ||
fail++; | ||
addLog.error(`Failed to delete minio file: ${file.minioKey}`, error); | ||
} | ||
} | ||
|
||
addLog.info(`Minio TTL cleanup completed. Success: ${success}, Failed: ${fail}`); | ||
} catch (error) { | ||
addLog.error('Error in clearExpiredMinioFiles', error); | ||
} | ||
} | ||
|
||
export function clearExpiredMinioFilesCron() { | ||
// 每小时执行一次 | ||
setCron('0 */1 * * *', async () => { | ||
if ( | ||
await checkTimerLock({ | ||
timerId: TimerIdEnum.clearExpiredMinioFiles, | ||
lockMinuted: 59 | ||
}) | ||
) { | ||
await clearExpiredMinioFiles(); | ||
} | ||
}); | ||
} | ||
|
||
export async function addMinioTtlFile({ | ||
bucketName, | ||
minioKey, | ||
expiredTime | ||
}: { | ||
bucketName: string; | ||
minioKey: string; | ||
expiredTime?: Date; | ||
}) { | ||
try { | ||
await MongoMinioTtl.create({ | ||
bucketName, | ||
minioKey, | ||
expiredTime | ||
}); | ||
addLog.info(`Added minio TTL file: ${minioKey}, expiredTime: ${expiredTime}`); | ||
} catch (error) { | ||
addLog.error('Failed to add minio TTL file', error); | ||
throw error; | ||
xqvvu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import { addMinioTtlFile } from './controller'; | ||
import { addLog } from '../../system/log'; | ||
|
||
/** | ||
* @param bucketName - S3 bucket 名称 | ||
* @param objectKey - S3 对象 key | ||
* @param temporay - 是否为临时文件 | ||
* @param ttl - TTL(单位:小时,仅临时文件有效),默认 7 天 | ||
*/ | ||
export async function afterCreatePresignedUrl({ | ||
bucketName, | ||
objectKey, | ||
temporay = false, | ||
ttl = 7 * 24 | ||
}: { | ||
bucketName: string; | ||
objectKey: string; | ||
temporay?: boolean; | ||
ttl?: number; | ||
}) { | ||
try { | ||
const expiredTime = temporay ? new Date(Date.now() + ttl * 3.6e6) : undefined; | ||
const info = `TTL: Registered ${temporay ? 'temporary' : 'permanent'} file: ${objectKey}${temporay ? `, expires in ${ttl} hours` : ''}`; | ||
await addMinioTtlFile({ bucketName, expiredTime, minioKey: objectKey }); | ||
xqvvu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
xqvvu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
addLog.info(info); | ||
} catch (error) { | ||
addLog.error('Failed to register minio TTL', error); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
import { Schema, getMongoModel } from '../../../common/mongo'; | ||
import { type MinioTtlSchemaType } from '@fastgpt/global/common/file/minioTtl/type.d'; | ||
|
||
const collectionName = 'minio_ttl_files'; | ||
|
||
const MinioTtlSchema = new Schema({ | ||
bucketName: { | ||
type: String, | ||
required: true | ||
}, | ||
minioKey: { | ||
type: String, | ||
required: true | ||
}, | ||
expiredTime: { | ||
type: Date | ||
} | ||
}); | ||
|
||
try { | ||
xqvvu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
MinioTtlSchema.index({ expiredTime: 1 }); | ||
MinioTtlSchema.index({ bucketName: 1, minioKey: 1 }); | ||
} catch (error) { | ||
console.log(error); | ||
} | ||
|
||
export const MongoMinioTtl = getMongoModel<MinioTtlSchemaType>(collectionName, MinioTtlSchema); |
xqvvu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
import { Client, type RemoveOptions, type CopyConditions, type LifecycleConfig } from 'minio'; | ||
import { | ||
defaultS3Options, | ||
type CreatePostPresignedUrlOptions, | ||
type CreatePostPresignedUrlParams, | ||
type CreatePostPresignedUrlResult, | ||
type S3BucketName, | ||
type S3Options | ||
} from '../types'; | ||
import type { IBucketBasicOperations } from '../interface'; | ||
import { | ||
createObjectKey, | ||
createPresignedUrlExpires, | ||
createTempObjectKey, | ||
inferContentType | ||
} from '../helpers'; | ||
import { afterCreatePresignedUrl } from '../../file/minioTtl/hooks'; | ||
|
||
export class S3BaseBucket implements IBucketBasicOperations { | ||
public client: Client; | ||
|
||
/** | ||
* | ||
* @param _bucket the bucket you want to operate | ||
* @param options the options for the s3 client | ||
* @param afterInits the function to be called after instantiating the s3 service | ||
*/ | ||
constructor( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 超过 2 个参数,用对象格式 {} |
||
private readonly _bucket: S3BucketName, | ||
private readonly afterInits?: (() => Promise<void> | void)[], | ||
public options: Partial<S3Options> = defaultS3Options | ||
) { | ||
options = { ...defaultS3Options, ...options }; | ||
this.options = options as S3Options; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this.options = { ...defaultS3Options, ...options } |
||
this.client = new Client(options as S3Options); | ||
|
||
const init = async () => { | ||
if (!(await this.exist())) { | ||
await this.client.makeBucket(this._bucket); | ||
} | ||
await Promise.all(this.afterInits?.map((afterInit) => afterInit()) ?? []); | ||
xqvvu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
init(); | ||
} | ||
|
||
get name(): string { | ||
return this._bucket; | ||
} | ||
|
||
async move(src: string, dst: string, options?: CopyConditions): Promise<void> { | ||
const bucket = this.name; | ||
await this.client.copyObject(bucket, dst, `/${bucket}/${src}`, options); | ||
await this.delete(src); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 失败了如何回退 |
||
} | ||
|
||
copy(src: string, dst: string, options?: CopyConditions): ReturnType<Client['copyObject']> { | ||
return this.client.copyObject(this.name, src, dst, options); | ||
} | ||
|
||
exist(): Promise<boolean> { | ||
return this.client.bucketExists(this.name); | ||
} | ||
|
||
async delete(objectKey: string, options?: RemoveOptions): Promise<void> { | ||
await this.client.removeObject(this.name, objectKey, options); | ||
} | ||
|
||
get(): Promise<void> { | ||
throw new Error('Method not implemented.'); | ||
} | ||
|
||
lifecycle(): Promise<LifecycleConfig | null> { | ||
xqvvu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return this.client.getBucketLifecycle(this.name); | ||
} | ||
|
||
async createPostPresignedUrl( | ||
params: CreatePostPresignedUrlParams, | ||
options: CreatePostPresignedUrlOptions = {} | ||
): Promise<CreatePostPresignedUrlResult> { | ||
const { temporay, ttl: ttlDays = 7 } = options; | ||
const contentType = inferContentType(params.filename); | ||
const maxFileSize = this.options.maxFileSize as number; | ||
const key = temporay ? createTempObjectKey(params) : createObjectKey(params); | ||
|
||
const policy = this.client.newPostPolicy(); | ||
policy.setKey(key); | ||
policy.setBucket(this.name); | ||
policy.setContentType(contentType); | ||
policy.setContentLengthRange(1, maxFileSize); | ||
policy.setExpires(createPresignedUrlExpires(10)); | ||
policy.setUserMetaData({ | ||
filename: encodeURIComponent(params.filename), | ||
visibility: params.visibility | ||
}); | ||
|
||
const { formData, postURL } = await this.client.presignedPostPolicy(policy); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
await afterCreatePresignedUrl({ | ||
bucketName: this.name, | ||
objectKey: key, | ||
temporay, | ||
ttl: ttlDays | ||
xqvvu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
}); | ||
|
||
return { | ||
url: postURL, | ||
fields: formData | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import { type S3Options } from '../types'; | ||
import { S3PublicBucket } from './public'; | ||
import { S3PrivateBucket } from './private'; | ||
|
||
export class S3BucketManager { | ||
private static instance: S3BucketManager; | ||
private publicBucket: S3PublicBucket | null = null; | ||
private privateBucket: S3PrivateBucket | null = null; | ||
|
||
private constructor() {} | ||
|
||
static getInstance(): S3BucketManager { | ||
return (this.instance ??= new S3BucketManager()); | ||
} | ||
|
||
getPublicBucket(options?: Partial<S3Options>): S3PublicBucket { | ||
return (this.publicBucket ??= new S3PublicBucket(options)); | ||
} | ||
|
||
getPrivateBucket(options?: Partial<S3Options>): S3PrivateBucket { | ||
return (this.privateBucket ??= new S3PrivateBucket(options)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
import { S3BaseBucket } from './base'; | ||
import { | ||
S3Buckets, | ||
type CreatePostPresignedUrlParams, | ||
type CreatePostPresignedUrlResult, | ||
type S3Options | ||
} from '../types'; | ||
|
||
export class S3PrivateBucket extends S3BaseBucket { | ||
constructor(options?: Partial<S3Options>) { | ||
super(S3Buckets.private, undefined, options); | ||
} | ||
|
||
override createPostPresignedUrl( | ||
params: Omit<CreatePostPresignedUrlParams, 'visibility'> | ||
): Promise<CreatePostPresignedUrlResult> { | ||
return super.createPostPresignedUrl({ ...params, visibility: 'private' }); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import { S3BaseBucket } from './base'; | ||
import { createBucketPolicy } from '../helpers'; | ||
import { | ||
S3Buckets, | ||
type CreatePostPresignedUrlOptions, | ||
type CreatePostPresignedUrlParams, | ||
type CreatePostPresignedUrlResult, | ||
type S3Options | ||
} from '../types'; | ||
import type { IPublicBucketOperations } from '../interface'; | ||
import { lifecycleOfTemporaryAvatars } from '../lifecycle'; | ||
|
||
export class S3PublicBucket extends S3BaseBucket implements IPublicBucketOperations { | ||
constructor(options?: Partial<S3Options>) { | ||
super( | ||
S3Buckets.public, | ||
[ | ||
// set bucket policy | ||
async () => { | ||
const bucket = this.name; | ||
const policy = createBucketPolicy(bucket); | ||
try { | ||
await this.client.setBucketPolicy(bucket, policy); | ||
} catch (error) { | ||
// TODO: maybe it was a cloud S3 that doesn't allow us to set the policy, so that cause the error, | ||
// maybe we can ignore the error, or we have other plan to handle this. | ||
} | ||
}, | ||
// set bucket lifecycle | ||
async () => { | ||
const bucket = this.name; | ||
await this.client.setBucketLifecycle(bucket, lifecycleOfTemporaryAvatars); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. try catch |
||
} | ||
], | ||
options | ||
); | ||
} | ||
|
||
createPublicUrl(objectKey: string): string { | ||
xqvvu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const protocol = this.options.useSSL ? 'https' : 'http'; | ||
const hostname = this.options.endPoint; | ||
const port = this.options.port; | ||
const bucket = this.name; | ||
|
||
return `${protocol}://${hostname}:${port}/${bucket}/${objectKey}`; | ||
} | ||
|
||
override createPostPresignedUrl( | ||
params: Omit<CreatePostPresignedUrlParams, 'visibility'>, | ||
options: CreatePostPresignedUrlOptions = {} | ||
): Promise<CreatePostPresignedUrlResult> { | ||
return super.createPostPresignedUrl({ ...params, visibility: 'public' }, options); | ||
} | ||
} |
This file was deleted.
Uh oh!
There was an error while loading. Please reload this page.