@@ -17,6 +17,8 @@ import (
1717 "code.gitea.io/gitea/modules/log"
1818 "code.gitea.io/gitea/modules/proxy"
1919 "code.gitea.io/gitea/modules/setting"
20+
21+ "golang.org/x/sync/errgroup"
2022)
2123
2224// HTTPClient is used to communicate with the LFS server
@@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
113115 return c .performOperation (ctx , objects , nil , callback )
114116}
115117
118+ // performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
116119func (c * HTTPClient ) performOperation (ctx context.Context , objects []Pointer , dc DownloadCallback , uc UploadCallback ) error {
117120 if len (objects ) == 0 {
118121 return nil
@@ -133,71 +136,92 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
133136 return fmt .Errorf ("TransferAdapter not found: %s" , result .Transfer )
134137 }
135138
139+ errGroup , groupCtx := errgroup .WithContext (context .Background ())
136140 for _ , object := range result .Objects {
137- if object .Error != nil {
138- log .Trace ("Error on object %v: %v" , object .Pointer , object .Error )
139- if uc != nil {
140- if _ , err := uc (object .Pointer , object .Error ); err != nil {
141- return err
142- }
143- } else {
144- if err := dc (object .Pointer , nil , object .Error ); err != nil {
145- return err
146- }
147- }
148- continue
149- }
141+ func (groupCtx context.Context , object * ObjectResponse , dc DownloadCallback , uc UploadCallback , transferAdapter TransferAdapter ) {
142+ errGroup .Go (func () error {
143+ err := performSingleOperation (groupCtx , object , dc , uc , transferAdapter )
144+ return err
145+ })
146+ }(groupCtx , object , dc , uc , transferAdapter )
147+ }
150148
151- if uc != nil {
152- if len (object .Actions ) == 0 {
153- log .Trace ("%v already present on server" , object .Pointer )
154- continue
155- }
149+ // only the first error is returned, preserving legacy behavior before concurrency
150+ return errGroup .Wait ()
151+ }
156152
157- link , ok := object . Actions [ " upload" ]
158- if ! ok {
159- log . Debug ( "%+v" , object )
160- return errors . New ( "missing action 'upload'" )
161- }
153+ // performSingleOperation performs an LFS upload or download operation on a single object
154+ func performSingleOperation ( ctx context. Context , object * ObjectResponse , dc DownloadCallback , uc UploadCallback , transferAdapter TransferAdapter ) error {
155+ // the response from an lfs batch api request for this specific object id contained an error
156+ if object . Error != nil {
157+ log . Trace ( "Error on object %v: %v" , object . Pointer , object . Error )
162158
163- content , err := uc (object .Pointer , nil )
164- if err != nil {
159+ // this was an 'upload' request inside the batch request
160+ if uc != nil {
161+ if _ , err := uc (object .Pointer , object .Error ); err != nil {
165162 return err
166163 }
164+ }
167165
168- err = transferAdapter .Upload (ctx , link , object .Pointer , content )
169- if err != nil {
170- return err
171- }
166+ // this was not an 'upload' request inside the batch request, meaning in must be a 'download' request
167+ err := dc (object .Pointer , nil , object .Error )
168+ if errors .Is (object .Error , ErrObjectNotExist ) {
169+ log .Warn ("Ignoring missing upstream LFS object %-v: %v" , object .Pointer , err )
170+ return nil
171+ }
172172
173- link , ok = object .Actions ["verify" ]
174- if ok {
175- if err := transferAdapter .Verify (ctx , link , object .Pointer ); err != nil {
176- return err
177- }
178- }
179- } else {
180- link , ok := object .Actions ["download" ]
181- if ! ok {
182- // no actions block in response, try legacy response schema
183- link , ok = object .Links ["download" ]
184- }
185- if ! ok {
186- log .Debug ("%+v" , object )
187- return errors .New ("missing action 'download'" )
188- }
173+ // this was a 'download' request which was a legitimate error response from the batch api (not an http/404)
174+ return err
175+ }
189176
190- content , err := transferAdapter .Download (ctx , link )
191- if err != nil {
192- return err
193- }
177+ // the response from an lfs batch api request contained necessary upload/download fields to act upon
178+ if uc != nil {
179+ if len (object .Actions ) == 0 {
180+ log .Trace ("%v already present on server" , object .Pointer )
181+ return nil
182+ }
183+
184+ link , ok := object .Actions ["upload" ]
185+ if ! ok {
186+ return errors .New ("missing action 'upload'" )
187+ }
188+
189+ content , err := uc (object .Pointer , nil )
190+ if err != nil {
191+ return err
192+ }
194193
195- if err := dc (object .Pointer , content , nil ); err != nil {
194+ err = transferAdapter .Upload (ctx , link , object .Pointer , content )
195+ if err != nil {
196+ return err
197+ }
198+
199+ link , ok = object .Actions ["verify" ]
200+ if ok {
201+ if err := transferAdapter .Verify (ctx , link , object .Pointer ); err != nil {
196202 return err
197203 }
198204 }
199- }
205+ } else {
206+ link , ok := object .Actions ["download" ]
207+ if ! ok {
208+ // no actions block in response, try legacy response schema
209+ link , ok = object .Links ["download" ]
210+ }
211+ if ! ok {
212+ log .Debug ("%+v" , object )
213+ return errors .New ("missing action 'download'" )
214+ }
200215
216+ content , err := transferAdapter .Download (ctx , link )
217+ if err != nil {
218+ return err
219+ }
220+
221+ if err := dc (object .Pointer , content , nil ); err != nil {
222+ return err
223+ }
224+ }
201225 return nil
202226}
203227
0 commit comments