@@ -1,13 +1,13 @@
 package helpers
 
 import (
-	ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
 	"github.com/SpectraLogic/ds3_go_sdk/ds3"
-	"sync"
-	"io"
-	"github.com/SpectraLogic/ds3_go_sdk/helpers/ranges"
+	ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
 	helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
+	"github.com/SpectraLogic/ds3_go_sdk/helpers/ranges"
 	"github.com/SpectraLogic/ds3_go_sdk/sdk_log"
+	"io"
+	"sync"
 )
 
 type getProducer struct {
@@ -56,14 +56,14 @@ func toReadObjectMap(getObjects *[]helperModels.GetObject) map[string]helperMode
 }
 
 // Processes all the blobs in a chunk that are ready for transfer from BP
-func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
+func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) {
 	producer.Debugf("begin chunk processing %s", curChunk.ChunkId)
 
 	// transfer blobs that are ready, and queue those that are waiting for channel
 	for _, curObj := range curChunk.Objects {
 		producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
 		blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length)
-		producer.queueBlobForTransfer(&blob, bucketName, jobId, aggErr)
+		producer.queueBlobForTransfer(&blob, bucketName, jobId)
 	}
 }
 
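With `aggErr` no longer threaded through these functions, per-blob failures are reported via `producer.strategy.Listeners.Errored` instead. A minimal sketch of what a listener of that shape could look like, assuming a mutex-guarded collector; `ErrorEvent` and `ListenerBus` are illustrative names, not the SDK's types:

```go
package sketch

import "sync"

// ErrorEvent pairs a failed object with its error (illustrative, not an SDK type).
type ErrorEvent struct {
	ObjectName string
	Err        error
}

// ListenerBus collects per-object errors from concurrent transfer goroutines,
// replacing the *AggregateError that used to be threaded through every call.
type ListenerBus struct {
	mu     sync.Mutex
	errors []ErrorEvent
}

// Errored records a failure; safe to call from multiple producer goroutines.
func (l *ListenerBus) Errored(objectName string, err error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.errors = append(l.errors, ErrorEvent{ObjectName: objectName, Err: err})
}
```
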
@@ -76,7 +76,7 @@ type getObjectInfo struct {
 }
 
 // Creates the transfer operation that will perform the data retrieval of the specified blob from BP
-func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr *ds3Models.AggregateError) TransferOperation {
+func (producer *getProducer) transferOperationBuilder(info getObjectInfo) TransferOperation {
 	return func() {
 		// has this file fatally errored while transferring a different blob?
 		if info.channelBuilder.HasFatalError() {
@@ -98,7 +98,7 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr
 
 		getObjResponse, err := producer.client.GetObject(getObjRequest)
 		if err != nil {
-			aggErr.Append(err)
+			producer.strategy.Listeners.Errored(info.blob.Name(), err)
 			info.channelBuilder.SetFatalError(err)
 			producer.Errorf("unable to retrieve object '%s' at offset %d: %s", info.blob.Name(), info.blob.Offset(), err.Error())
 			return
@@ -111,15 +111,15 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr
 		if len(blobRanges) == 0 {
 			writer, err := info.channelBuilder.GetChannel(info.blob.Offset())
 			if err != nil {
-				aggErr.Append(err)
+				producer.strategy.Listeners.Errored(info.blob.Name(), err)
 				info.channelBuilder.SetFatalError(err)
 				producer.Errorf("unable to read contents of object '%s' at offset '%d': %s", info.blob.Name(), info.blob.Offset(), err.Error())
 				return
 			}
 			defer info.channelBuilder.OnDone(writer)
 			_, err = io.Copy(writer, getObjResponse.Content) //copy all content from response reader to destination writer
 			if err != nil {
-				aggErr.Append(err)
+				producer.strategy.Listeners.Errored(info.blob.Name(), err)
 				info.channelBuilder.SetFatalError(err)
 				producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
 			}
@@ -130,7 +130,7 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr
 		for _, r := range blobRanges {
 			err := writeRangeToDestination(info.channelBuilder, r, getObjResponse.Content)
 			if err != nil {
-				aggErr.Append(err)
+				producer.strategy.Listeners.Errored(info.blob.Name(), err)
 				info.channelBuilder.SetFatalError(err)
 				producer.Errorf("unable to write to destination channel for object '%s' with range '%v': %s", info.blob.Name(), r, err.Error())
 			}
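Each range in `blobRanges` is written through `writeRangeToDestination` (context in the next hunk). A rough sketch of the underlying idea, assuming the channel builder yields a writer positioned at the range's start offset and that ranges are inclusive, as with HTTP byte ranges; `openAt` and `writeRange` are hypothetical names:

```go
package sketch

import "io"

// writeRange sketches the ranged-write path: copy exactly one range's worth
// of bytes from the response body into a writer opened at the range's start
// offset. openAt stands in for the channel builder; the +1 assumes the end
// offset is inclusive.
func writeRange(openAt func(offset int64) (io.WriteCloser, error), start, end int64, body io.Reader) error {
	w, err := openAt(start)
	if err != nil {
		return err
	}
	defer w.Close()
	_, err = io.CopyN(w, body, end-start+1)
	return err
}
```
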
@@ -153,7 +153,7 @@ func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, bl
 
 // Attempts to transfer a single blob from the BP to the client. If the blob is not ready for transfer,
 // then it is added to the waiting to transfer queue
-func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
+func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) {
 	if producer.processedBlobTracker.IsProcessed(*blob) {
 		return
 	}
@@ -183,7 +183,7 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
 		jobId:          jobId,
 	}
 
-	var transfer TransferOperation = producer.transferOperationBuilder(objInfo, aggErr)
+	var transfer TransferOperation = producer.transferOperationBuilder(objInfo)
 
 	// Increment wait group, and enqueue transfer operation
 	producer.waitGroup.Add(1)
@@ -195,7 +195,7 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
 
 // Attempts to process all blobs whose channels were not available for transfer.
 // Blobs whose channels are still not available are placed back on the queue.
-func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
+func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string) {
 	// attempt to process all blobs in waiting to be transferred
 	waitingBlobs := producer.deferredBlobQueue.Size()
 	for i := 0; i < waitingBlobs; i++ {
@@ -204,18 +204,17 @@ func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string
 		producer.Debugf("attempting to process '%s' offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
 		if err != nil {
 			//should not be possible to get here
-			aggErr.Append(err)
+			producer.strategy.Listeners.Errored(curBlob.Name(), err)
 			producer.Errorf("failure during blob transfer '%s' at offset %d: %s", curBlob.Name(), curBlob.Offset(), err.Error())
 		}
-		producer.queueBlobForTransfer(curBlob, bucketName, jobId, aggErr)
+		producer.queueBlobForTransfer(curBlob, bucketName, jobId)
 	}
 }
 
 // This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine.
 // Each transfer operation will retrieve one blob of content from the BP.
 // Once all blobs have been queued to be transferred, the producer will finish, even if all operations have not been consumed yet.
-func (producer *getProducer) run(aggErr *ds3Models.AggregateError) {
-	defer producer.waitGroup.Done()
+func (producer *getProducer) run() error {
 	defer close(*producer.queue)
 
 	// determine number of blobs to be processed
@@ -230,9 +229,8 @@ func (producer *getProducer) run(aggErr *ds3Models.AggregateError) {
 		chunksReady := ds3Models.NewGetJobChunksReadyForClientProcessingSpectraS3Request(producer.JobMasterObjectList.JobId)
 		chunksReadyResponse, err := producer.client.GetJobChunksReadyForClientProcessingSpectraS3(chunksReady)
 		if err != nil {
-			aggErr.Append(err)
 			producer.Errorf("unrecoverable error: %v", err)
-			return
+			return err
 		}
 
 		// Check to see if any chunks can be processed
@@ -241,17 +239,18 @@ func (producer *getProducer) run(aggErr *ds3Models.AggregateError) {
 			// Loop through all the chunks that are available for processing, and send
 			// the files that are contained within them.
 			for _, curChunk := range chunksReadyResponse.MasterObjectList.Objects {
-				producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId, aggErr)
+				producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
 			}
 
 			// Attempt to transfer waiting blobs
-			producer.processWaitingBlobs(*chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId, aggErr)
+			producer.processWaitingBlobs(*chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
 		} else {
 			// When no chunks are returned we need to sleep to allow for cache space to
 			// be freed.
 			producer.strategy.BlobStrategy.delay()
 		}
 	}
+	return nil
 }
 
 // Determines the number of blobs to be transferred.
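
Since `run` now returns an `error` (and no longer calls `waitGroup.Done()` itself), the caller owns both the wait group and the terminal error. A hedged sketch of how a coordinator might drive it; the `runner` interface and `driveProducer` helper are illustrative, not part of the SDK:

```go
package sketch

import "log"

// runner captures the new producer contract: run() returns the first
// unrecoverable error instead of appending to a shared AggregateError.
type runner interface {
	run() error
}

// driveProducer runs the producer in its own goroutine and surfaces the
// terminal error once production finishes; a real caller would also be
// draining the transfer-operation queue and waiting on the wait group.
func driveProducer(p runner) error {
	errChan := make(chan error, 1)
	go func() {
		errChan <- p.run()
	}()

	// ... consume queued transfer operations here ...

	err := <-errChan
	if err != nil {
		log.Printf("get job failed: %v", err)
	}
	return err
}
```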