88 "github.com/SpectraLogic/ds3_go_sdk/sdk_log"
99 "io"
1010 "sync"
11+ "time"
1112)
1213
1314type getProducer struct {
@@ -22,9 +23,25 @@ type getProducer struct {
2223 deferredBlobQueue BlobDescriptionQueue // queue of blobs whose channels are not yet ready for transfer
2324 rangeFinder ranges.BlobRangeFinder
2425 sdk_log.Logger
26+
27+ // Channel that represents blobs that have finished being processed.
28+ // This will be written to once a get object operation has completed regardless of error or success.
29+ // This is used to notify the runner to re-check if any blobs are now ready to be retrieved.
30+ blobDoneChannel <- chan struct {}
31+
32+ // Used to track if we are done queuing blobs
33+ continueQueuingBlobs bool
2534}
2635
27- func newGetProducer (jobMasterObjectList * ds3Models.MasterObjectList , getObjects * []helperModels.GetObject , queue * chan TransferOperation , strategy * ReadTransferStrategy , client * ds3.Client , waitGroup * sync.WaitGroup ) * getProducer {
36+ func newGetProducer (
37+ jobMasterObjectList * ds3Models.MasterObjectList ,
38+ getObjects * []helperModels.GetObject ,
39+ queue * chan TransferOperation ,
40+ strategy * ReadTransferStrategy ,
41+ blobDoneChannel <- chan struct {},
42+ client * ds3.Client ,
43+ waitGroup * sync.WaitGroup ) * getProducer {
44+
2845 return & getProducer {
2946 JobMasterObjectList : jobMasterObjectList ,
3047 GetObjects : getObjects ,
@@ -37,6 +54,8 @@ func newGetProducer(jobMasterObjectList *ds3Models.MasterObjectList, getObjects
3754 deferredBlobQueue : NewBlobDescriptionQueue (),
3855 rangeFinder : ranges .NewBlobRangeFinder (getObjects ),
3956 Logger : client .Logger , //use the same logger as the client
57+ blobDoneChannel : blobDoneChannel ,
58+ continueQueuingBlobs : true ,
4059 }
4160}
4261
@@ -217,40 +236,103 @@ func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string
217236// Each transfer operation will retrieve one blob of content from the BP.
218237// Once all blobs have been queued to be transferred, the producer will finish, even if all operations have not been consumed yet.
219238func (producer * getProducer ) run () error {
220- defer close (* producer .queue )
221-
222239 // determine number of blobs to be processed
223240 var totalBlobCount int64 = producer .totalBlobCount ()
224241 producer .Debugf ("job status totalBlobs=%d processedBlobs=%d" , totalBlobCount , producer .processedBlobTracker .NumberOfProcessedBlobs ())
225242
226- // process all chunks and make sure all blobs are queued for transfer
227- for producer .processedBlobTracker .NumberOfProcessedBlobs () < totalBlobCount || producer .deferredBlobQueue .Size () > 0 {
228- // Get the list of available chunks that the server can receive. The server may
229- // not be able to receive everything, so not all chunks will necessarily be
230- // returned
231- chunksReady := ds3Models .NewGetJobChunksReadyForClientProcessingSpectraS3Request (producer .JobMasterObjectList .JobId )
232- chunksReadyResponse , err := producer .client .GetJobChunksReadyForClientProcessingSpectraS3 (chunksReady )
233- if err != nil {
234- producer .Errorf ("unrecoverable error: %v" , err )
235- return err
236- }
243+ // initiate first set of blob transfers
244+ err := producer .queueBlobsReadyForTransfer (totalBlobCount )
245+ if err != nil {
246+ return err
247+ }
237248
238- // Check to see if any chunks can be processed
239- numberOfChunks := len (chunksReadyResponse .MasterObjectList .Objects )
240- if numberOfChunks > 0 {
241- // Loop through all the chunks that are available for processing, and send
242- // the files that are contained within them.
243- for _ , curChunk := range chunksReadyResponse .MasterObjectList .Objects {
244- producer .processChunk (& curChunk , * chunksReadyResponse .MasterObjectList .BucketName , chunksReadyResponse .MasterObjectList .JobId )
249+ // wait for either a timer or for at least one blob to finish before attempting to queue more items for transfer
250+ ticker := time .NewTicker (producer .strategy .BlobStrategy .delay ())
251+ var fatalErr error
252+ for {
253+ select {
254+ case _ , ok := <- producer .blobDoneChannel :
255+ if ok {
256+ // reset the timer
257+ ticker .Stop ()
258+ ticker = time .NewTicker (producer .strategy .BlobStrategy .delay ())
259+
260+ err = producer .queueBlobsReadyForTransfer (totalBlobCount )
261+ if err != nil {
262+ // A fatal error has occurred, stop queuing blobs for processing and
263+ // close processing queue to signal consumer we won't be sending any more blobs.
264+ producer .continueQueuingBlobs = false
265+ fatalErr = err
266+ close (* producer .queue )
267+ }
268+ } else {
269+ // The consumer closed the channel, signaling completion.
270+ return fatalErr
271+ }
272+ case <- ticker .C :
273+ err = producer .queueBlobsReadyForTransfer (totalBlobCount )
274+ if err != nil {
275+ // A fatal error has occurred, stop queuing blobs for processing and
276+ // close processing queue to signal consumer we won't be sending any more blobs.
277+ producer .continueQueuingBlobs = false
278+ fatalErr = err
279+ close (* producer .queue )
245280 }
281+ }
282+ }
283+ return fatalErr
284+ }
285+
286+ func (producer * getProducer ) hasMoreToProcess (totalBlobCount int64 ) bool {
287+ return producer .processedBlobTracker .NumberOfProcessedBlobs () < totalBlobCount || producer .deferredBlobQueue .Size () > 0
288+ }
289+
290+ func (producer * getProducer ) queueBlobsReadyForTransfer (totalBlobCount int64 ) error {
291+ if ! producer .continueQueuingBlobs {
292+ // We've queued up all the blobs we are going to for this job.
293+ return nil
294+ }
295+
296+ // check if there is anything left to be queued
297+ if ! producer .hasMoreToProcess (totalBlobCount ) {
298+ // Everything has been queued for processing.
299+ producer .continueQueuingBlobs = false
300+ // close processing queue to signal consumer we won't be sending any more blobs.
301+ close (* producer .queue )
302+ return nil
303+ }
304+
305+ // Attempt to transfer waiting blobs
306+ producer .processWaitingBlobs (* producer .JobMasterObjectList .BucketName , producer .JobMasterObjectList .JobId )
307+
308+ // Check if we need to query the BP for allocated blobs, or if we already know everything is allocated.
309+ if int64 (producer .deferredBlobQueue .Size ()) + producer .processedBlobTracker .NumberOfProcessedBlobs () >= totalBlobCount {
310+ // Everything is already allocated, no need to query BP for allocated chunks
311+ return nil
312+ }
313+
314+ // Get the list of available chunks that the server can receive. The server may
315+ // not be able to receive everything, so not all chunks will necessarily be
316+ // returned
317+ chunksReady := ds3Models .NewGetJobChunksReadyForClientProcessingSpectraS3Request (producer .JobMasterObjectList .JobId )
318+ chunksReadyResponse , err := producer .client .GetJobChunksReadyForClientProcessingSpectraS3 (chunksReady )
319+ if err != nil {
320+ producer .Errorf ("unrecoverable error: %v" , err )
321+ return err
322+ }
246323
247- // Attempt to transfer waiting blobs
248- producer .processWaitingBlobs (* chunksReadyResponse .MasterObjectList .BucketName , chunksReadyResponse .MasterObjectList .JobId )
249- } else {
250- // When no chunks are returned we need to sleep to allow for cache space to
251- // be freed.
252- producer .strategy .BlobStrategy .delay ()
324+ // Check to see if any chunks can be processed
325+ numberOfChunks := len (chunksReadyResponse .MasterObjectList .Objects )
326+ if numberOfChunks > 0 {
327+ // Loop through all the chunks that are available for processing, and send
328+ // the files that are contained within them.
329+ for _ , curChunk := range chunksReadyResponse .MasterObjectList .Objects {
330+ producer .processChunk (& curChunk , * chunksReadyResponse .MasterObjectList .BucketName , chunksReadyResponse .MasterObjectList .JobId )
253331 }
332+ } else {
333+ // When no chunks are returned we need to sleep to allow for cache space to
334+ // be freed.
335+ producer .strategy .BlobStrategy .delay ()
254336 }
255337 return nil
256338}
0 commit comments