@@ -17,6 +17,8 @@ import (
1717	"code.gitea.io/gitea/modules/log" 
1818	"code.gitea.io/gitea/modules/proxy" 
1919	"code.gitea.io/gitea/modules/setting" 
20+ 
21+ 	"golang.org/x/sync/errgroup" 
2022)
2123
2224// HTTPClient is used to communicate with the LFS server 
@@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
113115	return  c .performOperation (ctx , objects , nil , callback )
114116}
115117
118+ // performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch 
116119func  (c  * HTTPClient ) performOperation (ctx  context.Context , objects  []Pointer , dc  DownloadCallback , uc  UploadCallback ) error  {
117120	if  len (objects ) ==  0  {
118121		return  nil 
@@ -133,71 +136,87 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
133136		return  fmt .Errorf ("TransferAdapter not found: %s" , result .Transfer )
134137	}
135138
139+ 	errGroup , groupCtx  :=  errgroup .WithContext (ctx )
140+ 	errGroup .SetLimit (setting .LFSClient .BatchOperationConcurrency )
136141	for  _ , object  :=  range  result .Objects  {
137- 		if  object .Error  !=  nil  {
138- 			log .Trace ("Error on object %v: %v" , object .Pointer , object .Error )
139- 			if  uc  !=  nil  {
140- 				if  _ , err  :=  uc (object .Pointer , object .Error ); err  !=  nil  {
141- 					return  err 
142- 				}
143- 			} else  {
144- 				if  err  :=  dc (object .Pointer , nil , object .Error ); err  !=  nil  {
145- 					return  err 
146- 				}
147- 			}
148- 			continue 
149- 		}
142+ 		errGroup .Go (func () error  {
143+ 			return  performSingleOperation (groupCtx , object , dc , uc , transferAdapter )
144+ 		})
145+ 	}
150146
151- 		if  uc  !=  nil  {
152- 			if  len (object .Actions ) ==  0  {
153- 				log .Trace ("%v already present on server" , object .Pointer )
154- 				continue 
155- 			}
147+ 	// only the first error is returned, preserving legacy behavior before concurrency 
148+ 	return  errGroup .Wait ()
149+ }
156150
157- 			 link ,  ok   :=   object . Actions [ " upload" ] 
158- 			 if   ! ok  {
159- 				 log . Debug ( "%+v" ,  object ) 
160- 				 return   errors . New ( "missing action 'upload'" ) 
161- 			} 
151+ // performSingleOperation performs an LFS  upload or download operation on a single object 
152+ func   performSingleOperation ( ctx  context. Context ,  object   * ObjectResponse ,  dc   DownloadCallback ,  uc   UploadCallback ,  transferAdapter   TransferAdapter )  error  {
153+ 	// the response from a lfs batch api request for this specific  object id contained an error 
154+ 	if   object . Error   !=   nil  { 
155+ 		log . Trace ( "Error on object %v: %v" ,  object . Pointer ,  object . Error ) 
162156
163- 			content , err  :=  uc (object .Pointer , nil )
164- 			if  err  !=  nil  {
157+ 		// this was an 'upload' request inside the batch request 
158+ 		if  uc  !=  nil  {
159+ 			if  _ , err  :=  uc (object .Pointer , object .Error ); err  !=  nil  {
165160				return  err 
166161			}
167- 
168- 			err   =   transferAdapter . Upload ( ctx ,  link ,  object . Pointer ,  content ) 
169- 			if  err  !=  nil  {
162+ 		}  else  { 
163+ 			// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request 
164+ 			if  err  :=   dc ( object . Pointer ,  nil ,  object . Error );  err   !=  nil  {
170165				return  err 
171166			}
167+ 		}
168+ 		// if the callback returns no err, then the error could be ignored, and the operations should continue 
169+ 		return  nil 
170+ 	}
172171
173- 			link , ok  =  object .Actions ["verify" ]
174- 			if  ok  {
175- 				if  err  :=  transferAdapter .Verify (ctx , link , object .Pointer ); err  !=  nil  {
176- 					return  err 
177- 				}
178- 			}
179- 		} else  {
180- 			link , ok  :=  object .Actions ["download" ]
181- 			if  ! ok  {
182- 				// no actions block in response, try legacy response schema 
183- 				link , ok  =  object .Links ["download" ]
184- 			}
185- 			if  ! ok  {
186- 				log .Debug ("%+v" , object )
187- 				return  errors .New ("missing action 'download'" )
188- 			}
172+ 	// the response from an lfs batch api request contained necessary upload/download fields to act upon 
173+ 	if  uc  !=  nil  {
174+ 		if  len (object .Actions ) ==  0  {
175+ 			log .Trace ("%v already present on server" , object .Pointer )
176+ 			return  nil 
177+ 		}
189178
190- 			content , err  :=  transferAdapter .Download (ctx , link )
191- 			if  err  !=  nil  {
192- 				return  err 
193- 			}
179+ 		link , ok  :=  object .Actions ["upload" ]
180+ 		if  ! ok  {
181+ 			return  errors .New ("missing action 'upload'" )
182+ 		}
183+ 
184+ 		content , err  :=  uc (object .Pointer , nil )
185+ 		if  err  !=  nil  {
186+ 			return  err 
187+ 		}
194188
195- 			if  err  :=  dc (object .Pointer , content , nil ); err  !=  nil  {
189+ 		err  =  transferAdapter .Upload (ctx , link , object .Pointer , content )
190+ 		if  err  !=  nil  {
191+ 			return  err 
192+ 		}
193+ 
194+ 		link , ok  =  object .Actions ["verify" ]
195+ 		if  ok  {
196+ 			if  err  :=  transferAdapter .Verify (ctx , link , object .Pointer ); err  !=  nil  {
196197				return  err 
197198			}
198199		}
199- 	}
200+ 	} else  {
201+ 		link , ok  :=  object .Actions ["download" ]
202+ 		if  ! ok  {
203+ 			// no actions block in response, try legacy response schema 
204+ 			link , ok  =  object .Links ["download" ]
205+ 		}
206+ 		if  ! ok  {
207+ 			log .Debug ("%+v" , object )
208+ 			return  errors .New ("missing action 'download'" )
209+ 		}
200210
211+ 		content , err  :=  transferAdapter .Download (ctx , link )
212+ 		if  err  !=  nil  {
213+ 			return  err 
214+ 		}
215+ 
216+ 		if  err  :=  dc (object .Pointer , content , nil ); err  !=  nil  {
217+ 			return  err 
218+ 		}
219+ 	}
201220	return  nil 
202221}
203222
0 commit comments