Skip to content

Commit af99f0a

Browse files
iamcodingcatntkathole
authored andcommitted
fix/enhancement: bugfix and add retry logic to dynamodb as onlinestore
Signed-off-by: iamcodingcat <[email protected]>
1 parent 8cde460 commit af99f0a

File tree

1 file changed

+87
-13
lines changed

1 file changed

+87
-13
lines changed

go/internal/feast/onlinestore/dynamodbonlinestore.go

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,22 @@ import (
1111
"github.com/feast-dev/feast/go/protos/feast/serving"
1212
"github.com/feast-dev/feast/go/protos/feast/types"
1313
"github.com/roberson-io/mmh3"
14+
"github.com/rs/zerolog/log"
1415
"golang.org/x/sync/errgroup"
1516
"golang.org/x/sync/semaphore"
1617
"google.golang.org/protobuf/proto"
1718
"google.golang.org/protobuf/types/known/timestamppb"
19+
"math/rand"
1820
"runtime"
1921
"sync"
2022
"time"
2123
)
2224

23-
type batchResult struct {
24-
index int
25-
response *dynamodb.BatchGetItemOutput
26-
err error
27-
}
25+
const (
26+
maxRetriesDefault = 5
27+
initialBackoff = 50 * time.Millisecond
28+
maxBackoff = 1 * time.Second
29+
)
2830

2931
type DynamodbOnlineStore struct {
3032
// Feast project name
@@ -38,6 +40,7 @@ type DynamodbOnlineStore struct {
3840
// dynamodb configuration
3941
consistentRead *bool
4042
batchSize *int
43+
maxRetries *int
4144
}
4245

4346
func NewDynamodbOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*DynamodbOnlineStore, error) {
@@ -70,6 +73,14 @@ func NewDynamodbOnlineStore(project string, config *registry.RepoConfig, onlineS
7073
}
7174
store.batchSize = &batchSize
7275

76+
var maxRetries int
77+
if maxRetriesFloat, ok := onlineStoreConfig["max_retries"].(float64); ok {
78+
maxRetries = int(maxRetriesFloat)
79+
} else {
80+
maxRetries = maxRetriesDefault
81+
}
82+
store.maxRetries = &maxRetries
83+
7384
return &store, nil
7485
}
7586

@@ -79,12 +90,12 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
7990
return nil, ctx.Err()
8091
}
8192

93+
maxRetries := *d.maxRetries
8294
results := make([][]FeatureData, len(entityKeys))
8395

8496
// serialize entity key into entity hash id
8597
entityIndexMap := make(map[string]int)
8698
entityIds := make([]string, 0, len(entityKeys))
87-
unprocessedEntityIds := make(map[string]bool)
8899
for i, entityKey := range entityKeys {
89100
serKey, err := serializeEntityKey(entityKey, d.config.EntityKeySerializationVersion)
90101
if err != nil {
@@ -93,7 +104,6 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
93104
entityId := hex.EncodeToString(mmh3.Hashx64_128(*serKey, 0))
94105
entityIds = append(entityIds, entityId)
95106
entityIndexMap[entityId] = i
96-
unprocessedEntityIds[entityId] = false
97107
}
98108

99109
// metadata from feature views, feature names
@@ -116,6 +126,11 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
116126
for featureViewName, featureNames := range featureMap {
117127
tableName := fmt.Sprintf("%s.%s", d.project, featureViewName)
118128

129+
unprocessedEntityIdsFeatureView := make(map[string]bool)
130+
for _, entityId := range entityIds {
131+
unprocessedEntityIdsFeatureView[entityId] = true
132+
}
133+
119134
var batchGetItemInputs []*dynamodb.BatchGetItemInput
120135
batchSize := *d.batchSize
121136
for i := 0; i < len(entityIds); i += batchSize {
@@ -151,28 +166,87 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
151166
}
152167
defer sem.Release(1)
153168

169+
var Responses []map[string]dtypes.AttributeValue
170+
var unprocessedKeys dtypes.KeysAndAttributes
171+
172+
// response from initial request to dynamodb
154173
resp, err := d.client.BatchGetItem(ctx, batchGetItemInput)
155174
if err != nil {
156175
return err
157176
}
177+
if len(resp.Responses[tableName]) > 0 {
178+
Responses = append(Responses, resp.Responses[tableName]...)
179+
}
180+
if len(resp.UnprocessedKeys[tableName].Keys) > 0 {
181+
unprocessedKeys = resp.UnprocessedKeys[tableName]
182+
}
183+
// retry about unprocessed key from initial request to dynamodb
184+
retries := 0
185+
backoff := initialBackoff
186+
jitterRand := rand.New(rand.NewSource(time.Now().UnixNano()))
187+
for len(unprocessedKeys.Keys) > 0 && retries < maxRetries {
188+
log.Info().Msgf("%d retry using exponential backoff to dynamodb", retries+1)
189+
if err := ctx.Err(); err != nil {
190+
return err
191+
}
192+
// jitter before retrying
193+
jitter := time.Duration(jitterRand.Intn(100)) * time.Millisecond
194+
waitDuration := backoff + jitter
195+
timer := time.NewTimer(waitDuration)
196+
select {
197+
case <-ctx.Done():
198+
timer.Stop()
199+
return ctx.Err()
200+
case <-timer.C:
201+
}
202+
203+
retries++
204+
backoff *= 2
205+
if backoff > maxBackoff {
206+
backoff = maxBackoff
207+
}
208+
retryBatchGetItemInput := &dynamodb.BatchGetItemInput{
209+
RequestItems: map[string]dtypes.KeysAndAttributes{
210+
tableName: unprocessedKeys,
211+
},
212+
}
213+
retryResp, err := d.client.BatchGetItem(ctx, retryBatchGetItemInput)
214+
if err != nil {
215+
log.Info().Msgf("BatchGetItem retry attempt(%d) failed for table %s. err: %v\n", retries, tableName, err)
216+
continue
217+
}
218+
if len(retryResp.Responses[tableName]) > 0 {
219+
Responses = append(Responses, retryResp.Responses[tableName]...)
220+
}
221+
// check unprocessed key in retried response again
222+
if len(retryResp.UnprocessedKeys[tableName].Keys) > 0 {
223+
unprocessedKeys = retryResp.UnprocessedKeys[tableName]
224+
} else {
225+
unprocessedKeys = dtypes.KeysAndAttributes{}
226+
}
227+
}
228+
229+
if len(unprocessedKeys.Keys) > 0 {
230+
return fmt.Errorf("failed to process %d keys from table %s after %d retries. keys=%+v\n", len(unprocessedKeys.Keys), tableName, maxRetries, unprocessedKeys.Keys)
231+
}
158232

159233
// in case there is no entity id of a feature view in dynamodb
160-
batchSize := len(resp.Responses[tableName])
234+
batchSize := len(Responses)
161235
if batchSize == 0 {
162236
return nil
163237
}
164238

165239
// process response from dynamodb
166240
for j := 0; j < batchSize; j++ {
167-
entityId := resp.Responses[tableName][j]["entity_id"].(*dtypes.AttributeValueMemberS).Value
168-
timestampString := resp.Responses[tableName][j]["event_ts"].(*dtypes.AttributeValueMemberS).Value
241+
entityId := Responses[j]["entity_id"].(*dtypes.AttributeValueMemberS).Value
242+
timestampString := Responses[j]["event_ts"].(*dtypes.AttributeValueMemberS).Value
169243
t, err := time.Parse("2006-01-02 15:04:05-07:00", timestampString)
170244
if err != nil {
171245
return err
172246
}
173247
timeStamp := timestamppb.New(t)
174248

175-
featureValues := resp.Responses[tableName][j]["values"].(*dtypes.AttributeValueMemberM).Value
249+
featureValues := Responses[j]["values"].(*dtypes.AttributeValueMemberM).Value
176250
entityIndex := entityIndexMap[entityId]
177251

178252
for _, featureName := range featureNames {
@@ -192,7 +266,7 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
192266
}
193267

194268
mu.Lock()
195-
delete(unprocessedEntityIds, entityId)
269+
delete(unprocessedEntityIdsFeatureView, entityId)
196270
mu.Unlock()
197271
}
198272
return nil
@@ -204,7 +278,7 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
204278

205279
// process null imputation for entity ids that don't exist in dynamodb
206280
currentTime := timestamppb.Now() // TODO: should use a different timestamp?
207-
for entityId, _ := range unprocessedEntityIds {
281+
for entityId, _ := range unprocessedEntityIdsFeatureView {
208282
entityIndex := entityIndexMap[entityId]
209283
for _, featureName := range featureNames {
210284
featureIndex := featureNamesIndex[featureName]

0 commit comments

Comments
 (0)