@@ -6,11 +6,16 @@ import (
 	"io"
 	"net/http"
 	"strconv"
+	"time"

 	"github.com/OpenListTeam/OpenList/v4/drivers/base"
 	"github.com/OpenListTeam/OpenList/v4/internal/driver"
 	"github.com/OpenListTeam/OpenList/v4/internal/model"
+	"github.com/OpenListTeam/OpenList/v4/internal/stream"
+	"github.com/OpenListTeam/OpenList/v4/pkg/errgroup"
+	"github.com/OpenListTeam/OpenList/v4/pkg/singleflight"
 	"github.com/OpenListTeam/OpenList/v4/pkg/utils"
+	"github.com/avast/retry-go"

 	"github.com/go-resty/resty/v2"
 )
@@ -69,18 +74,15 @@ func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.F
 }

 func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.FileStreamer, up driver.UpdateProgress) error {
-	tmpF, err := file.CacheFullInTempFile()
-	if err != nil {
-		return err
-	}
 	// fetch s3 pre signed urls
 	size := file.GetSize()
-	chunkSize := min(size, 16*utils.MB)
-	chunkCount := int(size / chunkSize)
+	chunkSize := int64(16 * utils.MB)
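+	// fixed 16 MB chunks, rounded up so a short final chunk still gets its own slot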
+	chunkCount := 1
+	if size > chunkSize {
+		chunkCount = int((size + chunkSize - 1) / chunkSize)
+	}
 	lastChunkSize := size % chunkSize
-	if lastChunkSize > 0 {
-		chunkCount++
-	} else {
+	if lastChunkSize == 0 {
 		lastChunkSize = chunkSize
 	}
 	// only 1 batch is allowed
@@ -90,73 +92,103 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
 		batchSize = 10
 		getS3UploadUrl = d.getS3PreSignedUrls
 	}
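+	// read the upload in sections instead of caching the whole file to a temp file first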
+	ss, err := stream.NewStreamSectionReader(file, int(chunkSize))
+	if err != nil {
+		return err
+	}
+
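+	// cap workers at d.UploadThread; each chunk gets up to 3 attempts with backoff starting at one second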
+	thread := min(int(chunkCount), d.UploadThread)
+	threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
+		retry.Attempts(3),
+		retry.Delay(time.Second),
+		retry.DelayType(retry.BackOffDelay))
 	for i := 1; i <= chunkCount; i += batchSize {
-		if utils.IsCanceled(ctx) {
-			return ctx.Err()
+		if utils.IsCanceled(uploadCtx) {
+			break
 		}
 		start := i
 		end := min(i+batchSize, chunkCount+1)
-		s3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, start, end)
+		s3PreSignedUrls, err := getS3UploadUrl(uploadCtx, upReq, start, end)
 		if err != nil {
 			return err
 		}
 		// upload each chunk
-		for j := start; j < end; j++ {
-			if utils.IsCanceled(ctx) {
-				return ctx.Err()
+		for cur := start; cur < end; cur++ {
+			if utils.IsCanceled(uploadCtx) {
+				break
 			}
+			offset := int64(cur-1) * chunkSize
 			curSize := chunkSize
-			if j == chunkCount {
+			if cur == chunkCount {
 				curSize = lastChunkSize
 			}
-			err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.NewSectionReader(tmpF, chunkSize*int64(j-1), curSize), curSize, false, getS3UploadUrl)
-			if err != nil {
-				return err
-			}
-			up(float64(j) * 100 / float64(chunkCount))
+			var reader *stream.SectionReader
+			var rateLimitedRd io.Reader
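+			// Before acquires the section reader once (nil-guarded), Do is the body the group may retry, After recycles the section buffer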
+			threadG.GoWithLifecycle(errgroup.Lifecycle{
+				Before: func(ctx context.Context) error {
+					if reader == nil {
+						var err error
+						reader, err = ss.GetSectionReader(offset, curSize)
+						if err != nil {
+							return err
+						}
+						rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
+					}
+					return nil
+				},
+				Do: func(ctx context.Context) error {
+					uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)]
+					if uploadUrl == "" {
+						return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls)
+					}
+					// rewind once so retried attempts re-send the section from the start
+					reader.Seek(0, io.SeekStart)
+					req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl, rateLimitedRd)
+					if err != nil {
+						return err
+					}
+					req.ContentLength = curSize
+					//req.Header.Set("Content-Length", strconv.FormatInt(curSize, 10))
+					res, err := base.HttpClient.Do(req)
+					if err != nil {
+						return err
+					}
+					defer res.Body.Close()
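+					// 403 means the pre-signed URL expired: refresh the batch once via singleflight, then return an error so the group retries with the fresh URL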
+					if res.StatusCode == http.StatusForbidden {
+						_, err, _ = singleflight.AnyGroup.Do(fmt.Sprintf("Pan123.newUpload_%p", threadG), func() (any, error) {
+							newS3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, cur, end)
+							if err != nil {
+								return nil, err
+							}
+							s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls
+							return nil, nil
+						})
+						if err != nil {
+							return err
+						}
+						return fmt.Errorf("upload s3 chunk %d failed, status code: %d", cur, res.StatusCode)
+					}
+					if res.StatusCode != http.StatusOK {
+						body, err := io.ReadAll(res.Body)
+						if err != nil {
+							return err
+						}
+						return fmt.Errorf("upload s3 chunk %d failed, status code: %d, body: %s", cur, res.StatusCode, body)
+					}
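+					// completed chunks fill the 10%-95% progress band; the deferred up(100) fires after completeS3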
+					progress := 10.0 + 85.0*float64(threadG.Success())/float64(chunkCount)
+					up(progress)
+					return nil
+				},
+				After: func(err error) {
+					ss.RecycleSectionReader(reader)
+				},
+			})
 		}
 	}
-	// complete s3 upload
-	return d.completeS3(ctx, upReq, file, chunkCount > 1)
-}
-
-func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader *io.SectionReader, curSize int64, retry bool, getS3UploadUrl func(ctx context.Context, upReq *UploadResp, start int, end int) (*S3PreSignedURLs, error)) error {
-	uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)]
-	if uploadUrl == "" {
-		return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls)
-	}
-	req, err := http.NewRequest("PUT", uploadUrl, driver.NewLimitedUploadStream(ctx, reader))
-	if err != nil {
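+	// wait for all chunk workers; the first error aborts the whole upload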
+	if err := threadG.Wait(); err != nil {
 		return err
 	}
-	req = req.WithContext(ctx)
-	req.ContentLength = curSize
-	//req.Header.Set("Content-Length", strconv.FormatInt(curSize, 10))
-	res, err := base.HttpClient.Do(req)
-	if err != nil {
-		return err
-	}
-	defer res.Body.Close()
-	if res.StatusCode == http.StatusForbidden {
-		if retry {
-			return fmt.Errorf("upload s3 chunk %d failed, status code: %d", cur, res.StatusCode)
-		}
-		// refresh s3 pre signed urls
-		newS3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, cur, end)
-		if err != nil {
-			return err
-		}
-		s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls
-		// retry
-		reader.Seek(0, io.SeekStart)
-		return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true, getS3UploadUrl)
-	}
-	if res.StatusCode != http.StatusOK {
-		body, err := io.ReadAll(res.Body)
-		if err != nil {
-			return err
-		}
-		return fmt.Errorf("upload s3 chunk %d failed, status code: %d, body: %s", cur, res.StatusCode, body)
-	}
-	return nil
+	defer up(100)
+	// complete s3 upload
+	return d.completeS3(ctx, upReq, file, chunkCount > 1)
 }