11package _123
22
33import (
4+ "bytes"
45 "context"
56 "fmt"
67 "io"
@@ -69,25 +70,23 @@ func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.F
6970}
7071
7172func (d * Pan123 ) newUpload (ctx context.Context , upReq * UploadResp , file model.FileStreamer , up driver.UpdateProgress ) error {
72- chunkSize := int64 (16 * utils .MB )
7373 // fetch s3 pre signed urls
7474 size := file .GetSize ()
75+ chunkSize := min (size , 16 * utils .MB )
7576 chunkCount := int (size / chunkSize )
7677 lastChunkSize := size % chunkSize
7778 if lastChunkSize > 0 {
7879 chunkCount ++
79- } else {
80- lastChunkSize = chunkSize
8180 }
8281 // only 1 batch is allowed
83- isMultipart := chunkCount > 1
8482 batchSize := 1
8583 getS3UploadUrl := d .getS3Auth
86- if isMultipart {
84+ if chunkCount > 1 {
8785 batchSize = 10
8886 getS3UploadUrl = d .getS3PreSignedUrls
8987 }
9088 limited := driver .NewLimitedUploadStream (ctx , file )
89+ buf := make ([]byte , chunkSize )
9190 for i := 1 ; i <= chunkCount ; i += batchSize {
9291 if utils .IsCanceled (ctx ) {
9392 return ctx .Err ()
@@ -103,10 +102,17 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
103102 if utils .IsCanceled (ctx ) {
104103 return ctx .Err ()
105104 }
106- if j == chunkCount {
107- chunkSize = lastChunkSize
105+ if j == chunkCount && lastChunkSize > 0 {
106+ buf = buf [: lastChunkSize ]
108107 }
109- err = d .uploadS3Chunk (ctx , upReq , s3PreSignedUrls , j , end , io .LimitReader (limited , chunkSize ), chunkSize , false , getS3UploadUrl )
108+ n , err := io .ReadFull (limited , buf )
109+ if err == io .ErrUnexpectedEOF {
110+ return fmt .Errorf ("upload s3 chunk %d failed, can't read data, expected=%d, got=%d" , j , len (buf ), n )
111+ }
112+ if err != nil {
113+ return err
114+ }
115+ err = d .uploadS3Chunk (ctx , upReq , s3PreSignedUrls , j , end , bytes .NewReader (buf ), int64 (n ), false , getS3UploadUrl )
110116 if err != nil {
111117 return err
112118 }
@@ -117,7 +123,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
117123 return d .completeS3 (ctx , upReq , file , chunkCount > 1 )
118124}
119125
120- func (d * Pan123 ) uploadS3Chunk (ctx context.Context , upReq * UploadResp , s3PreSignedUrls * S3PreSignedURLs , cur , end int , reader io .Reader , curSize int64 , retry bool , getS3UploadUrl func (ctx context.Context , upReq * UploadResp , start int , end int ) (* S3PreSignedURLs , error )) error {
126+ func (d * Pan123 ) uploadS3Chunk (ctx context.Context , upReq * UploadResp , s3PreSignedUrls * S3PreSignedURLs , cur , end int , reader * bytes .Reader , curSize int64 , retry bool , getS3UploadUrl func (ctx context.Context , upReq * UploadResp , start int , end int ) (* S3PreSignedURLs , error )) error {
121127 uploadUrl := s3PreSignedUrls .Data .PreSignedUrls [strconv .Itoa (cur )]
122128 if uploadUrl == "" {
123129 return fmt .Errorf ("upload url is empty, s3PreSignedUrls: %+v" , s3PreSignedUrls )
@@ -145,6 +151,7 @@ func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSign
145151 }
146152 s3PreSignedUrls .Data .PreSignedUrls = newS3PreSignedUrls .Data .PreSignedUrls
147153 // retry
154+ reader .Seek (0 , io .SeekStart )
148155 return d .uploadS3Chunk (ctx , upReq , s3PreSignedUrls , cur , end , reader , curSize , true , getS3UploadUrl )
149156 }
150157 if res .StatusCode != http .StatusOK {
0 commit comments