@@ -17,6 +17,8 @@ import (
1717 "code.gitea.io/gitea/modules/log"
1818 "code.gitea.io/gitea/modules/proxy"
1919 "code.gitea.io/gitea/modules/setting"
20+
21+ "golang.org/x/sync/errgroup"
2022)
2123
2224// HTTPClient is used to communicate with the LFS server
@@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
113115 return c .performOperation (ctx , objects , nil , callback )
114116}
115117
118+ // performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
116119func (c * HTTPClient ) performOperation (ctx context.Context , objects []Pointer , dc DownloadCallback , uc UploadCallback ) error {
117120 if len (objects ) == 0 {
118121 return nil
@@ -133,71 +136,87 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
133136 return fmt .Errorf ("TransferAdapter not found: %s" , result .Transfer )
134137 }
135138
139+ errGroup , groupCtx := errgroup .WithContext (ctx )
140+ errGroup .SetLimit (setting .LFSClient .BatchOperationConcurrency )
136141 for _ , object := range result .Objects {
137- if object .Error != nil {
138- log .Trace ("Error on object %v: %v" , object .Pointer , object .Error )
139- if uc != nil {
140- if _ , err := uc (object .Pointer , object .Error ); err != nil {
141- return err
142- }
143- } else {
144- if err := dc (object .Pointer , nil , object .Error ); err != nil {
145- return err
146- }
147- }
148- continue
149- }
142+ errGroup .Go (func () error {
143+ return performSingleOperation (groupCtx , object , dc , uc , transferAdapter )
144+ })
145+ }
150146
151- if uc != nil {
152- if len (object .Actions ) == 0 {
153- log .Trace ("%v already present on server" , object .Pointer )
154- continue
155- }
147+ // only the first error is returned, preserving legacy behavior before concurrency
148+ return errGroup .Wait ()
149+ }
156150
157- link , ok := object . Actions [ " upload" ]
158- if ! ok {
159- log . Debug ( "%+v" , object )
160- return errors . New ( "missing action 'upload'" )
161- }
151+ // performSingleOperation performs an LFS upload or download operation on a single object
152+ func performSingleOperation ( ctx context. Context , object * ObjectResponse , dc DownloadCallback , uc UploadCallback , transferAdapter TransferAdapter ) error {
153+ // the response from a lfs batch api request for this specific object id contained an error
154+ if object . Error != nil {
155+ log . Trace ( "Error on object %v: %v" , object . Pointer , object . Error )
162156
163- content , err := uc (object .Pointer , nil )
164- if err != nil {
157+ // this was an 'upload' request inside the batch request
158+ if uc != nil {
159+ if _ , err := uc (object .Pointer , object .Error ); err != nil {
165160 return err
166161 }
167-
168- err = transferAdapter . Upload ( ctx , link , object . Pointer , content )
169- if err != nil {
162+ } else {
163+ // this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
164+ if err := dc ( object . Pointer , nil , object . Error ); err != nil {
170165 return err
171166 }
167+ }
168+ // if the callback returns no err, then the error could be ignored, and the operations should continue
169+ return nil
170+ }
172171
173- link , ok = object .Actions ["verify" ]
174- if ok {
175- if err := transferAdapter .Verify (ctx , link , object .Pointer ); err != nil {
176- return err
177- }
178- }
179- } else {
180- link , ok := object .Actions ["download" ]
181- if ! ok {
182- // no actions block in response, try legacy response schema
183- link , ok = object .Links ["download" ]
184- }
185- if ! ok {
186- log .Debug ("%+v" , object )
187- return errors .New ("missing action 'download'" )
188- }
172+ // the response from an lfs batch api request contained necessary upload/download fields to act upon
173+ if uc != nil {
174+ if len (object .Actions ) == 0 {
175+ log .Trace ("%v already present on server" , object .Pointer )
176+ return nil
177+ }
189178
190- content , err := transferAdapter .Download (ctx , link )
191- if err != nil {
192- return err
193- }
179+ link , ok := object .Actions ["upload" ]
180+ if ! ok {
181+ return errors .New ("missing action 'upload'" )
182+ }
183+
184+ content , err := uc (object .Pointer , nil )
185+ if err != nil {
186+ return err
187+ }
194188
195- if err := dc (object .Pointer , content , nil ); err != nil {
189+ err = transferAdapter .Upload (ctx , link , object .Pointer , content )
190+ if err != nil {
191+ return err
192+ }
193+
194+ link , ok = object .Actions ["verify" ]
195+ if ok {
196+ if err := transferAdapter .Verify (ctx , link , object .Pointer ); err != nil {
196197 return err
197198 }
198199 }
199- }
200+ } else {
201+ link , ok := object .Actions ["download" ]
202+ if ! ok {
203+ // no actions block in response, try legacy response schema
204+ link , ok = object .Links ["download" ]
205+ }
206+ if ! ok {
207+ log .Debug ("%+v" , object )
208+ return errors .New ("missing action 'download'" )
209+ }
200210
211+ content , err := transferAdapter .Download (ctx , link )
212+ if err != nil {
213+ return err
214+ }
215+
216+ if err := dc (object .Pointer , content , nil ); err != nil {
217+ return err
218+ }
219+ }
201220 return nil
202221}
203222
0 commit comments