Skip to content

Commit eb72495

Browse files
Merge pull request #121 from RachelTucker/improved_error_handling

Improving error handling in helpers in case of malformed objects

Merge commit eb72495 (2 parents: 3e66613 + 6155b21)

File tree

2 files changed

+92
-34
lines changed

2 files changed

+92
-34
lines changed

helpers/getProducer.go

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,24 @@ func toReadObjectMap(getObjects *[]helperModels.GetObject) map[string]helperMode
7171

7272
// Processes all the blobs in a chunk that are ready for transfer from BP
7373
// Returns the number of blobs queued for process
74-
func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) int {
74+
func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) (int, error) {
7575
producer.Debugf("begin chunk processing %s", curChunk.ChunkId)
7676

7777
processedCount := 0
7878
// transfer blobs that are ready, and queue those that are waiting for channel
7979
for _, curObj := range curChunk.Objects {
8080
producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
8181
blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length)
82-
if producer.queueBlobForTransfer(&blob, bucketName, jobId) {
82+
83+
blobQueued, err := producer.queueBlobForTransfer(&blob, bucketName, jobId)
84+
if err != nil {
85+
return 0, err
86+
}
87+
if blobQueued {
8388
processedCount++
8489
}
8590
}
86-
return processedCount
91+
return processedCount, nil
8792
}
8893

8994
// Information required to perform a get operation of a blob with BP as data source and channelBuilder as destination
@@ -231,24 +236,37 @@ func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, bl
231236
// Attempts to transfer a single blob from the BP to the client. If the blob is not ready for transfer,
232237
// then it is added to the waiting to transfer queue
233238
// Returns whether or not the blob was queued for transfer
234-
func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) bool {
239+
func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) (bool, error) {
235240
if producer.processedBlobTracker.IsProcessed(*blob) {
236-
return false // already been processed
241+
return false, nil // already been processed
237242
}
238243

239-
curReadObj := producer.readObjectMap[blob.Name()]
244+
curReadObj, ok := producer.readObjectMap[blob.Name()]
245+
if !ok {
246+
err := fmt.Errorf("failed to find object associated with blob in object map: %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
247+
producer.Errorf("unrecoverable error: %v", err)
248+
producer.processedBlobTracker.MarkProcessed(*blob)
249+
return false, err // fatal error occurred
250+
}
251+
252+
if curReadObj.ChannelBuilder == nil {
253+
err := fmt.Errorf("failed to transfer object, it does not have a channel builder: %s", curReadObj.Name)
254+
producer.Errorf("unrecoverable error: %v", err)
255+
producer.processedBlobTracker.MarkProcessed(*blob)
256+
return false, err // fatal error occurred
257+
}
240258

241259
if curReadObj.ChannelBuilder.HasFatalError() {
242260
// a fatal error happened on a previous blob for this file, skip processing
243261
producer.Warningf("fatal error occurred while transferring previous blob on this file, skipping blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
244262
producer.processedBlobTracker.MarkProcessed(*blob)
245-
return false // not going to process
263+
return false, nil // not going to process
246264
}
247265

248266
if !curReadObj.ChannelBuilder.IsChannelAvailable(blob.Offset()) {
249267
producer.Debugf("channel is not currently available for getting blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
250268
producer.deferredBlobQueue.Push(blob)
251-
return false // not ready to be processed
269+
return false, nil // not ready to be processed
252270
}
253271

254272
producer.Debugf("channel is available for getting blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
@@ -261,7 +279,7 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
261279
jobId: jobId,
262280
}
263281

264-
var transfer TransferOperation = producer.transferOperationBuilder(objInfo)
282+
transfer := producer.transferOperationBuilder(objInfo)
265283

266284
// Increment wait group, and enqueue transfer operation
267285
producer.waitGroup.Add(1)
@@ -270,31 +288,35 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
270288
// Mark blob as processed
271289
producer.processedBlobTracker.MarkProcessed(*blob)
272290

273-
return true
291+
return true, nil
274292
}
275293

276294
// Attempts to process all blobs whose channels were not available for transfer.
277295
// Blobs whose channels are still not available are placed back on the queue.
278296
// Returns the number of blobs queued for processing.
279-
func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string) int {
297+
func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string) (int, error) {
280298
processedCount := 0
281299

282300
// attempt to process all blobs in waiting to be transferred
283301
waitingBlobs := producer.deferredBlobQueue.Size()
284302
for i := 0; i < waitingBlobs; i++ {
285303
//attempt transfer
286304
curBlob, err := producer.deferredBlobQueue.Pop()
287-
producer.Debugf("attempting to process '%s' offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
288305
if err != nil {
289306
//should not be possible to get here
290-
producer.Errorf("failure during blob transfer '%s' at offset %d: %s", curBlob.Name(), curBlob.Offset(), err.Error())
307+
producer.Errorf(err.Error())
291308
break
292309
}
293-
if producer.queueBlobForTransfer(curBlob, bucketName, jobId) {
310+
producer.Debugf("attempting to process '%s' offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
311+
blobQueued, err := producer.queueBlobForTransfer(curBlob, bucketName, jobId)
312+
if err != nil {
313+
return 0, err
314+
}
315+
if blobQueued {
294316
processedCount++
295317
}
296318
}
297-
return processedCount
319+
return processedCount, nil
298320
}
299321

300322
// This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine.
@@ -304,7 +326,7 @@ func (producer *getProducer) run() error {
304326
defer close(*producer.queue)
305327

306328
// determine number of blobs to be processed
307-
var totalBlobCount int64 = producer.totalBlobCount()
329+
var totalBlobCount = producer.totalBlobCount()
308330
producer.Debugf("job status totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs())
309331

310332
// process all chunks and make sure all blobs are queued for transfer
@@ -332,7 +354,10 @@ func (producer *getProducer) hasMoreToProcess(totalBlobCount int64) bool {
332354
// Returns the number of blobs that have been queued for transfer
333355
func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (int, error) {
334356
// Attempt to transfer waiting blobs
335-
processedCount := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId)
357+
processedCount, err := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId)
358+
if err != nil {
359+
return 0, err
360+
}
336361

337362
// Check if we need to query the BP for allocated blobs, or if we already know everything is allocated.
338363
if int64(producer.deferredBlobQueue.Size()) + producer.processedBlobTracker.NumberOfProcessedBlobs() >= totalBlobCount {
@@ -356,7 +381,11 @@ func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (i
356381
// Loop through all the chunks that are available for processing, and send
357382
// the files that are contained within them.
358383
for _, curChunk := range chunksReadyResponse.MasterObjectList.Objects {
359-
processedCount += producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
384+
justProcessedCount, err := producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
385+
if err != nil {
386+
return 0, err
387+
}
388+
processedCount += justProcessedCount
360389
}
361390
}
362391
return processedCount, nil

helpers/putProducer.go

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package helpers
22

33
import (
4+
"fmt"
45
"github.com/SpectraLogic/ds3_go_sdk/ds3"
56
ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
67
helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
@@ -136,25 +137,29 @@ func (producer *putProducer) metadataFrom(info putObjectInfo) map[string]string
136137
// Processes all the blobs in a chunk and attempts to add them to the transfer queue.
137138
// If a blob is not ready for transfer, then it is added to the waiting to be transferred queue.
138139
// Returns the number of blobs added to queue.
139-
func (producer *putProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) int {
140+
func (producer *putProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) (int, error) {
140141
processedCount := 0
141142
producer.Debugf("begin chunk processing %s", curChunk.ChunkId)
142143

143144
// transfer blobs that are ready, and queue those that are waiting for channel
144145
for _, curObj := range curChunk.Objects {
145146
producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
146147
blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length)
147-
if producer.queueBlobForTransfer(&blob, bucketName, jobId) {
148+
blobQueued, err := producer.queueBlobForTransfer(&blob, bucketName, jobId)
149+
if err != nil {
150+
return 0, err
151+
}
152+
if blobQueued {
148153
processedCount++
149154
}
150155
}
151-
return processedCount
156+
return processedCount, nil
152157
}
153158

154159
// Iterates through blobs that are waiting to be transferred and attempts to transfer.
155160
// If successful, blob is removed from queue. Else, it is re-queued.
156161
// Returns the number of blobs added to queue.
157-
func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string) int {
162+
func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string) (int, error) {
158163
processedCount := 0
159164

160165
// attempt to process all blobs in waiting to be transferred
@@ -168,36 +173,53 @@ func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string
168173
break
169174
}
170175
producer.Debugf("attempting to process %s offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
171-
if producer.queueBlobForTransfer(curBlob, bucketName, jobId) {
176+
blobQueued, err := producer.queueBlobForTransfer(curBlob, bucketName, jobId)
177+
if err != nil {
178+
return 0, err
179+
}
180+
if blobQueued {
172181
processedCount++
173182
}
174183
}
175184

176-
return processedCount
185+
return processedCount, nil
177186
}
178187

179188
// Attempts to transfer a single blob. If the blob is not ready for transfer,
180189
// it is added to the waiting to transfer queue.
181190
// Returns whether or not the blob was queued for transfer.
182-
func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) bool {
191+
func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) (bool, error) {
183192
if producer.processedBlobTracker.IsProcessed(*blob) {
184-
return false // this was already processed
193+
return false, nil // this was already processed
185194
}
186195

187-
curWriteObj := producer.writeObjectMap[blob.Name()]
196+
curWriteObj, ok := producer.writeObjectMap[blob.Name()]
197+
if !ok {
198+
err := fmt.Errorf("failed to find object associated with blob in object map: %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
199+
producer.Errorf("unrecoverable error: %v", err)
200+
producer.processedBlobTracker.MarkProcessed(*blob)
201+
return false, err // fatal error occurred
202+
}
203+
204+
if curWriteObj.ChannelBuilder == nil {
205+
err := fmt.Errorf("failed to transfer object, it does not have a channel builder: %s", curWriteObj.PutObject.Name)
206+
producer.Errorf("unrecoverable error: %v", err)
207+
producer.processedBlobTracker.MarkProcessed(*blob)
208+
return false, err // fatal error occurred
209+
}
188210

189211
if curWriteObj.ChannelBuilder.HasFatalError() {
190212
// a fatal error happened on a previous blob for this file, skip processing
191213
producer.Warningf("fatal error occurred while transferring previous blob on this file, skipping blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
192214
producer.processedBlobTracker.MarkProcessed(*blob)
193-
return false // not actually transferring this blob
215+
return false, nil // not actually transferring this blob
194216
}
195217

196218
if !curWriteObj.ChannelBuilder.IsChannelAvailable(blob.Offset()) {
197219
producer.Debugf("channel is not currently available for blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
198220
// Not ready to be transferred
199221
producer.deferredBlobQueue.Push(blob)
200-
return false // not ready to be sent
222+
return false, nil // not ready to be sent
201223
}
202224

203225
producer.Debugf("channel is available for blob %s offset=%d length=%d", curWriteObj.PutObject.Name, blob.Offset(), blob.Length())
@@ -211,7 +233,7 @@ func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
211233
jobId: jobId,
212234
}
213235

214-
var transfer TransferOperation = producer.transferOperationBuilder(objInfo)
236+
transfer := producer.transferOperationBuilder(objInfo)
215237

216238
// Increment wait group, and enqueue transfer operation
217239
producer.waitGroup.Add(1)
@@ -220,7 +242,7 @@ func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
220242
// Mark blob as processed
221243
producer.processedBlobTracker.MarkProcessed(*blob)
222244

223-
return true
245+
return true, nil
224246
}
225247

226248
// This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine.
@@ -230,7 +252,7 @@ func (producer *putProducer) run() error {
230252
defer close(*producer.queue)
231253

232254
// determine number of blobs to be processed
233-
var totalBlobCount int64 = producer.totalBlobCount()
255+
totalBlobCount := producer.totalBlobCount()
234256
producer.Debugf("job status totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs())
235257

236258
// process all chunks and make sure all blobs are queued for transfer
@@ -259,7 +281,10 @@ func (producer *putProducer) hasMoreToProcess(totalBlobCount int64) bool {
259281
// Returns the number of items queued for work.
260282
func (producer *putProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (int, error) {
261283
// Attempt to transfer waiting blobs
262-
processedCount := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId)
284+
processedCount, err := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId)
285+
if err != nil {
286+
return 0, err
287+
}
263288

264289
// Check if we need to query the BP for allocated blobs, or if we already know everything is allocated.
265290
if int64(producer.deferredBlobQueue.Size()) + producer.processedBlobTracker.NumberOfProcessedBlobs() >= totalBlobCount {
@@ -283,7 +308,11 @@ func (producer *putProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (i
283308
// Loop through all the chunks that are available for processing, and send
284309
// the files that are contained within them.
285310
for _, curChunk := range chunksReadyResponse.MasterObjectList.Objects {
286-
processedCount += producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
311+
justProcessedCount, err := producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
312+
if err != nil {
313+
return 0, err
314+
}
315+
processedCount += justProcessedCount
287316
}
288317
}
289318
return processedCount, nil

0 commit comments

Comments (0)