@@ -24,23 +24,18 @@ type getProducer struct {
2424 rangeFinder ranges.BlobRangeFinder
2525 sdk_log.Logger
2626
27- // Channel that represents blobs that have finished being process.
28- // This will be written to once a get object operation has completed regardless of error or success.
29- // This is used to notify the runner to re-check if any blobs are now ready to be retrieved.
30- blobDoneChannel <- chan struct {}
31-
32- // Used to track if we are done queuing blobs
33- continueQueuingBlobs bool
27+ // Notifier that run() waits on; it is expected to be triggered when a blob has finished being transferred
28+ doneNotifier NotifyBlobDone
3429}
3530
3631func newGetProducer (
3732 jobMasterObjectList * ds3Models.MasterObjectList ,
3833 getObjects * []helperModels.GetObject ,
3934 queue * chan TransferOperation ,
4035 strategy * ReadTransferStrategy ,
41- blobDoneChannel <- chan struct {},
4236 client * ds3.Client ,
43- waitGroup * sync.WaitGroup ) * getProducer {
37+ waitGroup * sync.WaitGroup ,
38+ doneNotifier NotifyBlobDone ) * getProducer { // doneNotifier is stored on the producer; run() waits on it after queuing blobs
4439
4540 return & getProducer {
4641 JobMasterObjectList : jobMasterObjectList ,
@@ -54,8 +49,7 @@ func newGetProducer(
5449 deferredBlobQueue : NewBlobDescriptionQueue (),
5550 rangeFinder : ranges .NewBlobRangeFinder (getObjects ),
5651 Logger : client .Logger , // use the same logger as the client
57- blobDoneChannel : blobDoneChannel ,
58- continueQueuingBlobs : true ,
52+ doneNotifier : doneNotifier , // takes over from the removed blobDoneChannel / continueQueuingBlobs fields
5953 }
6054}
6155
@@ -75,15 +69,20 @@ func toReadObjectMap(getObjects *[]helperModels.GetObject) map[string]helperMode
7569}
7670
7771// Processes all the blobs in a chunk that are ready for transfer from the BP by passing each one to queueBlobForTransfer.
78- func (producer * getProducer ) processChunk (curChunk * ds3Models.Objects , bucketName string , jobId string ) {
72+ // Returns the number of blobs that were actually queued for transfer (those for which queueBlobForTransfer returned true).
73+ func (producer * getProducer ) processChunk (curChunk * ds3Models.Objects , bucketName string , jobId string ) int {
7974 producer .Debugf ("begin chunk processing %s" , curChunk .ChunkId )
8075
76+ processedCount := 0 // blobs in this chunk that were queued; skipped or deferred blobs are not counted
8177 // transfer blobs that are ready, and defer those whose destination channel is not available yet
8278 for _ , curObj := range curChunk .Objects {
8379 producer .Debugf ("queuing object in waiting to be processed %s offset=%d length=%d" , * curObj .Name , curObj .Offset , curObj .Length )
8480 blob := helperModels .NewBlobDescription (* curObj .Name , curObj .Offset , curObj .Length )
85- producer .queueBlobForTransfer (& blob , bucketName , jobId )
81+ if producer .queueBlobForTransfer (& blob , bucketName , jobId ) {
82+ processedCount ++
83+ }
8684 }
85+ return processedCount
8786}
8887
8988// Information required to perform a get operation of a blob with BP as data source and channelBuilder as destination
@@ -174,9 +173,10 @@ func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, bl
174173
175174// Attempts to transfer a single blob from the BP to the client. If the blob is not ready for transfer,
176175// then it is pushed onto the deferred blob queue so that it can be retried later.
177- func (producer * getProducer ) queueBlobForTransfer (blob * helperModels.BlobDescription , bucketName string , jobId string ) {
176+ // Returns true only when the blob was queued for transfer; returns false when it was already processed, skipped, or deferred.
177+ func (producer * getProducer ) queueBlobForTransfer (blob * helperModels.BlobDescription , bucketName string , jobId string ) bool {
178178 if producer .processedBlobTracker .IsProcessed (* blob ) {
179- return
179+ return false // already processed, nothing to queue
180180 }
181181
182182 curReadObj := producer .readObjectMap [blob .Name ()]
@@ -185,13 +185,13 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
185185 // a fatal error happened on a previous blob for this file, skip processing
186186 producer .Warningf ("fatal error occurred while transferring previous blob on this file, skipping blob '%s' offset=%d length=%d" , blob .Name (), blob .Offset (), blob .Length ())
187187 producer .processedBlobTracker .MarkProcessed (* blob )
188- return
188+ return false // skipped: marked processed above so it is not retried, but it was not queued
189189 }
190190
191191 if ! curReadObj .ChannelBuilder .IsChannelAvailable (blob .Offset ()) {
192192 producer .Debugf ("channel is not currently available for getting blob '%s' offset=%d length=%d" , blob .Name (), blob .Offset (), blob .Length ())
193193 producer .deferredBlobQueue .Push (blob )
194- return
194+ return false // deferred: pushed onto the deferred queue until its channel is available
195195 }
196196
197197 producer .Debugf ("channel is available for getting blob '%s' offset=%d length=%d" , blob .Name (), blob .Offset (), blob .Length ())
@@ -212,11 +212,16 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
212212
213213 // Mark blob as processed
214214 producer .processedBlobTracker .MarkProcessed (* blob )
215+
216+ return true // the only path that reports the blob as queued for transfer
215217}
216218
217219// Attempts to process all blobs whose channels were not available for transfer.
218220// Blobs whose channels are still not available are placed back on the queue.
219- func (producer * getProducer ) processWaitingBlobs (bucketName string , jobId string ) {
221+ // Returns the number of previously deferred blobs that were queued for transfer during this pass.
222+ func (producer * getProducer ) processWaitingBlobs (bucketName string , jobId string ) int {
223+ processedCount := 0 // deferred blobs for which queueBlobForTransfer returned true
224+
220225 // attempt to process all blobs in waiting to be transferred
221226 waitingBlobs := producer .deferredBlobQueue .Size () // snapshot the size: queueBlobForTransfer may push unavailable blobs back onto this queue
222227 for i := 0 ; i < waitingBlobs ; i ++ {
@@ -228,87 +233,54 @@ func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string
228233 producer .Errorf ("failure during blob transfer '%s' at offset %d: %s" , curBlob .Name (), curBlob .Offset (), err .Error ())
229234 break // stop this pass; the count accumulated so far is still returned
230235 }
231- producer .queueBlobForTransfer (curBlob , bucketName , jobId )
236+ if producer .queueBlobForTransfer (curBlob , bucketName , jobId ) {
237+ processedCount ++
238+ }
232239 }
240+ return processedCount
233241}
234242
235243// This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine.
236244// Each transfer operation will retrieve one blob of content from the BP.
237245// Once all blobs have been queued to be transferred, the producer will finish, even if all operations have not been consumed yet.
238246func (producer * getProducer ) run () error {
247+ defer close (* producer .queue ) // runs on every return path (success or error) so the queue is always closed when the producer stops
248+
239249 // determine number of blobs to be processed
240250 var totalBlobCount int64 = producer .totalBlobCount ()
241251 producer .Debugf ("job status totalBlobs=%d processedBlobs=%d" , totalBlobCount , producer .processedBlobTracker .NumberOfProcessedBlobs ())
242252
243- // initiate first set of blob transfers
244- err := producer .queueBlobsReadyForTransfer (totalBlobCount )
245- if err != nil {
246- return err
247- }
253+ // keep looping until every blob has been marked processed and no blob remains in the deferred queue
254+ for producer .hasMoreToProcess (totalBlobCount ) {
255+ processedCount , err := producer .queueBlobsReadyForTransfer (totalBlobCount )
256+ if err != nil {
257+ return err // unrecoverable; the deferred close above still closes the queue
258+ }
248259
249- // wait for either a timer or for at least one blob to finish before attempting to queue more items for transfer
250- ticker := time .NewTicker (producer .strategy .BlobStrategy .delay ())
251- var fatalErr error
252- for {
253- select {
254- case _ , ok := <- producer .blobDoneChannel :
255- if ok {
256- // reset the timer
257- ticker .Stop ()
258- ticker = time .NewTicker (producer .strategy .BlobStrategy .delay ())
259-
260- err = producer .queueBlobsReadyForTransfer (totalBlobCount )
261- if err != nil {
262- // A fatal error has occurred, stop queuing blobs for processing and
263- // close processing queue to signal consumer we won't be sending any more blobs.
264- producer .continueQueuingBlobs = false
265- fatalErr = err
266- close (* producer .queue )
267- }
268- } else {
269- // The consumer closed the channel, signaling completion.
270- return fatalErr
271- }
272- case <- ticker .C :
273- err = producer .queueBlobsReadyForTransfer (totalBlobCount )
274- if err != nil {
275- // A fatal error has occurred, stop queuing blobs for processing and
276- // close processing queue to signal consumer we won't be sending any more blobs.
277- producer .continueQueuingBlobs = false
278- fatalErr = err
279- close (* producer .queue )
280- }
260+ // If this pass queued at least one blob, block on the notifier until something finishes before trying again
261+ if processedCount > 0 {
262+ producer .doneNotifier .Wait () // NOTE(review): also waits after the final batch is queued; confirm NotifyBlobDone.Wait cannot miss a signal raised before this call
263+ } else if producer .hasMoreToProcess (totalBlobCount ) {
264+ // nothing could be queued this pass, cache is probably full, wait a bit before trying again
265+ time .Sleep (producer .strategy .BlobStrategy .delay ())
281266 }
282267 }
283- return fatalErr
268+ return nil // loop exited because hasMoreToProcess reported nothing left
284269}
285270
286271func (producer * getProducer ) hasMoreToProcess (totalBlobCount int64 ) bool {
287272 return producer .processedBlobTracker .NumberOfProcessedBlobs () < totalBlobCount || producer .deferredBlobQueue .Size () > 0 // true while fewer than totalBlobCount blobs are marked processed, or any blob is still deferred
288273}
289274
290- func (producer * getProducer ) queueBlobsReadyForTransfer (totalBlobCount int64 ) error {
291- if ! producer .continueQueuingBlobs {
292- // We've queued up all the blobs we are going to for this job.
293- return nil
294- }
295-
296- // check if there is anything left to be queued
297- if ! producer .hasMoreToProcess (totalBlobCount ) {
298- // Everything has been queued for processing.
299- producer .continueQueuingBlobs = false
300- // close processing queue to signal consumer we won't be sending any more blobs.
301- close (* producer .queue )
302- return nil
303- }
304-
275+ // Returns the number of blobs queued for transfer by this call (deferred blobs plus blobs from ready chunks), and a non-nil error if querying the BP for ready chunks fails.
276+ func (producer * getProducer ) queueBlobsReadyForTransfer (totalBlobCount int64 ) (int , error ) {
305277 // Attempt to transfer waiting blobs
306- producer .processWaitingBlobs (* producer .JobMasterObjectList .BucketName , producer .JobMasterObjectList .JobId )
278+ processedCount := producer .processWaitingBlobs (* producer .JobMasterObjectList .BucketName , producer .JobMasterObjectList .JobId )
307279
308280 // Check if we need to query the BP for allocated blobs, or if we already know everything is allocated.
309281 if int64 (producer .deferredBlobQueue .Size ()) + producer .processedBlobTracker .NumberOfProcessedBlobs () >= totalBlobCount {
310282 // Every blob is either deferred or already processed, no need to query BP for allocated chunks
311- return nil
283+ return processedCount , nil
312284 }
313285
314286 // Get the list of chunks that the server reports as ready for client processing. The server may
@@ -318,7 +290,7 @@ func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) er
318290 chunksReadyResponse , err := producer .client .GetJobChunksReadyForClientProcessingSpectraS3 (chunksReady )
319291 if err != nil {
320292 producer .Errorf ("unrecoverable error: %v" , err )
321- return err
293+ return processedCount , err // the count is still returned, but run() returns the error without using it
322294 }
323295
324296 // Check to see if any chunks can be processed
@@ -327,14 +299,10 @@ func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) er
327299 // Loop through all the chunks that are available for processing, and send
328300 // the files that are contained within them.
329301 for _ , curChunk := range chunksReadyResponse .MasterObjectList .Objects {
330- producer .processChunk (& curChunk , * chunksReadyResponse .MasterObjectList .BucketName , chunksReadyResponse .MasterObjectList .JobId )
302+ processedCount += producer .processChunk (& curChunk , * chunksReadyResponse .MasterObjectList .BucketName , chunksReadyResponse .MasterObjectList .JobId )
331303 }
332- } else {
333- // When no chunks are returned we need to sleep to allow for cache space to
334- // be freed.
335- producer .strategy .BlobStrategy .delay ()
336304 }
337- return nil
305+ return processedCount , nil
338306}
339307
340308// Determines the number of blobs to be transferred.
0 commit comments