Skip to content

Commit 905ce9c

Browse files
committed
fix: split s3 client into 3 separate clients
1 parent 635177e commit 905ce9c

File tree

9 files changed

+105
-51
lines changed

9 files changed

+105
-51
lines changed

src/http/routes/s3/commands/create-multipart-upload.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { S3ProtocolHandler } from '@storage/protocols/s3/s3-handler'
22
import { S3Router } from '../router'
33
import { ROUTE_OPERATIONS } from '../../operations'
4-
import { S3Backend } from '@storage/backend'
54
import { ERRORS } from '@internal/errors'
65

76
const CreateMultiPartUploadInput = {

src/storage/backend/adapter.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,10 @@ export type UploadPart = {
4646
/**
4747
* A generic storage Adapter to interact with files
4848
*/
49-
export abstract class StorageBackendAdapter {
50-
client: any
51-
constructor() {
52-
this.client = null
53-
}
49+
export abstract class StorageBackendAdapter<T = unknown> {
50+
constructor() {}
51+
52+
abstract getClient(): T
5453

5554
async list(
5655
bucket: string,

src/storage/backend/file.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ export class FileBackend implements StorageBackendAdapter {
5858
this.etagAlgorithm = storageFileEtagAlgorithm
5959
}
6060

61+
getClient(): unknown {
62+
return null
63+
}
64+
6165
async list(
6266
bucket: string,
6367
options?: {

src/storage/backend/index.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { StorageBackendAdapter } from './adapter'
22
import { FileBackend } from './file'
33
import { S3Backend, S3ClientOptions } from './s3/adapter'
44
import { getConfig, StorageBackendType } from '../../config'
5+
import { S3Client } from '@aws-sdk/client-s3'
56

67
export * from './s3'
78
export * from './file'
@@ -14,14 +15,18 @@ type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
1415
? S3ClientOptions
1516
: undefined
1617

18+
type BackendAdapterForType<Type extends StorageBackendType> = Type extends 's3'
19+
? StorageBackendAdapter<S3Client>
20+
: StorageBackendAdapter
21+
1722
export function createStorageBackend<Type extends StorageBackendType>(
1823
type: Type,
1924
config?: ConfigForStorage<Type>
2025
) {
21-
let storageBackend: StorageBackendAdapter
26+
let storageBackend: BackendAdapterForType<Type>
2227

2328
if (type === 'file') {
24-
storageBackend = new FileBackend()
29+
storageBackend = new FileBackend() as BackendAdapterForType<Type>
2530
} else {
2631
const defaultOptions: S3ClientOptions = {
2732
region: storageS3Region,

src/storage/backend/s3/adapter.ts

Lines changed: 74 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -42,35 +42,74 @@ export interface S3ClientOptions {
4242
accessKey?: string
4343
secretKey?: string
4444
role?: string
45-
httpAgent?: InstrumentedAgent
4645
requestTimeout?: number
46+
httpAgents?: S3HttpAgents
47+
}
48+
49+
interface S3HttpAgents {
50+
api: InstrumentedAgent
51+
upload: InstrumentedAgent
52+
download: InstrumentedAgent
4753
}
4854

4955
/**
5056
* S3Backend
5157
* Interacts with a s3-compatible file system with this S3Adapter
5258
*/
53-
export class S3Backend implements StorageBackendAdapter {
54-
client: S3Client
55-
agent: InstrumentedAgent
59+
export class S3Backend implements StorageBackendAdapter<S3Client> {
60+
apiClient: S3Client
61+
uploadClient: S3Client
62+
downloadClient: S3Client
63+
64+
agents: {
65+
api: InstrumentedAgent
66+
upload: InstrumentedAgent
67+
download: InstrumentedAgent
68+
}
5669

5770
constructor(options: S3ClientOptions) {
58-
this.agent =
59-
options.httpAgent ??
60-
createAgent('s3_default', {
71+
this.agents = options.httpAgents ?? {
72+
api: createAgent('s3_api', {
6173
maxSockets: storageS3MaxSockets,
62-
})
74+
}),
75+
upload: createAgent('s3_upload', {
76+
maxSockets: storageS3MaxSockets,
77+
}),
78+
download: createAgent('s3_download', {
79+
maxSockets: storageS3MaxSockets,
80+
}),
81+
}
6382

64-
if (this.agent.httpsAgent && tracingEnabled) {
65-
this.agent.monitor()
83+
if (this.agents && tracingEnabled) {
84+
Object.values(this.agents).forEach((agent) => {
85+
agent.monitor()
86+
})
6687
}
6788

6889
// Default client for API operations
69-
this.client = this.createS3Client({
90+
this.apiClient = this.createS3Client({
7091
...options,
71-
name: 's3_default',
72-
httpAgent: this.agent,
92+
name: 's3_api',
93+
httpAgent: this.agents.api,
7394
})
95+
96+
// Dedicated client for downloads
97+
this.downloadClient = this.createS3Client({
98+
...options,
99+
name: 's3_download',
100+
httpAgent: this.agents.download,
101+
})
102+
103+
// Dedicated client for uploads
104+
this.uploadClient = this.createS3Client({
105+
...options,
106+
name: 's3_upload',
107+
httpAgent: this.agents.upload,
108+
})
109+
}
110+
111+
getClient() {
112+
return this.apiClient
74113
}
75114

76115
/**
@@ -98,7 +137,7 @@ export class S3Backend implements StorageBackendAdapter {
98137
input.IfModifiedSince = new Date(headers.ifModifiedSince)
99138
}
100139
const command = new GetObjectCommand(input)
101-
const data = await this.client.send(command, {
140+
const data = await this.downloadClient.send(command, {
102141
abortSignal: signal,
103142
})
104143

@@ -144,7 +183,7 @@ export class S3Backend implements StorageBackendAdapter {
144183
const dataStream = tracingFeatures?.upload ? monitorStream(body) : body
145184

146185
const upload = new Upload({
147-
client: this.client,
186+
client: this.uploadClient,
148187
params: {
149188
Bucket: bucketName,
150189
Key: withOptionalVersion(key, version),
@@ -212,7 +251,7 @@ export class S3Backend implements StorageBackendAdapter {
212251
Bucket: bucket,
213252
Key: withOptionalVersion(key, version),
214253
})
215-
await this.client.send(command)
254+
await this.apiClient.send(command)
216255
}
217256

218257
/**
@@ -251,7 +290,7 @@ export class S3Backend implements StorageBackendAdapter {
251290
ContentType: metadata?.mimetype,
252291
CacheControl: metadata?.cacheControl,
253292
})
254-
const data = await this.client.send(command)
293+
const data = await this.apiClient.send(command)
255294
return {
256295
httpStatusCode: data.$metadata.httpStatusCode || 200,
257296
eTag: data.CopyObjectResult?.ETag || '',
@@ -280,7 +319,7 @@ export class S3Backend implements StorageBackendAdapter {
280319
ContinuationToken: options?.nextToken || undefined,
281320
StartAfter: options?.startAfter,
282321
})
283-
const data = await this.client.send(command)
322+
const data = await this.apiClient.send(command)
284323
const keys =
285324
data.Contents?.filter((ele) => {
286325
if (options?.beforeDate) {
@@ -327,7 +366,7 @@ export class S3Backend implements StorageBackendAdapter {
327366
Objects: s3Prefixes,
328367
},
329368
})
330-
await this.client.send(command)
369+
await this.apiClient.send(command)
331370
} catch (e) {
332371
throw StorageBackendError.fromError(e)
333372
}
@@ -349,7 +388,7 @@ export class S3Backend implements StorageBackendAdapter {
349388
Bucket: bucket,
350389
Key: withOptionalVersion(key, version),
351390
})
352-
const data = await this.client.send(command)
391+
const data = await this.apiClient.send(command)
353392
return {
354393
cacheControl: data.CacheControl || 'no-cache',
355394
mimetype: data.ContentType || 'application/octet-stream',
@@ -380,7 +419,7 @@ export class S3Backend implements StorageBackendAdapter {
380419
MaxParts: maxParts,
381420
})
382421

383-
const result = await this.client.send(command)
422+
const result = await this.apiClient.send(command)
384423

385424
return {
386425
parts: result.Parts || [],
@@ -406,7 +445,7 @@ export class S3Backend implements StorageBackendAdapter {
406445
}
407446

408447
const command = new GetObjectCommand(input)
409-
return getSignedUrl(this.client, command, { expiresIn: 600 })
448+
return getSignedUrl(this.apiClient, command, { expiresIn: 600 })
410449
}
411450

412451
async createMultiPartUpload(
@@ -428,7 +467,7 @@ export class S3Backend implements StorageBackendAdapter {
428467
},
429468
})
430469

431-
const resp = await this.client.send(createMultiPart)
470+
const resp = await this.apiClient.send(createMultiPart)
432471

433472
if (!resp.UploadId) {
434473
throw ERRORS.InvalidUploadId()
@@ -457,7 +496,7 @@ export class S3Backend implements StorageBackendAdapter {
457496
ContentLength: length,
458497
})
459498

460-
const resp = await this.client.send(paralellUploadS3, {
499+
const resp = await this.uploadClient.send(paralellUploadS3, {
461500
abortSignal: signal,
462501
})
463502

@@ -491,7 +530,7 @@ export class S3Backend implements StorageBackendAdapter {
491530
UploadId: uploadId,
492531
})
493532

494-
const partsResponse = await this.client.send(listPartsInput)
533+
const partsResponse = await this.apiClient.send(listPartsInput)
495534
parts = partsResponse.Parts || []
496535
}
497536

@@ -507,7 +546,7 @@ export class S3Backend implements StorageBackendAdapter {
507546
},
508547
})
509548

510-
const response = await this.client.send(completeUpload)
549+
const response = await this.apiClient.send(completeUpload)
511550

512551
let location = key
513552
let bucket = bucketName
@@ -534,7 +573,7 @@ export class S3Backend implements StorageBackendAdapter {
534573
Key: key,
535574
UploadId: uploadId,
536575
})
537-
await this.client.send(abortUpload)
576+
await this.apiClient.send(abortUpload)
538577
}
539578

540579
async uploadPartCopy(
@@ -556,7 +595,7 @@ export class S3Backend implements StorageBackendAdapter {
556595
CopySourceRange: bytesRange ? `bytes=${bytesRange.fromByte}-${bytesRange.toByte}` : undefined,
557596
})
558597

559-
const part = await this.client.send(uploadPartCopy)
598+
const part = await this.uploadClient.send(uploadPartCopy)
560599

561600
return {
562601
eTag: part.CopyPartResult?.ETag,
@@ -565,14 +604,18 @@ export class S3Backend implements StorageBackendAdapter {
565604
}
566605

567606
async backup(backupInfo: BackupObjectInfo) {
568-
return new ObjectBackup(this.client, backupInfo).backup()
607+
return new ObjectBackup(this.apiClient, backupInfo).backup()
569608
}
570609

571610
close() {
572-
this.agent.close()
611+
Object.values(this.agents).forEach((agent) => {
612+
agent.close()
613+
})
573614
}
574615

575-
protected createS3Client(options: S3ClientOptions & { name: string }) {
616+
protected createS3Client(
617+
options: S3ClientOptions & { name: string; httpAgent: InstrumentedAgent }
618+
) {
576619
const params: S3ClientConfig = {
577620
region: options.region,
578621
runtime: 'node',

src/storage/events/base-event.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,23 +80,27 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends
8080
return new Storage(this.getOrCreateStorageBackend(), db, new TenantLocation(storageS3Bucket))
8181
}
8282

83-
protected static getOrCreateStorageBackend(monitor = false) {
83+
protected static getOrCreateStorageBackend() {
8484
if (storageBackend) {
8585
return storageBackend
8686
}
8787

88-
const httpAgent = createAgent('s3_worker', {
89-
maxSockets: storageS3MaxSockets,
90-
})
88+
const httpAgents = {
89+
api: createAgent('s3_worker_api', {
90+
maxSockets: storageS3MaxSockets,
91+
}),
92+
download: createAgent('s3_worker_download', {
93+
maxSockets: storageS3MaxSockets,
94+
}),
95+
upload: createAgent('s3_worker_upload', {
96+
maxSockets: storageS3MaxSockets,
97+
}),
98+
}
9199

92100
storageBackend = createStorageBackend(storageBackendType, {
93-
httpAgent: httpAgent,
101+
httpAgents,
94102
})
95103

96-
if (monitor) {
97-
httpAgent.monitor()
98-
}
99-
100104
return storageBackend
101105
}
102106
}

src/test/rls.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ const testSpec = yaml.load(
7474
const { serviceKeyAsync, tenantId, jwtSecret, databaseURL, storageS3Bucket, storageBackendType } =
7575
getConfig()
7676
const backend = createStorageBackend(storageBackendType)
77-
const client = backend.client
77+
const client = backend.getClient()
7878
let appInstance: FastifyInstance
7979

8080
jest.setTimeout(10000)

src/test/s3-locker.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { backends } from '../storage'
1818

1919
const { storageS3Bucket, storageBackendType } = getConfig()
2020
const backend = backends.createStorageBackend(storageBackendType)
21-
const s3ClientFromBackend = backend.client
21+
const s3ClientFromBackend = backend.getClient()
2222

2323
describe('S3Locker', () => {
2424
let s3Client: S3Client

src/test/tus.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const oneChunkFile = fs.createReadStream(path.resolve(__dirname, 'assets', 'sadc
2323
const localServerAddress = 'http://127.0.0.1:8999'
2424

2525
const backend = backends.createStorageBackend(storageBackendType)
26-
const client = backend.client
26+
const client = backend.getClient()
2727

2828
describe('Tus multipart', () => {
2929
let db: StorageKnexDB

0 commit comments

Comments
 (0)