Skip to content

Commit c8e042d

Browse files
authored
fix: support s3 chunked upload (#684)
1 parent c3cbf2e commit c8e042d

File tree

8 files changed

+626
-25
lines changed

8 files changed

+626
-25
lines changed

src/http/plugins/signature-v4.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import { ERRORS } from '@internal/errors'
77

88
import { getConfig } from '../../config'
99
import { MultipartFile, MultipartValue } from '@fastify/multipart'
10+
import {
11+
ChunkSignatureV4Parser,
12+
V4StreamingAlgorithm,
13+
} from '@storage/protocols/s3/signature-v4-stream'
1014

1115
const {
1216
anonKeyAsync,
@@ -27,6 +31,7 @@ type AWSRequest = FastifyRequest<{ Querystring: { 'X-Amz-Credential'?: string }
2731
declare module 'fastify' {
2832
interface FastifyRequest {
2933
multiPartFileStream?: MultipartFile
34+
streamingSignatureV4?: ChunkSignatureV4Parser
3035
}
3136
}
3237

@@ -80,6 +85,15 @@ export const signatureV4 = fastifyPlugin(
8085
request.jwt = token
8186
request.jwtPayload = payload
8287
request.owner = payload.sub
88+
89+
if (SignatureV4.isChunkedUpload(request.headers)) {
90+
request.streamingSignatureV4 = createStreamingSignatureV4Parser({
91+
signatureV4,
92+
streamAlgorithm: request.headers['x-amz-content-sha256'] as V4StreamingAlgorithm,
93+
clientSignature,
94+
trailers: request.headers['x-amz-trailer'] as string,
95+
})
96+
}
8397
return
8498
}
8599

@@ -92,6 +106,15 @@ export const signatureV4 = fastifyPlugin(
92106
request.jwt = jwt
93107
request.jwtPayload = claims
94108
request.owner = claims.sub
109+
110+
if (SignatureV4.isChunkedUpload(request.headers)) {
111+
request.streamingSignatureV4 = createStreamingSignatureV4Parser({
112+
signatureV4,
113+
streamAlgorithm: request.headers['x-amz-content-sha256'] as V4StreamingAlgorithm,
114+
clientSignature,
115+
trailers: request.headers['x-amz-trailer'] as string,
116+
})
117+
}
95118
})
96119
},
97120
{ name: 'auth-signature-v4' }
@@ -202,3 +225,43 @@ async function createServerSignature(tenantId: string, clientSignature: ClientSi
202225

203226
return { signature, claims: undefined, token: await serviceKeyAsync }
204227
}
228+
229+
interface CreateSignatureV3ParserOpts {
230+
signatureV4: SignatureV4
231+
streamAlgorithm: string
232+
clientSignature: ClientSignature
233+
trailers: string
234+
}
235+
236+
function createStreamingSignatureV4Parser(opts: CreateSignatureV3ParserOpts) {
237+
const algorithm = opts.streamAlgorithm as V4StreamingAlgorithm
238+
const trailers = opts.trailers
239+
240+
const chunkedSignatureV4 = new ChunkSignatureV4Parser({
241+
maxChunkSize: 8 * 1024 * 1024,
242+
maxHeaderLength: 256,
243+
streamingAlgorithm: algorithm,
244+
trailerHeaderNames: trailers.split(','),
245+
})
246+
247+
chunkedSignatureV4.on(
248+
'signatureReadyForVerification',
249+
(signature: string, _: number, hash: string, previousSign) => {
250+
const isValid = opts.signatureV4.validateChunkSignature(
251+
algorithm,
252+
opts.clientSignature,
253+
hash,
254+
signature,
255+
previousSign || opts.clientSignature.signature
256+
)
257+
258+
if (!isValid) {
259+
throw ERRORS.SignatureDoesNotMatch(
260+
'The request signature we calculated does not match the signature you provided. Check your key and signing method.'
261+
)
262+
}
263+
}
264+
)
265+
266+
return chunkedSignatureV4
267+
}

src/http/routes/s3/commands/put-object.ts

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { fileUploadFromRequest, getStandardMaxFileSizeLimit } from '@storage/upl
66
import { ERRORS } from '@internal/errors'
77
import { pipeline } from 'stream/promises'
88
import { ByteLimitTransformStream } from '@storage/protocols/s3/byte-limit-stream'
9-
import stream from 'stream'
9+
import stream, { PassThrough, Readable } from 'stream'
1010

1111
const PutObjectInput = {
1212
summary: 'Put Object',
@@ -35,7 +35,6 @@ const PutObjectInput = {
3535
'content-encoding': { type: 'string' },
3636
expires: { type: 'string' },
3737
},
38-
required: ['content-length'],
3938
},
4039
} as const
4140

@@ -80,18 +79,25 @@ export default function PutObject(s3Router: S3Router) {
8079
fileSizeLimit: bucket.file_size_limit || undefined,
8180
})
8281

83-
return s3Protocol.putObject(
84-
{
85-
Body: uploadRequest.body,
86-
Bucket: req.Params.Bucket,
87-
Key: key,
88-
CacheControl: uploadRequest.cacheControl,
89-
ContentType: uploadRequest.mimeType,
90-
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
91-
ContentEncoding: req.Headers?.['content-encoding'],
92-
Metadata: metadata,
93-
},
94-
{ signal: ctx.signals.body, isTruncated: uploadRequest.isTruncated }
82+
return pipeline(
83+
uploadRequest.body,
84+
new ByteLimitTransformStream(uploadRequest.maxFileSize),
85+
ctx.req.streamingSignatureV4 || new PassThrough(),
86+
async (fileStream) => {
87+
return s3Protocol.putObject(
88+
{
89+
Body: fileStream as Readable,
90+
Bucket: req.Params.Bucket,
91+
Key: key,
92+
CacheControl: uploadRequest.cacheControl,
93+
ContentType: uploadRequest.mimeType,
94+
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
95+
ContentEncoding: req.Headers?.['content-encoding'],
96+
Metadata: metadata,
97+
},
98+
{ signal: ctx.signals.body, isTruncated: uploadRequest.isTruncated }
99+
)
100+
}
95101
)
96102
}
97103
)

src/http/routes/s3/commands/upload-part.ts

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { S3ProtocolHandler } from '@storage/protocols/s3/s3-handler'
22
import { S3Router } from '../router'
33
import { ROUTE_OPERATIONS } from '../../operations'
4+
import { pipeline } from 'stream/promises'
5+
import { PassThrough, Readable } from 'stream'
46

57
const UploadPartInput = {
68
summary: 'Upload Part',
@@ -25,11 +27,11 @@ const UploadPartInput = {
2527
properties: {
2628
host: { type: 'string' },
2729
'x-amz-content-sha256': { type: 'string' },
30+
'x-amz-decoded-content-length': { type: 'integer' },
2831
'x-amz-date': { type: 'string' },
2932
'content-type': { type: 'string' },
3033
'content-length': { type: 'integer' },
3134
},
32-
required: ['content-length'],
3335
},
3436
} as const
3537

@@ -44,14 +46,39 @@ export default function UploadPart(s3Router: S3Router) {
4446
(req, ctx) => {
4547
const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)
4648

47-
return s3Protocol.uploadPart({
48-
Body: ctx.req.raw,
49-
UploadId: req.Querystring?.uploadId,
50-
Bucket: req.Params.Bucket,
51-
Key: req.Params['*'],
52-
PartNumber: req.Querystring?.partNumber,
53-
ContentLength: req.Headers?.['content-length'],
54-
})
49+
if (ctx.req.streamingSignatureV4) {
50+
const passThrough = new PassThrough()
51+
ctx.req.raw.pipe(passThrough)
52+
ctx.req.raw.on('error', (err) => {
53+
passThrough.destroy(err)
54+
})
55+
56+
return pipeline(passThrough, ctx.req.streamingSignatureV4, async (body) => {
57+
return s3Protocol.uploadPart(
58+
{
59+
Body: body as Readable,
60+
UploadId: req.Querystring?.uploadId,
61+
Bucket: req.Params.Bucket,
62+
Key: req.Params['*'],
63+
ContentLength: req.Headers?.['x-amz-decoded-content-length'],
64+
PartNumber: req.Querystring?.partNumber,
65+
},
66+
{ signal: ctx.req.signals.body.signal }
67+
)
68+
})
69+
}
70+
71+
return s3Protocol.uploadPart(
72+
{
73+
Body: ctx.req.raw,
74+
UploadId: req.Querystring?.uploadId,
75+
Bucket: req.Params.Bucket,
76+
Key: req.Params['*'],
77+
PartNumber: req.Querystring?.partNumber,
78+
ContentLength: req.Headers?.['content-length'],
79+
},
80+
{ signal: ctx.req.signals.body.signal }
81+
)
5582
}
5683
)
5784
}

src/storage/protocols/s3/s3-handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ export class S3ProtocolHandler {
535535
* @param command
536536
* @param signal
537537
*/
538-
async uploadPart(command: UploadPartCommandInput, signal?: AbortSignal) {
538+
async uploadPart(command: UploadPartCommandInput, { signal }: { signal?: AbortSignal }) {
539539
if (signal?.aborted) {
540540
throw ERRORS.AbortedTerminate('UploadPart aborted')
541541
}

0 commit comments

Comments
 (0)