Skip to content

Commit b2df8fe

Browse files
author
Sharon Shabtai
authored
Merge pull request #114 from RachelTucker/5.1-retry-get
OTHER: adding retry around bulk get when a blob only partially transfers
2 parents 045fb30 + dc0d996 commit b2df8fe

File tree

3 files changed

+174
-6
lines changed

3 files changed

+174
-6
lines changed

ds3_integration/utils/testUtils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func VerifyBookContent(t *testing.T, bookName string, actual io.ReadCloser) {
6060
verifyContent(t, expected, actual)
6161
}
6262

63-
func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.ReadCloser) {
63+
func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.Reader) {
6464
f, err := os.Open(filePath)
6565
ds3Testing.AssertNilError(t, err)
6666

@@ -73,7 +73,7 @@ func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64
7373
verifyPartialContent(t, *expected, actual, length)
7474
}
7575

76-
func verifyPartialContent(t *testing.T, expected []byte, actual io.ReadCloser, length int64) {
76+
func verifyPartialContent(t *testing.T, expected []byte, actual io.Reader, length int64) {
7777
content, err := getNBytesFromReader(actual, length)
7878
ds3Testing.AssertNilError(t, err)
7979

helpers/getProducer.go

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"time"
1313
)
1414

15+
// timesToRetryGettingPartialBlob caps the number of ranged get attempts made
// by GetRemainingBlob when a blob was only partially transferred.
const timesToRetryGettingPartialBlob = 5
16+
1517
type getProducer struct {
1618
JobMasterObjectList *ds3Models.MasterObjectList //MOL from put bulk job creation
1719
GetObjects *[]helperModels.GetObject
@@ -146,10 +148,13 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
146148
return
147149
}
148150
if bytesWritten != info.blob.Length() {
149-
err = fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
150-
producer.strategy.Listeners.Errored(info.blob.Name(), err)
151-
info.channelBuilder.SetFatalError(err)
152-
producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
151+
producer.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
152+
err := GetRemainingBlob(producer.client, info.bucketName, info.blob, bytesWritten, writer, producer.Logger)
153+
if err != nil {
154+
producer.strategy.Listeners.Errored(info.blob.Name(), err)
155+
info.channelBuilder.SetFatalError(err)
156+
producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
157+
}
153158
}
154159
return
155160
}
@@ -166,6 +171,52 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
166171
}
167172
}
168173

174+
func GetRemainingBlob(client *ds3.Client, bucketName string, blob *helperModels.BlobDescription, amountAlreadyRetrieved int64, writer io.Writer, logger sdk_log.Logger) error {
175+
logger.Debugf("starting retry for fetching partial blob '%s' at offset '%d': amount to retrieve %d", blob.Name(), blob.Offset(), blob.Length() - amountAlreadyRetrieved)
176+
bytesRetrievedSoFar := amountAlreadyRetrieved
177+
timesRetried := 0
178+
rangeEnd := blob.Offset() + blob.Length() -1
179+
for bytesRetrievedSoFar < blob.Length() && timesRetried < timesToRetryGettingPartialBlob {
180+
rangeStart := blob.Offset() + bytesRetrievedSoFar
181+
bytesRetrievedThisRound, err := RetryGettingBlobRange(client, bucketName, blob.Name(), blob.Offset(), rangeStart, rangeEnd, writer, logger)
182+
if err != nil {
183+
logger.Errorf("failed to get object '%s' at offset '%d', range %d=%d attempt %d: %s", blob.Name(), blob.Offset(), rangeStart, rangeEnd, timesRetried, err.Error())
184+
}
185+
bytesRetrievedSoFar+= bytesRetrievedThisRound
186+
timesRetried++
187+
}
188+
189+
if bytesRetrievedSoFar < blob.Length() {
190+
return fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", blob.Name(), blob.Offset(), bytesRetrievedSoFar, blob.Length())
191+
}
192+
return nil
193+
}
194+
195+
func RetryGettingBlobRange(client *ds3.Client, bucketName string, objectName string, blobOffset int64, rangeStart int64, rangeEnd int64, writer io.Writer, logger sdk_log.Logger) (int64, error) {
196+
// perform a naked get call for the rest of the blob that we originally failed to get
197+
partOfBlobToFetch := ds3Models.Range{
198+
Start: rangeStart,
199+
End: rangeEnd,
200+
}
201+
getObjRequest := ds3Models.NewGetObjectRequest(bucketName, objectName).
202+
WithOffset(blobOffset).
203+
WithRanges(partOfBlobToFetch)
204+
205+
getObjResponse, err := client.GetObject(getObjRequest)
206+
if err != nil {
207+
return 0, err
208+
}
209+
defer func() {
210+
err := getObjResponse.Content.Close()
211+
if err != nil {
212+
logger.Warningf("failed to close response body for get object '%s' with range %d-%d: %v", objectName, rangeStart, rangeEnd, err)
213+
}
214+
}()
215+
216+
bytesWritten, err := io.Copy(writer, getObjResponse.Content) //copy all content from response reader to destination writer
217+
return bytesWritten, err
218+
}
219+
169220
// Writes a range of a blob to its destination channel
170221
func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, blobRange ds3Models.Range, content io.Reader) error {
171222
writer, err := channelBuilder.GetChannel(blobRange.Start)

helpers_integration/helpersImpl_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,3 +504,120 @@ func TestBulkPutAndGetLotsOfFiles(t *testing.T) {
504504
t.Errorf("expected to get a BP job ID, but instead got nothing")
505505
}
506506
}
507+
508+
func TestRetryGettingBlobRange(t *testing.T) {
509+
defer testutils.DeleteBucketContents(client, testBucket)
510+
511+
helper := helpers.NewHelpers(client)
512+
strategy := newTestTransferStrategy(t)
513+
514+
// Put a blobbed object to BP
515+
const bigFilePath = LargeBookPath + LargeBookTitle
516+
writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
517+
ds3Testing.AssertNilError(t, err)
518+
519+
var writeObjects []helperModels.PutObject
520+
writeObjects = append(writeObjects, *writeObj)
521+
522+
putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
523+
ds3Testing.AssertNilError(t, err)
524+
if putJobId == "" {
525+
t.Error("expected to get a BP job ID, but instead got nothing")
526+
}
527+
528+
// Try to get some data from each blob
529+
getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
530+
ds3Testing.AssertNilError(t, err)
531+
532+
blobsChecked := 0
533+
for _, curObj := range getJob.MasterObjectList.Objects {
534+
for _, blob := range curObj.Objects {
535+
func() {
536+
// create a temp file for writing the blob to
537+
tempFile, err := ioutil.TempFile("", "go-sdk-test-")
538+
ds3Testing.AssertNilError(t, err)
539+
defer func() {
540+
tempFile.Close()
541+
os.Remove(tempFile.Name())
542+
}()
543+
544+
// get a range of the blob
545+
startRange := blob.Offset+10 // retrieve subset of blob
546+
endRange := blob.Length+blob.Offset-1
547+
bytesWritten, err := helpers.RetryGettingBlobRange(client, testBucket, writeObj.PutObject.Name, blob.Offset, startRange, endRange, tempFile, client.Logger)
548+
ds3Testing.AssertNilError(t, err)
549+
ds3Testing.AssertInt64(t, "bytes written", endRange-startRange+1, bytesWritten)
550+
551+
// verify that retrieved partial blob is correct
552+
err = tempFile.Sync()
553+
ds3Testing.AssertNilError(t, err)
554+
555+
tempFile.Seek(0, 0)
556+
length := endRange-startRange
557+
testutils.VerifyPartialFile(t, bigFilePath, length, startRange, tempFile)
558+
}()
559+
blobsChecked++
560+
}
561+
}
562+
if blobsChecked == 0 {
563+
t.Fatalf("didn't verify any blobs")
564+
}
565+
}
566+
567+
func TestGetRemainingBlob(t *testing.T) {
568+
defer testutils.DeleteBucketContents(client, testBucket)
569+
570+
helper := helpers.NewHelpers(client)
571+
strategy := newTestTransferStrategy(t)
572+
573+
// Put a blobbed object to BP
574+
const bigFilePath = LargeBookPath + LargeBookTitle
575+
writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
576+
ds3Testing.AssertNilError(t, err)
577+
578+
var writeObjects []helperModels.PutObject
579+
writeObjects = append(writeObjects, *writeObj)
580+
581+
putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
582+
ds3Testing.AssertNilError(t, err)
583+
if putJobId == "" {
584+
t.Error("expected to get a BP job ID, but instead got nothing")
585+
}
586+
587+
// Try to get some data from each blob
588+
getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
589+
ds3Testing.AssertNilError(t, err)
590+
591+
blobsChecked := 0
592+
for _, curObj := range getJob.MasterObjectList.Objects {
593+
for _, blob := range curObj.Objects {
594+
func() {
595+
// create a temp file for writing the blob to
596+
tempFile, err := ioutil.TempFile("", "go-sdk-test-")
597+
ds3Testing.AssertNilError(t, err)
598+
defer func() {
599+
tempFile.Close()
600+
os.Remove(tempFile.Name())
601+
}()
602+
603+
// get the remainder of the blob after skipping some bytes
604+
blob := helperModels.NewBlobDescription(*blob.Name, blob.Offset, blob.Length)
605+
var amountToSkip int64 = 10
606+
err = helpers.GetRemainingBlob(client, testBucket, &blob, amountToSkip, tempFile, client.Logger)
607+
ds3Testing.AssertNilError(t, err)
608+
609+
// verify that retrieved partial blob is correct
610+
err = tempFile.Sync()
611+
ds3Testing.AssertNilError(t, err)
612+
613+
tempFile.Seek(0, 0)
614+
length := blob.Length() - amountToSkip
615+
testutils.VerifyPartialFile(t, bigFilePath, length, blob.Offset()+amountToSkip, tempFile)
616+
}()
617+
blobsChecked++
618+
}
619+
}
620+
if blobsChecked == 0 {
621+
t.Fatalf("didn't verify any blobs")
622+
}
623+
}

0 commit comments

Comments
 (0)