@@ -17,6 +17,8 @@ import (
1717	"code.gitea.io/gitea/modules/log" 
1818	"code.gitea.io/gitea/modules/proxy" 
1919	"code.gitea.io/gitea/modules/setting" 
20+ 
21+ 	"golang.org/x/sync/errgroup" 
2022)
2123
2224// HTTPClient is used to communicate with the LFS server 
@@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
113115	return  c .performOperation (ctx , objects , nil , callback )
114116}
115117
118+ // performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch 
116119func  (c  * HTTPClient ) performOperation (ctx  context.Context , objects  []Pointer , dc  DownloadCallback , uc  UploadCallback ) error  {
117120	if  len (objects ) ==  0  {
118121		return  nil 
@@ -133,71 +136,91 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
133136		return  fmt .Errorf ("TransferAdapter not found: %s" , result .Transfer )
134137	}
135138
139+ 	errGroup , groupCtx  :=  errgroup .WithContext (ctx )
140+ 	errGroup .SetLimit (setting .LFSClient .BatchConcurrency )
136141	for  _ , object  :=  range  result .Objects  {
137- 		if  object .Error  !=  nil  {
138- 			log .Trace ("Error on object %v: %v" , object .Pointer , object .Error )
139- 			if  uc  !=  nil  {
140- 				if  _ , err  :=  uc (object .Pointer , object .Error ); err  !=  nil  {
141- 					return  err 
142- 				}
143- 			} else  {
144- 				if  err  :=  dc (object .Pointer , nil , object .Error ); err  !=  nil  {
145- 					return  err 
146- 				}
147- 			}
148- 			continue 
149- 		}
150- 
151- 		if  uc  !=  nil  {
152- 			if  len (object .Actions ) ==  0  {
153- 				log .Trace ("%v already present on server" , object .Pointer )
154- 				continue 
155- 			}
142+ 		errGroup .Go (func () error  {
143+ 			err  :=  performSingleOperation (groupCtx , object , dc , uc , transferAdapter )
144+ 			return  err 
145+ 		})
146+ 	}
156147
157- 			link , ok  :=  object .Actions ["upload" ]
158- 			if  ! ok  {
159- 				log .Debug ("%+v" , object )
160- 				return  errors .New ("missing action 'upload'" )
161- 			}
148+ 	// only the first error is returned, preserving legacy behavior before concurrency 
149+ 	return  errGroup .Wait ()
150+ }
162151
163- 			content , err  :=  uc (object .Pointer , nil )
164- 			if  err  !=  nil  {
165- 				return  err 
166- 			}
152+ // performSingleOperation performs an LFS upload or download operation on a single object 
153+ func  performSingleOperation (ctx  context.Context , object  * ObjectResponse , dc  DownloadCallback , uc  UploadCallback , transferAdapter  TransferAdapter ) error  {
154+ 	// the response from an lfs batch api request for this specific object id contained an error 
155+ 	if  object .Error  !=  nil  {
156+ 		log .Trace ("Error on object %v: %v" , object .Pointer , object .Error )
167157
168- 			err  =  transferAdapter .Upload (ctx , link , object .Pointer , content )
169- 			if  err  !=  nil  {
158+ 		// this was an 'upload' request inside the batch request 
159+ 		if  uc  !=  nil  {
160+ 			if  _ , err  :=  uc (object .Pointer , object .Error ); err  !=  nil  {
170161				return  err 
171162			}
172- 
173- 			link , ok  =  object .Actions ["verify" ]
174- 			if  ok  {
175- 				if  err  :=  transferAdapter .Verify (ctx , link , object .Pointer ); err  !=  nil  {
176- 					return  err 
177- 				}
178- 			}
179163		} else  {
180- 			link , ok  :=  object .Actions ["download" ]
181- 			if  ! ok  {
182- 				// no actions block in response, try legacy response schema 
183- 				link , ok  =  object .Links ["download" ]
184- 			}
185- 			if  ! ok  {
186- 				log .Debug ("%+v" , object )
187- 				return  errors .New ("missing action 'download'" )
164+ 			// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request 
165+ 			err  :=  dc (object .Pointer , nil , object .Error )
166+ 			if  errors .Is (object .Error , ErrObjectNotExist ) {
167+ 				log .Warn ("Ignoring missing upstream LFS object %-v: %v" , object .Pointer , err )
168+ 				return  nil 
188169			}
189170
190- 			content ,  err   :=   transferAdapter . Download ( ctx ,  link )
191- 			if  err   !=   nil  { 
192- 				 return   err 
193- 			 }
171+ 			// this was a 'download' request which was a legitimate error response from the batch api (not an http/404 )
172+ 			return  err 
173+ 		} 
174+ 	}
194175
195- 			if  err  :=  dc (object .Pointer , content , nil ); err  !=  nil  {
176+ 	// the response from an lfs batch api request contained necessary upload/download fields to act upon 
177+ 	if  uc  !=  nil  {
178+ 		if  len (object .Actions ) ==  0  {
179+ 			log .Trace ("%v already present on server" , object .Pointer )
180+ 			return  nil 
181+ 		}
182+ 
183+ 		link , ok  :=  object .Actions ["upload" ]
184+ 		if  ! ok  {
185+ 			return  errors .New ("missing action 'upload'" )
186+ 		}
187+ 
188+ 		content , err  :=  uc (object .Pointer , nil )
189+ 		if  err  !=  nil  {
190+ 			return  err 
191+ 		}
192+ 
193+ 		err  =  transferAdapter .Upload (ctx , link , object .Pointer , content )
194+ 		if  err  !=  nil  {
195+ 			return  err 
196+ 		}
197+ 
198+ 		link , ok  =  object .Actions ["verify" ]
199+ 		if  ok  {
200+ 			if  err  :=  transferAdapter .Verify (ctx , link , object .Pointer ); err  !=  nil  {
196201				return  err 
197202			}
198203		}
199- 	}
204+ 	} else  {
205+ 		link , ok  :=  object .Actions ["download" ]
206+ 		if  ! ok  {
207+ 			// no actions block in response, try legacy response schema 
208+ 			link , ok  =  object .Links ["download" ]
209+ 		}
210+ 		if  ! ok  {
211+ 			log .Debug ("%+v" , object )
212+ 			return  errors .New ("missing action 'download'" )
213+ 		}
200214
215+ 		content , err  :=  transferAdapter .Download (ctx , link )
216+ 		if  err  !=  nil  {
217+ 			return  err 
218+ 		}
219+ 
220+ 		if  err  :=  dc (object .Pointer , content , nil ); err  !=  nil  {
221+ 			return  err 
222+ 		}
223+ 	}
201224	return  nil 
202225}
203226
0 commit comments