diff --git a/src/http/plugins/signals.ts b/src/http/plugins/signals.ts
index a4734621..92914841 100644
--- a/src/http/plugins/signals.ts
+++ b/src/http/plugins/signals.ts
@@ -11,45 +11,39 @@ declare module 'fastify' {
   }
 }
 
+const abortOnce = (ac: AbortController) => {
+  if (!ac.signal.aborted) ac.abort()
+}
+
 export const signals = fastifyPlugin(
   async function (fastify: FastifyInstance) {
-    fastify.addHook('onRequest', async (req, res) => {
+    fastify.addHook('onRequest', async (req, reply) => {
       req.signals = {
         body: new AbortController(),
         response: new AbortController(),
         disconnect: new AbortController(),
       }
 
-      // Client terminated the request before the body was fully sent
+      // Body upload interrupted (fires early)
       req.raw.once('close', () => {
         if (req.raw.aborted) {
-          req.signals.body.abort()
-
-          if (!req.signals.disconnect.signal.aborted) {
-            req.signals.disconnect.abort()
-          }
+          abortOnce(req.signals.body)
+          abortOnce(req.signals.disconnect)
         }
       })
 
-      // Client terminated the request before server finished sending the response
-      res.raw.once('close', () => {
-        const aborted = !res.raw.writableFinished
-        if (aborted) {
-          req.signals.response.abort()
-
-          if (!req.signals.disconnect.signal.aborted) {
-            req.signals.disconnect.abort()
-          }
+      // Response interrupted (connection closed before finish)
+      reply.raw.once('close', () => {
+        if (!reply.raw.writableFinished) {
+          abortOnce(req.signals.response)
+          abortOnce(req.signals.disconnect)
         }
       })
     })
 
     fastify.addHook('onRequestAbort', async (req) => {
-      req.signals.body.abort()
-
-      if (!req.signals.disconnect.signal.aborted) {
-        req.signals.disconnect.abort()
-      }
+      abortOnce(req.signals.body)
+      abortOnce(req.signals.disconnect)
    })
  },
  { name: 'request-signals' }
diff --git a/src/http/routes/s3/commands/create-multipart-upload.ts b/src/http/routes/s3/commands/create-multipart-upload.ts
index 6914400b..af631617 100644
--- a/src/http/routes/s3/commands/create-multipart-upload.ts
+++ b/src/http/routes/s3/commands/create-multipart-upload.ts
@@ -1,7 +1,6 @@
 import { S3ProtocolHandler } from '@storage/protocols/s3/s3-handler'
 import { S3Router } from '../router'
 import { ROUTE_OPERATIONS } from '../../operations'
-import { S3Backend } from '@storage/backend'
 import { ERRORS } from '@internal/errors'
 
 const CreateMultiPartUploadInput = {
diff --git a/src/storage/backend/adapter.ts b/src/storage/backend/adapter.ts
index 94466229..21c40597 100644
--- a/src/storage/backend/adapter.ts
+++ b/src/storage/backend/adapter.ts
@@ -46,11 +46,10 @@ export type UploadPart = {
 /**
  * A generic storage Adapter to interact with files
  */
-export abstract class StorageBackendAdapter {
-  client: any
-  constructor() {
-    this.client = null
-  }
+export abstract class StorageBackendAdapter<T = unknown> {
+  constructor() {}
+
+  abstract getClient(): T
   async list(
     bucket: string,
     options?: {
diff --git a/src/storage/backend/file.ts b/src/storage/backend/file.ts
index 44e14e98..4537188d 100644
--- a/src/storage/backend/file.ts
+++ b/src/storage/backend/file.ts
@@ -58,6 +58,10 @@ export class FileBackend implements StorageBackendAdapter {
     this.etagAlgorithm = storageFileEtagAlgorithm
   }
 
+  getClient(): unknown {
+    return null
+  }
+
   async list(
     bucket: string,
     options?: {
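
Illustrative aside, not part of the patch: a minimal, standalone sketch of the abort-once pattern used in the signals plugin above. `AbortController.abort()` is already safe to call repeatedly (the `abort` event only fires once), so the guard mainly documents intent and skips redundant calls.

```ts
// Standalone sketch of the abortOnce helper shown in the diff (assumed semantics).
const abortOnce = (ac: AbortController) => {
  if (!ac.signal.aborted) ac.abort()
}

const disconnect = new AbortController()
disconnect.signal.addEventListener('abort', () => console.log('client disconnected'))

abortOnce(disconnect) // fires the listener once
abortOnce(disconnect) // no-op: the signal is already aborted
```
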
diff --git a/src/storage/backend/index.ts b/src/storage/backend/index.ts
index 46e5b5a4..80cb56fd 100644
--- a/src/storage/backend/index.ts
+++ b/src/storage/backend/index.ts
@@ -2,6 +2,7 @@ import { StorageBackendAdapter } from './adapter'
 import { FileBackend } from './file'
 import { S3Backend, S3ClientOptions } from './s3/adapter'
 import { getConfig, StorageBackendType } from '../../config'
+import { S3Client } from '@aws-sdk/client-s3'
 
 export * from './s3'
 export * from './file'
@@ -14,14 +15,18 @@ type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
   ? S3ClientOptions
   : undefined
 
+type BackendAdapterForType<Type extends StorageBackendType> = Type extends 's3'
+  ? StorageBackendAdapter<S3Client>
+  : StorageBackendAdapter
+
 export function createStorageBackend<Type extends StorageBackendType>(
   type: Type,
   config?: ConfigForStorage<Type>
 ) {
-  let storageBackend: StorageBackendAdapter
+  let storageBackend: BackendAdapterForType<Type>
 
   if (type === 'file') {
-    storageBackend = new FileBackend()
+    storageBackend = new FileBackend() as BackendAdapterForType<Type>
   } else {
     const defaultOptions: S3ClientOptions = {
       region: storageS3Region,
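
Illustrative aside, not part of the patch: how the conditional `BackendAdapterForType` return type is expected to behave at call sites, assuming the generics shown above. The option values below are placeholders, not values from this repo.

```ts
import { S3Client } from '@aws-sdk/client-s3'
import { createStorageBackend } from '@storage/backend'

// 's3' narrows the adapter to StorageBackendAdapter<S3Client>, so no cast is needed.
const s3Backend = createStorageBackend('s3', { region: 'us-east-1' })
const s3Client: S3Client = s3Backend.getClient()

// 'file' keeps the default generic, so the client stays opaque to callers.
const fileBackend = createStorageBackend('file')
const fileClient: unknown = fileBackend.getClient()
```
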
diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts
index 6e730be3..7c6fa9b7 100644
--- a/src/storage/backend/s3/adapter.ts
+++ b/src/storage/backend/s3/adapter.ts
@@ -42,35 +42,74 @@ export interface S3ClientOptions {
   accessKey?: string
   secretKey?: string
   role?: string
-  httpAgent?: InstrumentedAgent
   requestTimeout?: number
+  httpAgents?: S3HttpAgents
+}
+
+interface S3HttpAgents {
+  api: InstrumentedAgent
+  upload: InstrumentedAgent
+  download: InstrumentedAgent
 }
 
 /**
  * S3Backend
  * Interacts with a s3-compatible file system with this S3Adapter
  */
-export class S3Backend implements StorageBackendAdapter {
-  client: S3Client
-  agent: InstrumentedAgent
+export class S3Backend implements StorageBackendAdapter<S3Client> {
+  apiClient: S3Client
+  uploadClient: S3Client
+  downloadClient: S3Client
+
+  agents: {
+    api: InstrumentedAgent
+    upload: InstrumentedAgent
+    download: InstrumentedAgent
+  }
 
   constructor(options: S3ClientOptions) {
-    this.agent =
-      options.httpAgent ??
-      createAgent('s3_default', {
+    this.agents = options.httpAgents ?? {
+      api: createAgent('s3_api', {
         maxSockets: storageS3MaxSockets,
-      })
+      }),
+      upload: createAgent('s3_upload', {
+        maxSockets: storageS3MaxSockets,
+      }),
+      download: createAgent('s3_download', {
+        maxSockets: storageS3MaxSockets,
+      }),
+    }
 
-    if (this.agent.httpsAgent && tracingEnabled) {
-      this.agent.monitor()
+    if (this.agents && tracingEnabled) {
+      Object.values(this.agents).forEach((agent) => {
+        agent.monitor()
+      })
     }
 
     // Default client for API operations
-    this.client = this.createS3Client({
+    this.apiClient = this.createS3Client({
       ...options,
-      name: 's3_default',
-      httpAgent: this.agent,
+      name: 's3_api',
+      httpAgent: this.agents.api,
     })
+
+    // Dedicated client for downloads
+    this.downloadClient = this.createS3Client({
+      ...options,
+      name: 's3_download',
+      httpAgent: this.agents.download,
+    })
+
+    // Dedicated client for uploads
+    this.uploadClient = this.createS3Client({
+      ...options,
+      name: 's3_upload',
+      httpAgent: this.agents.upload,
+    })
+  }
+
+  getClient() {
+    return this.apiClient
   }
 
   /**
@@ -98,7 +137,7 @@ export class S3Backend implements StorageBackendAdapter {
       input.IfModifiedSince = new Date(headers.ifModifiedSince)
     }
     const command = new GetObjectCommand(input)
-    const data = await this.client.send(command, {
+    const data = await this.downloadClient.send(command, {
       abortSignal: signal,
     })
 
@@ -144,7 +183,7 @@ export class S3Backend implements StorageBackendAdapter {
      const dataStream = tracingFeatures?.upload ? monitorStream(body) : body
 
      const upload = new Upload({
-        client: this.client,
+        client: this.uploadClient,
        params: {
          Bucket: bucketName,
          Key: withOptionalVersion(key, version),
@@ -212,7 +251,7 @@
      Bucket: bucket,
      Key: withOptionalVersion(key, version),
    })
-    await this.client.send(command)
+    await this.apiClient.send(command)
  }
 
  /**
@@ -251,7 +290,7 @@
        ContentType: metadata?.mimetype,
        CacheControl: metadata?.cacheControl,
      })
-      const data = await this.client.send(command)
+      const data = await this.apiClient.send(command)
      return {
        httpStatusCode: data.$metadata.httpStatusCode || 200,
        eTag: data.CopyObjectResult?.ETag || '',
@@ -280,7 +319,7 @@
      ContinuationToken: options?.nextToken || undefined,
      StartAfter: options?.startAfter,
    })
-    const data = await this.client.send(command)
+    const data = await this.apiClient.send(command)
 
    const keys = data.Contents?.filter((ele) => {
      if (options?.beforeDate) {
@@ -327,7 +366,7 @@
          Objects: s3Prefixes,
        },
      })
-      await this.client.send(command)
+      await this.apiClient.send(command)
    } catch (e) {
      throw StorageBackendError.fromError(e)
    }
@@ -349,7 +388,7 @@
      Bucket: bucket,
      Key: withOptionalVersion(key, version),
    })
-    const data = await this.client.send(command)
+    const data = await this.apiClient.send(command)
    return {
      cacheControl: data.CacheControl || 'no-cache',
      mimetype: data.ContentType || 'application/octet-stream',
@@ -380,7 +419,7 @@
      MaxParts: maxParts,
    })
 
-    const result = await this.client.send(command)
+    const result = await this.apiClient.send(command)
 
    return {
      parts: result.Parts || [],
@@ -406,7 +445,7 @@
    }
 
    const command = new GetObjectCommand(input)
-    return getSignedUrl(this.client, command, { expiresIn: 600 })
+    return getSignedUrl(this.apiClient, command, { expiresIn: 600 })
  }
 
  async createMultiPartUpload(
@@ -428,7 +467,7 @@
      },
    })
 
-    const resp = await this.client.send(createMultiPart)
+    const resp = await this.apiClient.send(createMultiPart)
 
    if (!resp.UploadId) {
      throw ERRORS.InvalidUploadId()
@@ -457,7 +496,7 @@
        ContentLength: length,
      })
 
-      const resp = await this.client.send(paralellUploadS3, {
+      const resp = await this.uploadClient.send(paralellUploadS3, {
        abortSignal: signal,
      })
 
@@ -491,7 +530,7 @@
        UploadId: uploadId,
      })
 
-      const partsResponse = await this.client.send(listPartsInput)
+      const partsResponse = await this.apiClient.send(listPartsInput)
      parts = partsResponse.Parts || []
    }
 
@@ -507,7 +546,7 @@
      },
    })
 
-    const response = await this.client.send(completeUpload)
+    const response = await this.apiClient.send(completeUpload)
 
    let location = key
    let bucket = bucketName
@@ -534,7 +573,7 @@
      Key: key,
      UploadId: uploadId,
    })
-    await this.client.send(abortUpload)
+    await this.apiClient.send(abortUpload)
  }
 
  async uploadPartCopy(
@@ -556,7 +595,7 @@
      CopySourceRange: bytesRange ? `bytes=${bytesRange.fromByte}-${bytesRange.toByte}` : undefined,
    })
 
-    const part = await this.client.send(uploadPartCopy)
+    const part = await this.uploadClient.send(uploadPartCopy)
 
    return {
      eTag: part.CopyPartResult?.ETag,
@@ -565,14 +604,18 @@
  }
 
  async backup(backupInfo: BackupObjectInfo) {
-    return new ObjectBackup(this.client, backupInfo).backup()
+    return new ObjectBackup(this.apiClient, backupInfo).backup()
  }
 
  close() {
-    this.agent.close()
+    Object.values(this.agents).forEach((agent) => {
+      agent.close()
+    })
  }
 
-  protected createS3Client(options: S3ClientOptions & { name: string }) {
+  protected createS3Client(
+    options: S3ClientOptions & { name: string; httpAgent: InstrumentedAgent }
+  ) {
    const params: S3ClientConfig = {
      region: options.region,
      runtime: 'node',
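
Illustrative aside, not part of the patch: the point of the three dedicated clients is connection-pool isolation. A standalone sketch of the idea using plain Node agents follows; the pool sizes are assumptions, not values from this repo.

```ts
import { Agent } from 'node:https'

// One socket pool per traffic class: a burst of slow downloads can exhaust
// its own pool without starving control-plane API calls or uploads.
const agents = {
  api: new Agent({ keepAlive: true, maxSockets: 50 }),
  upload: new Agent({ keepAlive: true, maxSockets: 50 }),
  download: new Agent({ keepAlive: true, maxSockets: 50 }),
}

// With the AWS SDK v3 (already used here), each client would receive its own
// agent through the request handler, e.g.:
//   new S3Client({ requestHandler: new NodeHttpHandler({ httpsAgent: agents.download }) })
```
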
diff --git a/src/storage/events/base-event.ts b/src/storage/events/base-event.ts
index 0163eceb..efb1d9a4 100644
--- a/src/storage/events/base-event.ts
+++ b/src/storage/events/base-event.ts
@@ -80,23 +80,27 @@ export abstract class BaseEvent
    return new Storage(this.getOrCreateStorageBackend(), db, new TenantLocation(storageS3Bucket))
  }
 
-  protected static getOrCreateStorageBackend(monitor = false) {
+  protected static getOrCreateStorageBackend() {
    if (storageBackend) {
      return storageBackend
    }
 
-    const httpAgent = createAgent('s3_worker', {
-      maxSockets: storageS3MaxSockets,
-    })
+    const httpAgents = {
+      api: createAgent('s3_worker_api', {
+        maxSockets: storageS3MaxSockets,
+      }),
+      download: createAgent('s3_worker_download', {
+        maxSockets: storageS3MaxSockets,
+      }),
+      upload: createAgent('s3_worker_upload', {
+        maxSockets: storageS3MaxSockets,
+      }),
+    }
 
    storageBackend = createStorageBackend(storageBackendType, {
-      httpAgent: httpAgent,
+      httpAgents,
    })
 
-    if (monitor) {
-      httpAgent.monitor()
-    }
-
    return storageBackend
  }
 }
diff --git a/src/test/rls.test.ts b/src/test/rls.test.ts
index f6084ee4..2f27fbd3 100644
--- a/src/test/rls.test.ts
+++ b/src/test/rls.test.ts
@@ -74,7 +74,7 @@ const testSpec = yaml.load(
 const { serviceKeyAsync, tenantId, jwtSecret, databaseURL, storageS3Bucket, storageBackendType } = getConfig()
 
 const backend = createStorageBackend(storageBackendType)
-const client = backend.client
+const client = backend.getClient()
 let appInstance: FastifyInstance
 
 jest.setTimeout(10000)
diff --git a/src/test/s3-locker.test.ts b/src/test/s3-locker.test.ts
index df882736..2cde6ff0 100644
--- a/src/test/s3-locker.test.ts
+++ b/src/test/s3-locker.test.ts
@@ -18,7 +18,7 @@ import { backends } from '../storage'
 const { storageS3Bucket, storageBackendType } = getConfig()
 
 const backend = backends.createStorageBackend(storageBackendType)
-const s3ClientFromBackend = backend.client
+const s3ClientFromBackend = backend.getClient()
 
 describe('S3Locker', () => {
   let s3Client: S3Client
diff --git a/src/test/tus.test.ts b/src/test/tus.test.ts
index 317e43b9..c5ecffae 100644
--- a/src/test/tus.test.ts
+++ b/src/test/tus.test.ts
@@ -23,7 +23,7 @@ const oneChunkFile = fs.createReadStream(path.resolve(__dirname, 'assets', 'sadc
 const localServerAddress = 'http://127.0.0.1:8999'
 
 const backend = backends.createStorageBackend(storageBackendType)
-const client = backend.client
+const client = backend.getClient()
 
 describe('Tus multipart', () => {
   let db: StorageKnexDB
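
Illustrative aside, not part of the patch: what the test-side change relies on. `getClient()` replaces the old public `client` field, and when the configured backend is s3 the returned value can be narrowed before use. The import paths and helper name below are assumptions modelled on the tests above.

```ts
import { S3Client, HeadBucketCommand } from '@aws-sdk/client-s3'
import { getConfig } from '../config'
import { backends } from '../storage'

const { storageS3Bucket, storageBackendType } = getConfig()

const backend = backends.createStorageBackend(storageBackendType)
const client = backend.getClient()

// Hypothetical helper: only meaningful when the s3 backend is configured.
async function assertS3Reachable() {
  if (client instanceof S3Client) {
    await client.send(new HeadBucketCommand({ Bucket: storageS3Bucket }))
  }
}
```
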