
Commit dc0d996 (parent 045fb30)

OTHER: add retry around bulk get when a blob only partially transfers. If a blob is not retrieved in its entirety, ranged naked gets are used to retrieve the remaining portion of the blob. This retry applies only to blobs for which no range was specified.
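
The recovery described above boils down to re-issuing a ranged "naked" get for the missing tail of the blob and appending it to the same destination writer. A minimal sketch of that call using the SDK request builders this commit relies on; the name fetchMissingTail and its arguments are illustrative only, and the actual implementation is RetryGettingBlobRange in helpers/getProducer.go below:

// fetchMissingTail is a hypothetical helper showing the ranged naked get:
// it fetches bytes [rangeStart, rangeEnd] of a blob that begins at blobOffset
// and appends them to dest. Assumes the ds3, ds3Models, and io imports used
// elsewhere in this package.
func fetchMissingTail(client *ds3.Client, bucketName string, objectName string,
    blobOffset int64, rangeStart int64, rangeEnd int64, dest io.Writer) (int64, error) {

    missingTail := ds3Models.Range{Start: rangeStart, End: rangeEnd}
    request := ds3Models.NewGetObjectRequest(bucketName, objectName).
        WithOffset(blobOffset).
        WithRanges(missingTail)

    response, err := client.GetObject(request)
    if err != nil {
        return 0, err
    }
    defer response.Content.Close()

    // copy only the recovered bytes onto the end of what was already written
    return io.Copy(dest, response.Content)
}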

3 files changed: 174 additions, 6 deletions

ds3_integration/utils/testUtils.go

Lines changed: 2 additions & 2 deletions
@@ -60,7 +60,7 @@ func VerifyBookContent(t *testing.T, bookName string, actual io.ReadCloser) {
     verifyContent(t, expected, actual)
 }
 
-func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.ReadCloser) {
+func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.Reader) {
     f, err := os.Open(filePath)
     ds3Testing.AssertNilError(t, err)
 
@@ -73,7 +73,7 @@ func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64
     verifyPartialContent(t, *expected, actual, length)
 }
 
-func verifyPartialContent(t *testing.T, expected []byte, actual io.ReadCloser, length int64) {
+func verifyPartialContent(t *testing.T, expected []byte, actual io.Reader, length int64) {
     content, err := getNBytesFromReader(actual, length)
     ds3Testing.AssertNilError(t, err)
 
helpers/getProducer.go

Lines changed: 55 additions & 4 deletions
@@ -12,6 +12,8 @@ import (
     "time"
 )
 
+const timesToRetryGettingPartialBlob = 5
+
 type getProducer struct {
     JobMasterObjectList *ds3Models.MasterObjectList //MOL from put bulk job creation
     GetObjects          *[]helperModels.GetObject
@@ -146,10 +148,13 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
             return
         }
         if bytesWritten != info.blob.Length() {
-            err = fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
-            producer.strategy.Listeners.Errored(info.blob.Name(), err)
-            info.channelBuilder.SetFatalError(err)
-            producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
+            producer.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
+            err := GetRemainingBlob(producer.client, info.bucketName, info.blob, bytesWritten, writer, producer.Logger)
+            if err != nil {
+                producer.strategy.Listeners.Errored(info.blob.Name(), err)
+                info.channelBuilder.SetFatalError(err)
+                producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
+            }
         }
         return
     }
@@ -166,6 +171,52 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
     }
 }
 
+func GetRemainingBlob(client *ds3.Client, bucketName string, blob *helperModels.BlobDescription, amountAlreadyRetrieved int64, writer io.Writer, logger sdk_log.Logger) error {
+    logger.Debugf("starting retry for fetching partial blob '%s' at offset '%d': amount to retrieve %d", blob.Name(), blob.Offset(), blob.Length()-amountAlreadyRetrieved)
+    bytesRetrievedSoFar := amountAlreadyRetrieved
+    timesRetried := 0
+    rangeEnd := blob.Offset() + blob.Length() - 1
+    for bytesRetrievedSoFar < blob.Length() && timesRetried < timesToRetryGettingPartialBlob {
+        rangeStart := blob.Offset() + bytesRetrievedSoFar
+        bytesRetrievedThisRound, err := RetryGettingBlobRange(client, bucketName, blob.Name(), blob.Offset(), rangeStart, rangeEnd, writer, logger)
+        if err != nil {
+            logger.Errorf("failed to get object '%s' at offset '%d', range %d-%d attempt %d: %s", blob.Name(), blob.Offset(), rangeStart, rangeEnd, timesRetried, err.Error())
+        }
+        bytesRetrievedSoFar += bytesRetrievedThisRound
+        timesRetried++
+    }
+
+    if bytesRetrievedSoFar < blob.Length() {
+        return fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", blob.Name(), blob.Offset(), bytesRetrievedSoFar, blob.Length())
+    }
+    return nil
+}
+
+func RetryGettingBlobRange(client *ds3.Client, bucketName string, objectName string, blobOffset int64, rangeStart int64, rangeEnd int64, writer io.Writer, logger sdk_log.Logger) (int64, error) {
+    // perform a naked get call for the rest of the blob that we originally failed to get
+    partOfBlobToFetch := ds3Models.Range{
+        Start: rangeStart,
+        End:   rangeEnd,
+    }
+    getObjRequest := ds3Models.NewGetObjectRequest(bucketName, objectName).
+        WithOffset(blobOffset).
+        WithRanges(partOfBlobToFetch)
+
+    getObjResponse, err := client.GetObject(getObjRequest)
+    if err != nil {
+        return 0, err
+    }
+    defer func() {
+        err := getObjResponse.Content.Close()
+        if err != nil {
+            logger.Warningf("failed to close response body for get object '%s' with range %d-%d: %v", objectName, rangeStart, rangeEnd, err)
+        }
+    }()
+
+    bytesWritten, err := io.Copy(writer, getObjResponse.Content) // copy all content from response reader to destination writer
+    return bytesWritten, err
+}
+
 // Writes a range of a blob to its destination channel
 func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, blobRange ds3Models.Range, content io.Reader) error {
     writer, err := channelBuilder.GetChannel(blobRange.Start)
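
Both new functions are exported, so a caller that detects a short read can also drive the recovery directly, as the new integration tests below do. A rough usage sketch, assuming a configured *ds3.Client, a destination io.Writer, and the blob's name, offset, and length are already in hand (variable names are illustrative):

// blobDesc describes the blob being fetched; alreadyWritten is how many of
// its bytes reached dest before the transfer was cut short.
blobDesc := helperModels.NewBlobDescription(objectName, blobOffset, blobLength)
if err := helpers.GetRemainingBlob(client, bucketName, &blobDesc, alreadyWritten, dest, client.Logger); err != nil {
    // the blob is still incomplete after timesToRetryGettingPartialBlob ranged attempts
    return err
}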

helpers_integration/helpersImpl_test.go

Lines changed: 117 additions & 0 deletions
@@ -504,3 +504,120 @@ func TestBulkPutAndGetLotsOfFiles(t *testing.T) {
         t.Errorf("expected to get a BP job ID, but instead got nothing")
     }
 }
+
+func TestRetryGettingBlobRange(t *testing.T) {
+    defer testutils.DeleteBucketContents(client, testBucket)
+
+    helper := helpers.NewHelpers(client)
+    strategy := newTestTransferStrategy(t)
+
+    // Put a blobbed object to BP
+    const bigFilePath = LargeBookPath + LargeBookTitle
+    writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
+    ds3Testing.AssertNilError(t, err)
+
+    var writeObjects []helperModels.PutObject
+    writeObjects = append(writeObjects, *writeObj)
+
+    putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
+    ds3Testing.AssertNilError(t, err)
+    if putJobId == "" {
+        t.Error("expected to get a BP job ID, but instead got nothing")
+    }
+
+    // Try to get some data from each blob
+    getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
+    ds3Testing.AssertNilError(t, err)
+
+    blobsChecked := 0
+    for _, curObj := range getJob.MasterObjectList.Objects {
+        for _, blob := range curObj.Objects {
+            func() {
+                // create a temp file for writing the blob to
+                tempFile, err := ioutil.TempFile("", "go-sdk-test-")
+                ds3Testing.AssertNilError(t, err)
+                defer func() {
+                    tempFile.Close()
+                    os.Remove(tempFile.Name())
+                }()
+
+                // get a range of the blob
+                startRange := blob.Offset + 10 // retrieve subset of blob
+                endRange := blob.Length + blob.Offset - 1
+                bytesWritten, err := helpers.RetryGettingBlobRange(client, testBucket, writeObj.PutObject.Name, blob.Offset, startRange, endRange, tempFile, client.Logger)
+                ds3Testing.AssertNilError(t, err)
+                ds3Testing.AssertInt64(t, "bytes written", endRange-startRange+1, bytesWritten)
+
+                // verify that retrieved partial blob is correct
+                err = tempFile.Sync()
+                ds3Testing.AssertNilError(t, err)
+
+                tempFile.Seek(0, 0)
+                length := endRange - startRange
+                testutils.VerifyPartialFile(t, bigFilePath, length, startRange, tempFile)
+            }()
+            blobsChecked++
+        }
+    }
+    if blobsChecked == 0 {
+        t.Fatalf("didn't verify any blobs")
+    }
+}
+
+func TestGetRemainingBlob(t *testing.T) {
+    defer testutils.DeleteBucketContents(client, testBucket)
+
+    helper := helpers.NewHelpers(client)
+    strategy := newTestTransferStrategy(t)
+
+    // Put a blobbed object to BP
+    const bigFilePath = LargeBookPath + LargeBookTitle
+    writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
+    ds3Testing.AssertNilError(t, err)
+
+    var writeObjects []helperModels.PutObject
+    writeObjects = append(writeObjects, *writeObj)
+
+    putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
+    ds3Testing.AssertNilError(t, err)
+    if putJobId == "" {
+        t.Error("expected to get a BP job ID, but instead got nothing")
+    }
+
+    // Try to get some data from each blob
+    getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
+    ds3Testing.AssertNilError(t, err)
+
+    blobsChecked := 0
+    for _, curObj := range getJob.MasterObjectList.Objects {
+        for _, blob := range curObj.Objects {
+            func() {
+                // create a temp file for writing the blob to
+                tempFile, err := ioutil.TempFile("", "go-sdk-test-")
+                ds3Testing.AssertNilError(t, err)
+                defer func() {
+                    tempFile.Close()
+                    os.Remove(tempFile.Name())
+                }()
+
+                // get the remainder of the blob after skipping some bytes
+                blob := helperModels.NewBlobDescription(*blob.Name, blob.Offset, blob.Length)
+                var amountToSkip int64 = 10
+                err = helpers.GetRemainingBlob(client, testBucket, &blob, amountToSkip, tempFile, client.Logger)
+                ds3Testing.AssertNilError(t, err)
+
+                // verify that retrieved partial blob is correct
+                err = tempFile.Sync()
+                ds3Testing.AssertNilError(t, err)
+
+                tempFile.Seek(0, 0)
+                length := blob.Length() - amountToSkip
+                testutils.VerifyPartialFile(t, bigFilePath, length, blob.Offset()+amountToSkip, tempFile)
+            }()
+            blobsChecked++
+        }
+    }
+    if blobsChecked == 0 {
+        t.Fatalf("didn't verify any blobs")
+    }
+}
