@@ -10,11 +10,11 @@ import {
1010 S3Client ,
1111} from '@aws-sdk/client-s3'
1212import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
13- import * as R from 'remeda'
1413
14+ import * as R from 'remeda'
1515import { z } from 'zod'
1616import { parseEnv , StorageDriver } from '~/lib/storage/defineStorageDriver'
17- import { createTempDir } from '~/lib/utils'
17+ import { createTempDir , streamToBuffer } from '~/lib/utils'
1818
1919export class S3StorageDriver extends StorageDriver {
2020 s3
@@ -95,20 +95,15 @@ export class S3StorageDriver extends StorageDriver {
9595 )
9696 }
9797
98- async uploadPart ( opts : {
99- objectName : string
100- uploadId : string
101- partNumber : number
102- data : ReadableStream
103- } ) {
98+ async uploadPart ( opts : { uploadId : string ; partNumber : number ; data : ReadableStream } ) {
10499 await this . s3 . send (
105100 new PutObjectCommand ( {
106101 Bucket : this . bucket ,
107102 Key : this . getUploadPartObjectName ( {
108103 uploadId : opts . uploadId ,
109104 partNumber : opts . partNumber ,
110105 } ) ,
111- Body : opts . data ,
106+ Body : await streamToBuffer ( opts . data ) ,
112107 } ) ,
113108 )
114109 }
@@ -121,41 +116,40 @@ export class S3StorageDriver extends StorageDriver {
121116 const tempDir = await createTempDir ( )
122117 const outputTempFilePath = path . join ( tempDir , 'output' )
123118
119+ await fs . writeFile ( outputTempFilePath , '' )
124120 const outputTempFile = await fs . open ( outputTempFilePath , 'r+' )
125121
126122 let currentChunk = 0
127123 for ( const partNumber of opts . partNumbers ) {
128124 const part = await this . s3 . send (
129125 new GetObjectCommand ( {
130126 Bucket : this . bucket ,
131- Key : this . addUploadFolderPrefix ( {
127+ Key : this . getUploadPartObjectName ( {
128+ partNumber,
132129 uploadId : opts . uploadId ,
133- objectName : this . getUploadPartObjectName ( {
134- partNumber,
135- uploadId : opts . uploadId ,
136- } ) ,
137130 } ) ,
138131 } ) ,
139132 )
140133
141134 if ( ! part . Body ) throw new Error ( `Part ${ partNumber } is missing` )
142135
143- const partStream = part . Body as ReadableStream
144-
136+ const partStream = part . Body . transformToWebStream ( )
145137 const bufferWriteStream = new WritableStream < Buffer > ( {
146138 async write ( chunk ) {
139+ const start = currentChunk
147140 currentChunk += chunk . length
148- await outputTempFile . write ( chunk , 0 , chunk . length , currentChunk )
141+ await outputTempFile . write ( chunk , 0 , chunk . length , start )
149142 } ,
150143 } )
151144 await partStream . pipeTo ( bufferWriteStream )
152145 }
153146
147+ const readStream = createReadStream ( outputTempFilePath )
154148 await this . s3 . send (
155149 new PutObjectCommand ( {
156150 Bucket : this . bucket ,
157151 Key : this . addBaseFolderPrefix ( opts . finalOutputObjectName ) ,
158- Body : await createReadStream ( outputTempFilePath ) ,
152+ Body : readStream ,
159153 } ) ,
160154 )
161155
0 commit comments