Skip to content

Commit 6155b21

Browse files
committed
Malformed objects in helpers will now produce BP job failures, so clients get quick feedback about incorrect usage.
1 parent c4fce09 commit 6155b21

File tree

2 files changed

+76
-40
lines changed

2 files changed

+76
-40
lines changed

helpers/getProducer.go

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,24 @@ func toReadObjectMap(getObjects *[]helperModels.GetObject) map[string]helperMode
7171

7272
// Processes all the blobs in a chunk that are ready for transfer from BP
7373
// Returns the number of blobs queued for process
74-
func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) int {
74+
func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) (int, error) {
7575
producer.Debugf("begin chunk processing %s", curChunk.ChunkId)
7676

7777
processedCount := 0
7878
// transfer blobs that are ready, and queue those that are waiting for channel
7979
for _, curObj := range curChunk.Objects {
8080
producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
8181
blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length)
82-
if producer.queueBlobForTransfer(&blob, bucketName, jobId) {
82+
83+
blobQueued, err := producer.queueBlobForTransfer(&blob, bucketName, jobId)
84+
if err != nil {
85+
return 0, err
86+
}
87+
if blobQueued {
8388
processedCount++
8489
}
8590
}
86-
return processedCount
91+
return processedCount, nil
8792
}
8893

8994
// Information required to perform a get operation of a blob with BP as data source and channelBuilder as destination
@@ -231,35 +236,37 @@ func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, bl
231236
// Attempts to transfer a single blob from the BP to the client. If the blob is not ready for transfer,
232237
// then it is added to the waiting to transfer queue
233238
// Returns whether or not the blob was queued for transfer
234-
func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) bool {
239+
func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) (bool, error) {
235240
if producer.processedBlobTracker.IsProcessed(*blob) {
236-
return false // already been processed
241+
return false, nil // already been processed
237242
}
238243

239244
curReadObj, ok := producer.readObjectMap[blob.Name()]
240245
if !ok {
241-
producer.Errorf("failed to find object associated with blob in object map: %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
246+
err := fmt.Errorf("failed to find object associated with blob in object map: %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
247+
producer.Errorf("unrecoverable error: %v", err)
242248
producer.processedBlobTracker.MarkProcessed(*blob)
243-
return false // not actually transferring this blob
249+
return false, err // fatal error occurred
244250
}
245251

246252
if curReadObj.ChannelBuilder == nil {
247-
producer.Errorf("failed to transfer object, it does not have a channel builder: %s", curReadObj.Name)
253+
err := fmt.Errorf("failed to transfer object, it does not have a channel builder: %s", curReadObj.Name)
254+
producer.Errorf("unrecoverable error: %v", err)
248255
producer.processedBlobTracker.MarkProcessed(*blob)
249-
return false // not actually transferring this blob
256+
return false, err // fatal error occurred
250257
}
251258

252259
if curReadObj.ChannelBuilder.HasFatalError() {
253260
// a fatal error happened on a previous blob for this file, skip processing
254261
producer.Warningf("fatal error occurred while transferring previous blob on this file, skipping blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
255262
producer.processedBlobTracker.MarkProcessed(*blob)
256-
return false // not going to process
263+
return false, nil // not going to process
257264
}
258265

259266
if !curReadObj.ChannelBuilder.IsChannelAvailable(blob.Offset()) {
260267
producer.Debugf("channel is not currently available for getting blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
261268
producer.deferredBlobQueue.Push(blob)
262-
return false // not ready to be processed
269+
return false, nil // not ready to be processed
263270
}
264271

265272
producer.Debugf("channel is available for getting blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
@@ -272,7 +279,7 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
272279
jobId: jobId,
273280
}
274281

275-
var transfer TransferOperation = producer.transferOperationBuilder(objInfo)
282+
transfer := producer.transferOperationBuilder(objInfo)
276283

277284
// Increment wait group, and enqueue transfer operation
278285
producer.waitGroup.Add(1)
@@ -281,31 +288,35 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
281288
// Mark blob as processed
282289
producer.processedBlobTracker.MarkProcessed(*blob)
283290

284-
return true
291+
return true, nil
285292
}
286293

287294
// Attempts to process all blobs whose channels were not available for transfer.
288295
// Blobs whose channels are still not available are placed back on the queue.
289296
// Returns the number of blobs queued for processing.
290-
func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string) int {
297+
func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string) (int, error) {
291298
processedCount := 0
292299

293300
// attempt to process all blobs in waiting to be transferred
294301
waitingBlobs := producer.deferredBlobQueue.Size()
295302
for i := 0; i < waitingBlobs; i++ {
296303
//attempt transfer
297304
curBlob, err := producer.deferredBlobQueue.Pop()
298-
producer.Debugf("attempting to process '%s' offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
299305
if err != nil {
300306
//should not be possible to get here
301-
producer.Errorf("failure during blob transfer '%s' at offset %d: %s", curBlob.Name(), curBlob.Offset(), err.Error())
307+
producer.Errorf(err.Error())
302308
break
303309
}
304-
if producer.queueBlobForTransfer(curBlob, bucketName, jobId) {
310+
producer.Debugf("attempting to process '%s' offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
311+
blobQueued, err := producer.queueBlobForTransfer(curBlob, bucketName, jobId)
312+
if err != nil {
313+
return 0, err
314+
}
315+
if blobQueued {
305316
processedCount++
306317
}
307318
}
308-
return processedCount
319+
return processedCount, nil
309320
}
310321

311322
// This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine.
@@ -315,7 +326,7 @@ func (producer *getProducer) run() error {
315326
defer close(*producer.queue)
316327

317328
// determine number of blobs to be processed
318-
var totalBlobCount int64 = producer.totalBlobCount()
329+
var totalBlobCount = producer.totalBlobCount()
319330
producer.Debugf("job status totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs())
320331

321332
// process all chunks and make sure all blobs are queued for transfer
@@ -343,7 +354,10 @@ func (producer *getProducer) hasMoreToProcess(totalBlobCount int64) bool {
343354
// Returns the number of blobs that have been queued for transfer
344355
func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (int, error) {
345356
// Attempt to transfer waiting blobs
346-
processedCount := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId)
357+
processedCount, err := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId)
358+
if err != nil {
359+
return 0, err
360+
}
347361

348362
// Check if we need to query the BP for allocated blobs, or if we already know everything is allocated.
349363
if int64(producer.deferredBlobQueue.Size()) + producer.processedBlobTracker.NumberOfProcessedBlobs() >= totalBlobCount {
@@ -367,7 +381,11 @@ func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (i
367381
// Loop through all the chunks that are available for processing, and send
368382
// the files that are contained within them.
369383
for _, curChunk := range chunksReadyResponse.MasterObjectList.Objects {
370-
processedCount += producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
384+
justProcessedCount, err := producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
385+
if err != nil {
386+
return 0, err
387+
}
388+
processedCount += justProcessedCount
371389
}
372390
}
373391
return processedCount, nil

helpers/putProducer.go

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package helpers
22

33
import (
4+
"fmt"
45
"github.com/SpectraLogic/ds3_go_sdk/ds3"
56
ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
67
helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
@@ -136,25 +137,29 @@ func (producer *putProducer) metadataFrom(info putObjectInfo) map[string]string
136137
// Processes all the blobs in a chunk and attempts to add them to the transfer queue.
137138
// If a blob is not ready for transfer, then it is added to the waiting to be transferred queue.
138139
// Returns the number of blobs added to queue.
139-
func (producer *putProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) int {
140+
func (producer *putProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) (int, error) {
140141
processedCount := 0
141142
producer.Debugf("begin chunk processing %s", curChunk.ChunkId)
142143

143144
// transfer blobs that are ready, and queue those that are waiting for channel
144145
for _, curObj := range curChunk.Objects {
145146
producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length)
146147
blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length)
147-
if producer.queueBlobForTransfer(&blob, bucketName, jobId) {
148+
blobQueued, err := producer.queueBlobForTransfer(&blob, bucketName, jobId)
149+
if err != nil {
150+
return 0, err
151+
}
152+
if blobQueued {
148153
processedCount++
149154
}
150155
}
151-
return processedCount
156+
return processedCount, nil
152157
}
153158

154159
// Iterates through blobs that are waiting to be transferred and attempts to transfer.
155160
// If successful, blob is removed from queue. Else, it is re-queued.
156161
// Returns the number of blobs added to queue.
157-
func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string) int {
162+
func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string) (int, error) {
158163
processedCount := 0
159164

160165
// attempt to process all blobs in waiting to be transferred
@@ -168,47 +173,53 @@ func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string
168173
break
169174
}
170175
producer.Debugf("attempting to process %s offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length())
171-
if producer.queueBlobForTransfer(curBlob, bucketName, jobId) {
176+
blobQueued, err := producer.queueBlobForTransfer(curBlob, bucketName, jobId)
177+
if err != nil {
178+
return 0, err
179+
}
180+
if blobQueued {
172181
processedCount++
173182
}
174183
}
175184

176-
return processedCount
185+
return processedCount, nil
177186
}
178187

179188
// Attempts to transfer a single blob. If the blob is not ready for transfer,
180189
// it is added to the waiting to transfer queue.
181190
// Returns whether or not the blob was queued for transfer.
182-
func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) bool {
191+
func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) (bool, error) {
183192
if producer.processedBlobTracker.IsProcessed(*blob) {
184-
return false // this was already processed
193+
return false, nil // this was already processed
185194
}
186195

187196
curWriteObj, ok := producer.writeObjectMap[blob.Name()]
188197
if !ok {
189-
producer.Errorf("failed to find object associated with blob in object map: %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
198+
err := fmt.Errorf("failed to find object associated with blob in object map: %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
199+
producer.Errorf("unrecoverable error: %v", err)
190200
producer.processedBlobTracker.MarkProcessed(*blob)
191-
return false // not actually transferring this blob
201+
return false, err // fatal error occurred
192202
}
193203

194204
if curWriteObj.ChannelBuilder == nil {
195-
producer.Errorf("failed to transfer object, it does not have a channel builder: %s", curWriteObj.PutObject.Name)
205+
err := fmt.Errorf("failed to transfer object, it does not have a channel builder: %s", curWriteObj.PutObject.Name)
206+
producer.Errorf("unrecoverable error: %v", err)
196207
producer.processedBlobTracker.MarkProcessed(*blob)
197-
return false // not actually transferring this blob
208+
return false, err // fatal error occurred
198209
}
199210

200211
if curWriteObj.ChannelBuilder.HasFatalError() {
201212
// a fatal error happened on a previous blob for this file, skip processing
202213
producer.Warningf("fatal error occurred while transferring previous blob on this file, skipping blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
203214
producer.processedBlobTracker.MarkProcessed(*blob)
204-
return false // not actually transferring this blob
215+
return false, nil // not actually transferring this blob
205216
}
206217

207218
if !curWriteObj.ChannelBuilder.IsChannelAvailable(blob.Offset()) {
208219
producer.Debugf("channel is not currently available for blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length())
209220
// Not ready to be transferred
210221
producer.deferredBlobQueue.Push(blob)
211-
return false // not ready to be sent
222+
return false, nil // not ready to be sent
212223
}
213224

214225
producer.Debugf("channel is available for blob %s offset=%d length=%d", curWriteObj.PutObject.Name, blob.Offset(), blob.Length())
@@ -222,7 +233,7 @@ func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
222233
jobId: jobId,
223234
}
224235

225-
var transfer TransferOperation = producer.transferOperationBuilder(objInfo)
236+
transfer := producer.transferOperationBuilder(objInfo)
226237

227238
// Increment wait group, and enqueue transfer operation
228239
producer.waitGroup.Add(1)
@@ -231,7 +242,7 @@ func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip
231242
// Mark blob as processed
232243
producer.processedBlobTracker.MarkProcessed(*blob)
233244

234-
return true
245+
return true, nil
235246
}
236247

237248
// This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine.
@@ -241,7 +252,7 @@ func (producer *putProducer) run() error {
241252
defer close(*producer.queue)
242253

243254
// determine number of blobs to be processed
244-
var totalBlobCount int64 = producer.totalBlobCount()
255+
totalBlobCount := producer.totalBlobCount()
245256
producer.Debugf("job status totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs())
246257

247258
// process all chunks and make sure all blobs are queued for transfer
@@ -270,7 +281,10 @@ func (producer *putProducer) hasMoreToProcess(totalBlobCount int64) bool {
270281
// Returns the number of items queued for work.
271282
func (producer *putProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (int, error) {
272283
// Attempt to transfer waiting blobs
273-
processedCount := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId)
284+
processedCount, err := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId)
285+
if err != nil {
286+
return 0, err
287+
}
274288

275289
// Check if we need to query the BP for allocated blobs, or if we already know everything is allocated.
276290
if int64(producer.deferredBlobQueue.Size()) + producer.processedBlobTracker.NumberOfProcessedBlobs() >= totalBlobCount {
@@ -294,7 +308,11 @@ func (producer *putProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (i
294308
// Loop through all the chunks that are available for processing, and send
295309
// the files that are contained within them.
296310
for _, curChunk := range chunksReadyResponse.MasterObjectList.Objects {
297-
processedCount += producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
311+
justProcessedCount, err := producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId)
312+
if err != nil {
313+
return 0, err
314+
}
315+
processedCount += justProcessedCount
298316
}
299317
}
300318
return processedCount, nil

0 commit comments

Comments
 (0)