
Commit f318acf

Merge pull request #90 from RachelTucker/GOSDK-18-per-file-listeners-helpers
GOSDK-18: Add object error and completed listeners to Strategies in helpers
2 parents 5b85672 + a8994e7 commit f318acf

10 files changed: +138 -78 lines changed

helpers/getProducer.go

Lines changed: 21 additions & 22 deletions
@@ -1,13 +1,13 @@
 package helpers
 
 import (
-    ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
     "github.com/SpectraLogic/ds3_go_sdk/ds3"
-    "sync"
-    "io"
-    "github.com/SpectraLogic/ds3_go_sdk/helpers/ranges"
+    ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
     helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
+    "github.com/SpectraLogic/ds3_go_sdk/helpers/ranges"
     "github.com/SpectraLogic/ds3_go_sdk/sdk_log"
+    "io"
+    "sync"
 )
 
 type getProducer struct {
@@ -56,14 +56,14 @@ func toReadObjectMap(getObjects *[]helperModels.GetObject) map[string]helperMode
 }
 
 // Processes all the blobs in a chunk that are ready for transfer from BP
-func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
+func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) {
     producer.Debugf("begin chunk processing %s", curChunk.ChunkId)
 
     // transfer blobs that are ready, and queue those that are waiting for channel
     for _, curObj := range curChunk.Objects {
         producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
         blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length)
-        producer.queueBlobForTransfer(&blob, bucketName, jobId, aggErr)
+        producer.queueBlobForTransfer(&blob, bucketName, jobId)
     }
 }
 
@@ -76,7 +76,7 @@ type getObjectInfo struct {
 }
 
 // Creates the transfer operation that will perform the data retrieval of the specified blob from BP
-func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr *ds3Models.AggregateError) TransferOperation {
+func (producer *getProducer) transferOperationBuilder(info getObjectInfo) TransferOperation {
     return func() {
         // has this file fatally errored while transferring a different blob?
         if info.channelBuilder.HasFatalError() {
@@ -98,7 +98,7 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr
 
         getObjResponse, err := producer.client.GetObject(getObjRequest)
         if err != nil {
-            aggErr.Append(err)
+            producer.strategy.Listeners.Errored(info.blob.Name(), err)
            info.channelBuilder.SetFatalError(err)
            producer.Errorf("unable to retrieve object '%s' at offset %d: %s", info.blob.Name(), info.blob.Offset(), err.Error())
            return
@@ -111,15 +111,15 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr
         if len(blobRanges) == 0 {
             writer, err := info.channelBuilder.GetChannel(info.blob.Offset())
             if err != nil {
-                aggErr.Append(err)
+                producer.strategy.Listeners.Errored(info.blob.Name(), err)
                 info.channelBuilder.SetFatalError(err)
                 producer.Errorf("unable to read contents of object '%s' at offset '%d': %s", info.blob.Name(), info.blob.Offset(), err.Error())
                 return
             }
             defer info.channelBuilder.OnDone(writer)
             _, err = io.Copy(writer, getObjResponse.Content) //copy all content from response reader to destination writer
             if err != nil {
-                aggErr.Append(err)
+                producer.strategy.Listeners.Errored(info.blob.Name(), err)
                 info.channelBuilder.SetFatalError(err)
                 producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
             }
@@ -130,7 +130,7 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo, aggErr
         for _, r := range blobRanges {
             err := writeRangeToDestination(info.channelBuilder, r, getObjResponse.Content)
             if err != nil {
-                aggErr.Append(err)
+                producer.strategy.Listeners.Errored(info.blob.Name(), err)
                 info.channelBuilder.SetFatalError(err)
                 producer.Errorf("unable to write to destination channel for object '%s' with range '%v': %s", info.blob.Name(), r, err.Error())
             }
@@ -153,7 +153,7 @@ func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, bl
 
 // Attempts to transfer a single blob from the BP to the client. If the blob is not ready for transfer,
 // then it is added to the waiting to transfer queue
-func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
+func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) {
     if producer.processedBlobTracker.IsProcessed(*blob) {
         return
     }
@@ -183,7 +183,7 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
         jobId: jobId,
     }
 
-    var transfer TransferOperation = producer.transferOperationBuilder(objInfo, aggErr)
+    var transfer TransferOperation = producer.transferOperationBuilder(objInfo)
 
     // Increment wait group, and enqueue transfer operation
     producer.waitGroup.Add(1)
@@ -195,7 +195,7 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
 
 // Attempts to process all blobs whose channels were not available for transfer.
 // Blobs whose channels are still not available are placed back on the queue.
-func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
+func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string) {
     // attempt to process all blobs in waiting to be transferred
     waitingBlobs := producer.deferredBlobQueue.Size()
     for i := 0; i < waitingBlobs; i++ {
@@ -204,18 +204,17 @@ func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string
         producer.Debugf("attempting to process '%s' offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
         if err != nil {
             //should not be possible to get here
-            aggErr.Append(err)
             producer.Errorf("failure during blob transfer '%s' at offset %d: %s", curBlob.Name(), curBlob.Offset(), err.Error())
+            break
         }
-        producer.queueBlobForTransfer(curBlob, bucketName, jobId, aggErr)
+        producer.queueBlobForTransfer(curBlob, bucketName, jobId)
     }
 }
 
 // This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine.
 // Each transfer operation will retrieve one blob of content from the BP.
 // Once all blobs have been queued to be transferred, the producer will finish, even if all operations have not been consumed yet.
-func (producer *getProducer) run(aggErr *ds3Models.AggregateError) {
-    defer producer.waitGroup.Done()
+func (producer *getProducer) run() error {
     defer close(*producer.queue)
 
     // determine number of blobs to be processed
@@ -230,9 +229,8 @@ func (producer *getProducer) run(aggErr *ds3Models.AggregateError) {
         chunksReady := ds3Models.NewGetJobChunksReadyForClientProcessingSpectraS3Request(producer.JobMasterObjectList.JobId)
         chunksReadyResponse, err := producer.client.GetJobChunksReadyForClientProcessingSpectraS3(chunksReady)
         if err != nil {
-            aggErr.Append(err)
             producer.Errorf("unrecoverable error: %v", err)
-            return
+            return err
         }
 
         // Check to see if any chunks can be processed
@@ -241,17 +239,18 @@ func (producer *getProducer) run(aggErr *ds3Models.AggregateError) {
             // Loop through all the chunks that are available for processing, and send
             // the files that are contained within them.
             for _, curChunk := range chunksReadyResponse.MasterObjectList.Objects {
-                producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId, aggErr)
+                producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
             }
 
             // Attempt to transfer waiting blobs
-            producer.processWaitingBlobs(*chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId, aggErr)
+            producer.processWaitingBlobs(*chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
         } else {
             // When no chunks are returned we need to sleep to allow for cache space to
             // be freed.
             producer.strategy.BlobStrategy.delay()
         }
     }
+    return nil
 }
 
 // Determines the number of blobs to be transferred.

helpers/getTransfernator.go

Lines changed: 2 additions & 2 deletions
@@ -81,9 +81,9 @@ func (transceiver *getTransceiver) transfer() (string, error) {
 
     // Wait for completion of producer-consumer goroutines
     var aggErr ds3Models.AggregateError
-    waitGroup.Add(2)         // adding producer and consumer goroutines to wait group
-    go producer.run(&aggErr) // producer will add to waitGroup for every blob retrieval added to queue, and each transfer performed will decrement from waitGroup
+    waitGroup.Add(1) // adding producer and consumer goroutines to wait group
     go consumer.run()
+    err = producer.run() // producer will add to waitGroup for every blob retrieval added to queue, and each transfer performed will decrement from waitGroup
     waitGroup.Wait()
 
     return bulkGetResponse.MasterObjectList.JobId, aggErr.GetErrors()
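
The wait-group arithmetic changes because the producer no longer runs on its own goroutine: only the consumer is started with go, while producer.run() now executes synchronously on the calling goroutine and returns its unrecoverable error directly. A self-contained sketch of that pattern, with illustrative names rather than the SDK's actual types:

package main

import (
    "fmt"
    "sync"
)

// run mimics the reworked producer: it enqueues transfer operations on the
// calling goroutine and returns any unrecoverable error directly, instead of
// appending to a shared AggregateError.
func run(queue chan<- func(), wg *sync.WaitGroup) error {
    defer close(queue)
    for i := 0; i < 3; i++ {
        i := i
        wg.Add(1) // one Add per transfer operation placed on the queue
        queue <- func() { fmt.Println("transferring blob", i) }
    }
    return nil // a failed chunks-ready call would be returned here instead
}

// consume mimics the consumer: it drains the queue and decrements the wait
// group once per completed transfer operation.
func consume(queue <-chan func(), wg *sync.WaitGroup) {
    defer wg.Done() // matches the single Add(1) made before launch
    for op := range queue {
        op()
        wg.Done()
    }
}

func main() {
    queue := make(chan func(), 3)
    var wg sync.WaitGroup

    wg.Add(1) // only the consumer goroutine is tracked up front now
    go consume(queue, &wg)
    err := run(queue, &wg) // producer runs synchronously and reports its own error
    wg.Wait()
    fmt.Println("job finished, err =", err)
}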

helpers/listeners.go

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+package helpers
+
+type ListenerStrategy struct {
+    // Called when an error occurred during transfer of an object.
+    // This must be a thread safe function.
+    ErrorCallback func(objectName string, err error)
+}
+
+func (listener *ListenerStrategy) Errored(objectName string, err error) {
+    if listener.ErrorCallback != nil {
+        listener.ErrorCallback(objectName, err)
+    }
+}
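
This new file is the entire listener API: a strategy struct holding an optional callback, plus a nil-safe dispatch method. A minimal usage sketch, assuming the import path used elsewhere in this PR; the mutex-guarded map and object name are illustrative, while ListenerStrategy, ErrorCallback, and Errored come from this commit:

package main

import (
    "fmt"
    "sync"

    "github.com/SpectraLogic/ds3_go_sdk/helpers"
)

func main() {
    // The callback can fire concurrently from multiple transfer goroutines,
    // so any shared state behind it must be guarded.
    var mu sync.Mutex
    failures := make(map[string]error)

    listeners := helpers.ListenerStrategy{
        ErrorCallback: func(objectName string, err error) {
            mu.Lock()
            defer mu.Unlock()
            failures[objectName] = err
        },
    }

    // Errored is a no-op when no callback is registered, so callers that
    // don't care about per-object errors can leave Listeners zero-valued.
    listeners.Errored("books/frankenstein.txt", fmt.Errorf("simulated failure"))
    fmt.Println(failures)
}

Note the nil check in Errored: it makes the callback strictly opt-in, which is why the producers can call it unconditionally on every failure path.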

helpers/putProducer.go

Lines changed: 18 additions & 17 deletions
@@ -60,7 +60,7 @@ type putObjectInfo struct {
 }
 
 // Creates the transfer operation that will perform the data upload of the specified blob to BP
-func (producer *putProducer) transferOperationBuilder(info putObjectInfo, aggErr *ds3Models.AggregateError) TransferOperation {
+func (producer *putProducer) transferOperationBuilder(info putObjectInfo) TransferOperation {
     return func() {
         // has this file fatally errored while transferring a different blob?
         if info.channelBuilder.HasFatalError() {
@@ -70,7 +70,8 @@ func (producer *putProducer) transferOperationBuilder(info putObjectInfo, aggErr
         }
         reader, err := info.channelBuilder.GetChannel(info.blob.Offset())
         if err != nil {
-            aggErr.Append(err)
+            producer.strategy.Listeners.Errored(info.blob.Name(), err)
+
             info.channelBuilder.SetFatalError(err)
             producer.Errorf("could not get reader for object with name='%s' offset=%d length=%d: %v", info.blob.Name(), info.blob.Offset(), info.blob.Length(), err)
             return
@@ -86,7 +87,8 @@ func (producer *putProducer) transferOperationBuilder(info putObjectInfo, aggErr
 
         _, err = producer.client.PutObject(putObjRequest)
         if err != nil {
-            aggErr.Append(err)
+            producer.strategy.Listeners.Errored(info.blob.Name(), err)
+
             info.channelBuilder.SetFatalError(err)
             producer.Errorf("problem during transfer of %s: %s", info.blob.Name(), err.Error())
         }
@@ -120,38 +122,38 @@ func (producer *putProducer) metadataFrom(info putObjectInfo) map[string]string
 
 // Processes all the blobs in a chunk and attempts to add them to the transfer queue.
 // If a blob is not ready for transfer, then it is added to the waiting to be transferred queue.
-func (producer *putProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
+func (producer *putProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) {
     producer.Debugf("begin chunk processing %s", curChunk.ChunkId)
 
     // transfer blobs that are ready, and queue those that are waiting for channel
     for _, curObj := range curChunk.Objects {
         producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
         blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length)
-        producer.queueBlobForTransfer(&blob, bucketName, jobId, aggErr)
+        producer.queueBlobForTransfer(&blob, bucketName, jobId)
     }
 }
 
 // Iterates through blobs that are waiting to be transferred and attempts to transfer.
 // If successful, blob is removed from queue. Else, it is re-queued.
-func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
+func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string) {
     // attempt to process all blobs in waiting to be transferred
     waitingBlobs := producer.deferredBlobQueue.Size()
     for i := 0; i < waitingBlobs; i++ {
         //attempt transfer
         curBlob, err := producer.deferredBlobQueue.Pop()
         if err != nil {
-            aggErr.Append(err)
+            //should not be possible to get here
             producer.Errorf("problem when getting next blob to be transferred: %s", err.Error())
-            continue
+            break
         }
         producer.Debugf("attempting to process %s offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
-        producer.queueBlobForTransfer(curBlob, bucketName, jobId, aggErr)
+        producer.queueBlobForTransfer(curBlob, bucketName, jobId)
     }
 }
 
 // Attempts to transfer a single blob. If the blob is not ready for transfer,
 // it is added to the waiting to transfer queue.
-func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string, aggErr *ds3Models.AggregateError) {
+func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) {
     if producer.processedBlobTracker.IsProcessed(*blob) {
         return
     }
@@ -183,7 +185,7 @@ func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
         jobId: jobId,
     }
 
-    var transfer TransferOperation = producer.transferOperationBuilder(objInfo, aggErr)
+    var transfer TransferOperation = producer.transferOperationBuilder(objInfo)
 
     // Increment wait group, and enqueue transfer operation
     producer.waitGroup.Add(1)
@@ -196,8 +198,7 @@ func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
 // This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine.
 // Each transfer operation will put one blob of content to the BP.
 // Once all blobs have been queued to be transferred, the producer will finish, even if all operations have not been consumed yet.
-func (producer *putProducer) run(aggErr *ds3Models.AggregateError) {
-    defer producer.waitGroup.Done()
+func (producer *putProducer) run() error {
     defer close(*producer.queue)
 
     // determine number of blobs to be processed
@@ -212,9 +213,8 @@ func (producer *putProducer) run(aggErr *ds3Models.AggregateError) {
         chunksReady := ds3Models.NewGetJobChunksReadyForClientProcessingSpectraS3Request(producer.JobMasterObjectList.JobId)
         chunksReadyResponse, err := producer.client.GetJobChunksReadyForClientProcessingSpectraS3(chunksReady)
         if err != nil {
-            aggErr.Append(err)
             producer.Errorf("unrecoverable error: %v", err)
-            return
+            return err
         }
 
         // Check to see if any chunks can be processed
@@ -223,17 +223,18 @@ func (producer *putProducer) run(aggErr *ds3Models.AggregateError) {
             // Loop through all the chunks that are available for processing, and send
             // the files that are contained within them.
             for _, curChunk := range chunksReadyResponse.MasterObjectList.Objects {
-                producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId, aggErr)
+                producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
             }
 
             // Attempt to transfer waiting blobs
-            producer.processWaitingBlobs(*chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId, aggErr)
+            producer.processWaitingBlobs(*chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
         } else {
            // When no chunks are returned we need to sleep to allow for cache space to
            // be freed.
            producer.strategy.BlobStrategy.delay()
        }
    }
+   return nil
 }
 
 // Determines the number of blobs to be transferred.

helpers/putTransceiver.go

Lines changed: 3 additions & 4 deletions
@@ -78,14 +78,13 @@ func (transceiver *putTransceiver) transfer() (string, error) {
     consumer := newConsumer(&queue, &waitGroup, transceiver.Strategy.BlobStrategy.maxConcurrentTransfers())
 
     // Wait for completion of producer-consumer goroutines
-    waitGroup.Add(2) // adding producer and consumer goroutines to wait group
+    waitGroup.Add(1) // adding producer and consumer goroutines to wait group
 
-    var aggErr ds3Models.AggregateError
-    go producer.run(&aggErr) // producer will add to waitGroup for every blob added to queue, and each transfer performed will decrement from waitGroup
     go consumer.run()
+    err = producer.run() // producer will add to waitGroup for every blob added to queue, and each transfer performed will decrement from waitGroup
     waitGroup.Wait()
 
-    return bulkPutResponse.MasterObjectList.JobId, aggErr.GetErrors()
+    return bulkPutResponse.MasterObjectList.JobId, err
 }
 
 /*

helpers/readTransferStrategy.go

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@ import "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
 type ReadTransferStrategy struct {
     BlobStrategy ReadBlobStrategy
     Options      ReadBulkJobOptions
+    Listeners    ListenerStrategy
 }
 
 // Defines the options to use on the get bulk job

helpers/writeTransferStrategy.go

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ var MinUploadSize int64 = 10485760
 type WriteTransferStrategy struct {
     BlobStrategy WriteBlobStrategy
     Options      WriteBulkJobOptions
+    Listeners    ListenerStrategy
 }
 
 // Defines the options to use on the put bulk job
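
With Listeners now a field on both strategy structs, a caller wires the hook up once when building the strategy for a job. A hedged sketch of attaching an error listener to a WriteTransferStrategy; the BlobStrategy and Options setup is elided since this commit leaves it unchanged, and the final hand-off to the SDK's bulk put helper is only indicated in a comment:

package main

import (
    "log"

    "github.com/SpectraLogic/ds3_go_sdk/helpers"
)

func main() {
    // Build the strategy the same way as before this commit; only the
    // Listeners field is new.
    var strategy helpers.WriteTransferStrategy

    strategy.Listeners = helpers.ListenerStrategy{
        ErrorCallback: func(objectName string, err error) {
            // Must be thread safe; the log package serializes its writes.
            log.Printf("put of object %q failed: %v", objectName, err)
        },
    }

    _ = strategy // pass to the helpers' bulk put entry point as usual
}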
