Skip to content

Commit 990cc35

Browse files
authored
Merge pull request #123 from RachelTucker/get-in-order-larger-than-cache
OTHER: create helper to retrieve objects in-order that exceed available cache
2 parents 6f15864 + b3e8c9c commit 990cc35

File tree

4 files changed

+414
-4
lines changed

4 files changed

+414
-4
lines changed

helpers/getTransfernator.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func newBulkGetRequest(bucketName string, readObjects *[]helperModels.GetObject,
5656
func createPartialGetObjects(getObject helperModels.GetObject) []ds3Models.Ds3GetObject {
5757
// handle getting the entire object
5858
if len(getObject.Ranges) == 0 {
59-
return []ds3Models.Ds3GetObject { { Name:getObject.Name }, }
59+
return []ds3Models.Ds3GetObject { { Name:getObject.Name } }
6060
}
6161
// handle partial object retrieval
6262
var partialObjects []ds3Models.Ds3GetObject
@@ -123,11 +123,10 @@ func (transceiver *getTransceiver) transfer() (string, error) {
123123
consumer := newConsumer(&queue, &waitGroup, transceiver.Strategy.BlobStrategy.maxConcurrentTransfers(), doneNotifier)
124124

125125
// Wait for completion of producer-consumer goroutines
126-
var aggErr ds3Models.AggregateError
127126
waitGroup.Add(1) // adding producer and consumer goroutines to wait group
128127
go consumer.run()
129128
err = producer.run() // producer will add to waitGroup for every blob retrieval added to queue, and each transfer performed will decrement from waitGroup
130129
waitGroup.Wait()
131130

132-
return bulkGetResponse.MasterObjectList.JobId, aggErr.GetErrors()
131+
return bulkGetResponse.MasterObjectList.JobId, nil
133132
}

helpers/helpersImpl.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
package helpers
22

33
import (
4+
"fmt"
45
"github.com/SpectraLogic/ds3_go_sdk/ds3"
6+
"github.com/SpectraLogic/ds3_go_sdk/ds3/models"
57
helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
8+
"github.com/SpectraLogic/ds3_go_sdk/helpers/ranges"
9+
"net/http"
10+
"sort"
11+
"strings"
612
)
713

814
type HelperInterface interface {
@@ -20,6 +26,12 @@ type HelperInterface interface {
2026
// A job ID will be returned if a BP job was successfully created, regardless of
2127
// whether additional errors occur.
2228
GetObjects(bucketName string, objects []helperModels.GetObject, strategy ReadTransferStrategy) (string, error)
29+
30+
// Retrieves the list of objects from the specified bucket on the Black Pearl.
31+
// If a get job cannot be created due to insufficient cache space to fulfill an
32+
// IN_ORDER processing guarantee, then the job is split across multiple BP jobs.
33+
// This allows for the IN_ORDER retrieval of objects that exceed available cache space.
34+
GetObjectsSpanningJobs(bucketName string, objects []helperModels.GetObject, strategy ReadTransferStrategy) ([]string, error)
2335
}
2436

2537
type HelperImpl struct {
@@ -49,3 +61,118 @@ func (helper *HelperImpl) GetObjects(bucketName string, objects []helperModels.G
4961
transceiver := newGetTransceiver(bucketName, &objects, &strategy, helper.client)
5062
return transceiver.transfer()
5163
}
64+
65+
func (helper *HelperImpl) GetObjectsSpanningJobs(bucketName string, objects []helperModels.GetObject, strategy ReadTransferStrategy) ([]string, error) {
66+
// Attempt to send the entire job at once
67+
jobId, err := helper.GetObjects(bucketName, objects, strategy)
68+
if err == nil {
69+
// success
70+
return []string{jobId}, nil
71+
} else if !helper.isCannotPreAllocateError(err) {
72+
// error not related to pre-allocation
73+
return nil, err
74+
}
75+
76+
// Retrieve each file individually
77+
var jobIds []string
78+
for _, getObject := range objects {
79+
fileJobIds := helper.retrieveIndividualFile(bucketName, getObject, strategy)
80+
jobIds = append(jobIds, fileJobIds...)
81+
}
82+
return jobIds, nil
83+
}
84+
85+
func (helper *HelperImpl) isCannotPreAllocateError(err error) bool {
86+
badStatusErr, ok := err.(*models.BadStatusCodeError)
87+
if !ok || badStatusErr.ActualStatusCode != http.StatusServiceUnavailable {
88+
// failed to create bulk get for reason other than 503
89+
return false
90+
}
91+
92+
if strings.Contains(badStatusErr.Error(), "GET jobs that have a chunkClientProcessingOrderGuarantee of IN_ORDER must be entirely pre-allocated. Cannot pre-allocate") {
93+
return true
94+
}
95+
return false
96+
}
97+
98+
func (helper *HelperImpl) retrieveIndividualFile(bucketName string, getObject helperModels.GetObject, strategy ReadTransferStrategy) []string {
99+
// Get the blob offsets
100+
headObject, err := helper.client.HeadObject(models.NewHeadObjectRequest(bucketName, getObject.Name))
101+
if err != nil {
102+
getObject.ChannelBuilder.SetFatalError(err)
103+
return nil
104+
}
105+
var offsets []int64
106+
for offset := range headObject.BlobChecksums {
107+
offsets = append(offsets, offset)
108+
}
109+
110+
sort.Slice(offsets, func(i, j int) bool {
111+
return offsets[i] < offsets[j]
112+
})
113+
114+
// Get the object size
115+
objectsDetails, err := helper.client.GetObjectsWithFullDetailsSpectraS3(
116+
models.NewGetObjectsWithFullDetailsSpectraS3Request().
117+
WithBucketId(bucketName).WithName(getObject.Name).
118+
WithLatest(true))
119+
120+
if err != nil {
121+
getObject.ChannelBuilder.SetFatalError(err)
122+
return nil
123+
} else if len(objectsDetails.DetailedS3ObjectList.DetailedS3Objects) < 1 {
124+
getObject.ChannelBuilder.SetFatalError(fmt.Errorf("failed to get object details"))
125+
return nil
126+
}
127+
128+
// Retrieve the object one blob at a time in order
129+
objectCopy := getObject
130+
objectEnd := objectsDetails.DetailedS3ObjectList.DetailedS3Objects[0].Size - 1
131+
if len(objectCopy.Ranges) == 0 {
132+
// If the user didn't specify a range, add a range that covers the entire file
133+
// so that we can use the blobRangeFinder to tell us what ranges to specify.
134+
objectCopy.Ranges = append(objectCopy.Ranges, models.Range{Start: 0, End: objectEnd})
135+
}
136+
blobFinder := ranges.NewBlobRangeFinder(&[]helperModels.GetObject{objectCopy})
137+
138+
var jobIds []string
139+
for i, offset := range offsets {
140+
var blobEnd int64
141+
if i+1 < len(offsets) {
142+
blobEnd = offsets[i+1]-1
143+
} else {
144+
blobEnd = objectEnd
145+
}
146+
length := blobEnd - offset + 1
147+
blobRanges := blobFinder.GetRanges(objectCopy.Name, offset, length)
148+
if len(blobRanges) == 0 {
149+
// This blob does not need to be retrieved
150+
continue
151+
}
152+
153+
jobId, err := helper.retrieveBlob(bucketName, getObject, blobRanges, strategy)
154+
if err != nil {
155+
getObject.ChannelBuilder.SetFatalError(err)
156+
return nil
157+
}
158+
jobIds = append(jobIds, jobId)
159+
160+
if objectCopy.ChannelBuilder.HasFatalError() {
161+
// Failed to retrieve a portion of the file, don't bother with the rest
162+
break
163+
}
164+
}
165+
return jobIds
166+
}
167+
168+
func (helper *HelperImpl) retrieveBlob(bucketName string, getObject helperModels.GetObject, blobRanges []models.Range, strategy ReadTransferStrategy) (string, error) {
169+
// Since there is only one blob being retrieved, create the job with Order-Guarantee=None so that the
170+
// job will wait if cache needs to be reclaimed on the BP before the chunk can be allocated.
171+
getObjectBlob := getObject
172+
getObjectBlob.Ranges = blobRanges
173+
174+
strategyCopy := strategy
175+
strategyCopy.Options.ChunkClientProcessingOrderGuarantee = models.JOB_CHUNK_CLIENT_PROCESSING_ORDER_GUARANTEE_NONE
176+
177+
return helper.GetObjects(bucketName, []helperModels.GetObject{getObjectBlob}, strategyCopy)
178+
}

0 commit comments

Comments
 (0)