Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions src/http/plugins/signals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,39 @@ declare module 'fastify' {
}
}

const abortOnce = (ac: AbortController) => {
if (!ac.signal.aborted) ac.abort()
}

export const signals = fastifyPlugin(
async function (fastify: FastifyInstance) {
fastify.addHook('onRequest', async (req, res) => {
fastify.addHook('onRequest', async (req, reply) => {
req.signals = {
body: new AbortController(),
response: new AbortController(),
disconnect: new AbortController(),
}

// Client terminated the request before the body was fully sent
// Body upload interrupted (fires early)
req.raw.once('close', () => {
if (req.raw.aborted) {
req.signals.body.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
abortOnce(req.signals.body)
abortOnce(req.signals.disconnect)
}
})

// Client terminated the request before server finished sending the response
res.raw.once('close', () => {
const aborted = !res.raw.writableFinished
if (aborted) {
req.signals.response.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
// Response interrupted (connection closed before finish)
reply.raw.once('close', () => {
if (!reply.raw.writableFinished) {
abortOnce(req.signals.response)
abortOnce(req.signals.disconnect)
}
})
})

fastify.addHook('onRequestAbort', async (req) => {
req.signals.body.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
abortOnce(req.signals.body)
abortOnce(req.signals.disconnect)
})
},
{ name: 'request-signals' }
Expand Down
1 change: 0 additions & 1 deletion src/http/routes/s3/commands/create-multipart-upload.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { S3ProtocolHandler } from '@storage/protocols/s3/s3-handler'
import { S3Router } from '../router'
import { ROUTE_OPERATIONS } from '../../operations'
import { S3Backend } from '@storage/backend'
import { ERRORS } from '@internal/errors'

const CreateMultiPartUploadInput = {
Expand Down
9 changes: 4 additions & 5 deletions src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ export type UploadPart = {
/**
* A generic storage Adapter to interact with files
*/
export abstract class StorageBackendAdapter {
client: any
constructor() {
this.client = null
}
export abstract class StorageBackendAdapter<T = unknown> {
constructor() {}

abstract getClient(): T

async list(
bucket: string,
Expand Down
4 changes: 4 additions & 0 deletions src/storage/backend/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ export class FileBackend implements StorageBackendAdapter {
this.etagAlgorithm = storageFileEtagAlgorithm
}

getClient(): unknown {
return null
}

async list(
bucket: string,
options?: {
Expand Down
9 changes: 7 additions & 2 deletions src/storage/backend/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { StorageBackendAdapter } from './adapter'
import { FileBackend } from './file'
import { S3Backend, S3ClientOptions } from './s3/adapter'
import { getConfig, StorageBackendType } from '../../config'
import { S3Client } from '@aws-sdk/client-s3'

export * from './s3'
export * from './file'
Expand All @@ -14,14 +15,18 @@ type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
? S3ClientOptions
: undefined

type BackendAdapterForType<Type extends StorageBackendType> = Type extends 's3'
? StorageBackendAdapter<S3Client>
: StorageBackendAdapter

export function createStorageBackend<Type extends StorageBackendType>(
type: Type,
config?: ConfigForStorage<Type>
) {
let storageBackend: StorageBackendAdapter
let storageBackend: BackendAdapterForType<Type>

if (type === 'file') {
storageBackend = new FileBackend()
storageBackend = new FileBackend() as BackendAdapterForType<Type>
} else {
const defaultOptions: S3ClientOptions = {
region: storageS3Region,
Expand Down
105 changes: 74 additions & 31 deletions src/storage/backend/s3/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,74 @@ export interface S3ClientOptions {
accessKey?: string
secretKey?: string
role?: string
httpAgent?: InstrumentedAgent
requestTimeout?: number
httpAgents?: S3HttpAgents
}

interface S3HttpAgents {
api: InstrumentedAgent
upload: InstrumentedAgent
download: InstrumentedAgent
}

/**
* S3Backend
* Interacts with a s3-compatible file system with this S3Adapter
*/
export class S3Backend implements StorageBackendAdapter {
client: S3Client
agent: InstrumentedAgent
export class S3Backend implements StorageBackendAdapter<S3Client> {
apiClient: S3Client
uploadClient: S3Client
downloadClient: S3Client

agents: {
api: InstrumentedAgent
upload: InstrumentedAgent
download: InstrumentedAgent
}

constructor(options: S3ClientOptions) {
this.agent =
options.httpAgent ??
createAgent('s3_default', {
this.agents = options.httpAgents ?? {
api: createAgent('s3_api', {
maxSockets: storageS3MaxSockets,
})
}),
upload: createAgent('s3_upload', {
maxSockets: storageS3MaxSockets,
}),
download: createAgent('s3_download', {
maxSockets: storageS3MaxSockets,
}),
}

if (this.agent.httpsAgent && tracingEnabled) {
this.agent.monitor()
if (this.agents && tracingEnabled) {
Object.values(this.agents).forEach((agent) => {
agent.monitor()
})
}

// Default client for API operations
this.client = this.createS3Client({
this.apiClient = this.createS3Client({
...options,
name: 's3_default',
httpAgent: this.agent,
name: 's3_api',
httpAgent: this.agents.api,
})

// Dedicated client for downloads
this.downloadClient = this.createS3Client({
...options,
name: 's3_download',
httpAgent: this.agents.download,
})

// Dedicated client for uploads
this.uploadClient = this.createS3Client({
...options,
name: 's3_upload',
httpAgent: this.agents.upload,
})
}

getClient() {
return this.apiClient
}

/**
Expand Down Expand Up @@ -98,7 +137,7 @@ export class S3Backend implements StorageBackendAdapter {
input.IfModifiedSince = new Date(headers.ifModifiedSince)
}
const command = new GetObjectCommand(input)
const data = await this.client.send(command, {
const data = await this.downloadClient.send(command, {
abortSignal: signal,
})

Expand Down Expand Up @@ -144,7 +183,7 @@ export class S3Backend implements StorageBackendAdapter {
const dataStream = tracingFeatures?.upload ? monitorStream(body) : body

const upload = new Upload({
client: this.client,
client: this.uploadClient,
params: {
Bucket: bucketName,
Key: withOptionalVersion(key, version),
Expand Down Expand Up @@ -212,7 +251,7 @@ export class S3Backend implements StorageBackendAdapter {
Bucket: bucket,
Key: withOptionalVersion(key, version),
})
await this.client.send(command)
await this.apiClient.send(command)
}

/**
Expand Down Expand Up @@ -251,7 +290,7 @@ export class S3Backend implements StorageBackendAdapter {
ContentType: metadata?.mimetype,
CacheControl: metadata?.cacheControl,
})
const data = await this.client.send(command)
const data = await this.apiClient.send(command)
return {
httpStatusCode: data.$metadata.httpStatusCode || 200,
eTag: data.CopyObjectResult?.ETag || '',
Expand Down Expand Up @@ -280,7 +319,7 @@ export class S3Backend implements StorageBackendAdapter {
ContinuationToken: options?.nextToken || undefined,
StartAfter: options?.startAfter,
})
const data = await this.client.send(command)
const data = await this.apiClient.send(command)
const keys =
data.Contents?.filter((ele) => {
if (options?.beforeDate) {
Expand Down Expand Up @@ -327,7 +366,7 @@ export class S3Backend implements StorageBackendAdapter {
Objects: s3Prefixes,
},
})
await this.client.send(command)
await this.apiClient.send(command)
} catch (e) {
throw StorageBackendError.fromError(e)
}
Expand All @@ -349,7 +388,7 @@ export class S3Backend implements StorageBackendAdapter {
Bucket: bucket,
Key: withOptionalVersion(key, version),
})
const data = await this.client.send(command)
const data = await this.apiClient.send(command)
return {
cacheControl: data.CacheControl || 'no-cache',
mimetype: data.ContentType || 'application/octet-stream',
Expand Down Expand Up @@ -380,7 +419,7 @@ export class S3Backend implements StorageBackendAdapter {
MaxParts: maxParts,
})

const result = await this.client.send(command)
const result = await this.apiClient.send(command)

return {
parts: result.Parts || [],
Expand All @@ -406,7 +445,7 @@ export class S3Backend implements StorageBackendAdapter {
}

const command = new GetObjectCommand(input)
return getSignedUrl(this.client, command, { expiresIn: 600 })
return getSignedUrl(this.apiClient, command, { expiresIn: 600 })
}

async createMultiPartUpload(
Expand All @@ -428,7 +467,7 @@ export class S3Backend implements StorageBackendAdapter {
},
})

const resp = await this.client.send(createMultiPart)
const resp = await this.apiClient.send(createMultiPart)

if (!resp.UploadId) {
throw ERRORS.InvalidUploadId()
Expand Down Expand Up @@ -457,7 +496,7 @@ export class S3Backend implements StorageBackendAdapter {
ContentLength: length,
})

const resp = await this.client.send(paralellUploadS3, {
const resp = await this.uploadClient.send(paralellUploadS3, {
abortSignal: signal,
})

Expand Down Expand Up @@ -491,7 +530,7 @@ export class S3Backend implements StorageBackendAdapter {
UploadId: uploadId,
})

const partsResponse = await this.client.send(listPartsInput)
const partsResponse = await this.apiClient.send(listPartsInput)
parts = partsResponse.Parts || []
}

Expand All @@ -507,7 +546,7 @@ export class S3Backend implements StorageBackendAdapter {
},
})

const response = await this.client.send(completeUpload)
const response = await this.apiClient.send(completeUpload)

let location = key
let bucket = bucketName
Expand All @@ -534,7 +573,7 @@ export class S3Backend implements StorageBackendAdapter {
Key: key,
UploadId: uploadId,
})
await this.client.send(abortUpload)
await this.apiClient.send(abortUpload)
}

async uploadPartCopy(
Expand All @@ -556,7 +595,7 @@ export class S3Backend implements StorageBackendAdapter {
CopySourceRange: bytesRange ? `bytes=${bytesRange.fromByte}-${bytesRange.toByte}` : undefined,
})

const part = await this.client.send(uploadPartCopy)
const part = await this.uploadClient.send(uploadPartCopy)

return {
eTag: part.CopyPartResult?.ETag,
Expand All @@ -565,14 +604,18 @@ export class S3Backend implements StorageBackendAdapter {
}

async backup(backupInfo: BackupObjectInfo) {
return new ObjectBackup(this.client, backupInfo).backup()
return new ObjectBackup(this.apiClient, backupInfo).backup()
}

close() {
this.agent.close()
Object.values(this.agents).forEach((agent) => {
agent.close()
})
}

protected createS3Client(options: S3ClientOptions & { name: string }) {
protected createS3Client(
options: S3ClientOptions & { name: string; httpAgent: InstrumentedAgent }
) {
const params: S3ClientConfig = {
region: options.region,
runtime: 'node',
Expand Down
Loading
Loading