Skip to content

Commit 0fbb432

Browse files
committed
Fix rate limiting
1 parent 3b96087 commit 0fbb432

File tree

5 files changed

+93
-133
lines changed

5 files changed

+93
-133
lines changed

main.go

Lines changed: 23 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ package main
2525
import (
2626
"encoding/json"
2727
"errors"
28+
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
29+
"strings"
30+
2831
//"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
2932
"github.com/thumbtack/go/lib/metrics"
3033
//"github.com/thumbtack/go/lib/monitoring"
@@ -33,7 +36,6 @@ import (
3336
_ "net/http/pprof"
3437
"os"
3538
"strconv"
36-
"strings"
3739
"sync"
3840
"time"
3941

@@ -49,18 +51,16 @@ const (
4951
paramCheckpointTable = "CHECKPOINT_DDB_TABLE"
5052
paramCheckpointRegion = "CHECKPOINT_DDB_REGION"
5153
paramCheckpointEndpoint = "CHECKPOINT_DDB_ENDPOINT"
52-
paramMaxRetries = "MAX_RETRIES"
5354
paramVerbose = "VERBOSE"
5455
paramPort = "PORT"
5556
paramConfigDir = "CONFIG_DIR"
56-
defaultConfigMaxRetries = 3
57+
maxRetries = 3
5758
)
5859

5960
var logger = logging.New()
6061
var ddbTable = os.Getenv(paramCheckpointTable)
6162
var ddbRegion = os.Getenv(paramCheckpointRegion)
6263
var ddbEndpoint = os.Getenv(paramCheckpointEndpoint)
63-
var maxRetries = defaultConfigMaxRetries
6464
var ddbClient = ddbConfigConnect(ddbRegion, ddbEndpoint, maxRetries, *logger)
6565
var metricsClient = newMetricsClient()
6666

@@ -73,16 +73,12 @@ type config struct {
7373
DstEndpoint string `json:"dst_endpoint"`
7474
SrcEnv string `json:"src_env"`
7575
DstEnv string `json:"dst_env"`
76-
SrcAccount string `json:"src_account"`
77-
DstAccount string `json:"dst_account"`
78-
MaxConnectRetries int `json:"max_connect_retries"`
7976
ReadWorkers int `json:"read_workers"`
8077
WriteWorkers int `json:"write_workers"`
8178
ReadQps int64 `json:"read_qps"`
8279
WriteQps int64 `json:"write_qps"`
8380
UpdateCheckpointThreshold int `json:"update_checkpoint_threshold"`
8481
EnableStreaming *bool `json:"enable_streaming"`
85-
TruncateTable bool `json:"truncate_table"`
8682
}
8783

8884
// Config file is read and dumped into this struct
@@ -101,13 +97,8 @@ type syncState struct {
10197
timestamp time.Time
10298
}
10399

104-
func getRoleArn(env string, account string) (string) {
105-
var roleType = ""
106-
if strings.ToLower(account) != "admin" {
107-
roleType = "NEW_" + strings.ToUpper(env) + "_ROLE"
108-
} else {
109-
roleType = "OLD_" + strings.ToUpper(env) + "_ROLE"
110-
}
100+
func getRoleArn(env string) string {
101+
roleType := strings.ToUpper(env) + "_ROLE"
111102
logger.WithFields(logging.Fields{"Roletype": roleType}).Debug()
112103
return os.Getenv(roleType)
113104
}
@@ -130,7 +121,7 @@ func NewSyncState(tableConfig config) *syncState {
130121
aws.NewConfig().
131122
WithRegion(tableConfig.SrcRegion).
132123
WithEndpoint(tableConfig.SrcEndpoint).
133-
WithMaxRetries(tableConfig.MaxConnectRetries).
124+
WithMaxRetries(maxRetries).
134125
WithHTTPClient(httpClient),
135126
))
136127

@@ -139,10 +130,10 @@ func NewSyncState(tableConfig config) *syncState {
139130
aws.NewConfig().
140131
WithRegion(tableConfig.DstRegion).
141132
WithEndpoint(tableConfig.DstEndpoint).
142-
WithMaxRetries(tableConfig.MaxConnectRetries),
133+
WithMaxRetries(maxRetries),
143134
))
144-
srcRoleArn := getRoleArn(tableConfig.SrcEnv, tableConfig.SrcAccount)
145-
dstRoleArn := getRoleArn(tableConfig.DstEnv, tableConfig.DstAccount)
135+
srcRoleArn := getRoleArn(tableConfig.SrcEnv)
136+
dstRoleArn := getRoleArn(tableConfig.DstEnv)
146137

147138
if srcRoleArn == "" || dstRoleArn == "" {
148139
logger.WithFields(logging.Fields{}).
@@ -154,16 +145,12 @@ func NewSyncState(tableConfig config) *syncState {
154145
"Src Role Arn": srcRoleArn,
155146
"Dst Role Arn": dstRoleArn}).Debug("Role ARN")
156147

157-
/*srcCreds := stscreds.NewCredentials(srcSess, srcRoleArn)
148+
srcCreds := stscreds.NewCredentials(srcSess, srcRoleArn)
158149
dstCreds := stscreds.NewCredentials(dstSess, dstRoleArn)
159150

160151
srcDynamo = dynamodb.New(srcSess, &aws.Config{Credentials: srcCreds})
161152
dstDynamo = dynamodb.New(dstSess, &aws.Config{Credentials: dstCreds})
162-
stream = dynamodbstreams.New(srcSess, &aws.Config{Credentials: srcCreds})*/
163-
164-
srcDynamo = dynamodb.New(srcSess, &aws.Config{})
165-
dstDynamo = dynamodb.New(dstSess, &aws.Config{})
166-
stream = dynamodbstreams.New(srcSess, &aws.Config{})
153+
stream = dynamodbstreams.New(srcSess, &aws.Config{Credentials: srcCreds})
167154

168155
return &syncState{
169156
tableConfig: tableConfig,
@@ -238,14 +225,6 @@ func NewApp() *appConfig {
238225
logger.SetLevel(logging.DebugLevel)
239226
}
240227
}
241-
if os.Getenv(paramMaxRetries) != "" {
242-
maxRetries, err = strconv.Atoi(os.Getenv(paramMaxRetries))
243-
if err != nil {
244-
logger.WithFields(logging.Fields{
245-
"error": err,
246-
}).Fatal("Failed to parse " + paramMaxRetries)
247-
}
248-
}
249228

250229
configFile = os.Getenv(paramConfigDir) + "/config.json"
251230
tableConfig, err := readConfigFile(configFile, *logger)
@@ -298,17 +277,6 @@ func setDefaults(tableConfig []config) ([]config, error) {
298277
continue
299278
}
300279

301-
if tableConfig[i].SrcAccount == "" {
302-
tableConfig[i].SrcAccount = "admin"
303-
}
304-
305-
if tableConfig[i].DstAccount == "" {
306-
tableConfig[i].DstAccount = "admin"
307-
}
308-
309-
if tableConfig[i].MaxConnectRetries == 0 {
310-
tableConfig[i].MaxConnectRetries = 3
311-
}
312280

313281
if tableConfig[i].ReadQps == 0 {
314282
tableConfig[i].ReadQps = 500
@@ -354,18 +322,22 @@ func (sync *syncState) isFreshStart(key primaryKey) bool {
354322
return false
355323
}
356324

357-
func getPrimaryKey(sync config) (primaryKey) {
325+
func getPrimaryKey(sync config) primaryKey {
358326
key := primaryKey{}
359-
if sync.SrcAccount == "admin" {
327+
delim := "_"
328+
329+
if !strings.Contains(sync.SrcEnv, " new") {
360330
key.sourceTable = sync.SrcTable
361331
} else {
362-
key.sourceTable = sync.SrcTable + ".account." + sync.SrcAccount
332+
key.sourceTable = sync.SrcTable + ".account." + strings.Split(sync.SrcEnv, delim)[0]
363333
}
364-
if sync.DstAccount == "admin" {
334+
335+
if !strings.Contains(sync.DstEnv, "new") {
365336
key.dstTable = sync.DstTable
366337
} else {
367-
key.dstTable = sync.DstTable + ".account." + sync.DstAccount
338+
key.dstTable = sync.DstTable + ".account." + strings.Split(sync.DstEnv, delim)[0]
368339
}
340+
369341
return key
370342
}
371343

@@ -389,8 +361,9 @@ func main() {
389361
"Source Table": key.sourceTable,
390362
"Destination Table": key.dstTable,
391363
}).Error("Error in connecting to tables. Check config file")
392-
364+
return
393365
}
366+
394367
syncWorker.readCheckpoint()
395368

396369
// Call a go routine to replicate for each key

replicate.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ func (sync *syncState) replicate(quit <-chan bool, key primaryKey) {
7373
// Once all the workers in the readWorker group are done,
7474
// we close the channel, and wait for the writeWorker group
7575
// to finish
76-
func (state *syncState) copyTable(key primaryKey) (error){
76+
func (state *syncState) copyTable(key primaryKey) error {
77+
var isSourceThroughputChanged = false
78+
var isDstThroughputChanged = false
79+
7780
logger.WithFields(logging.Fields{
7881
"Source Table": key.sourceTable,
7982
"Destination Table": key.dstTable,
@@ -84,8 +87,6 @@ func (state *syncState) copyTable(key primaryKey) (error){
8487
// we are done copying
8588
sourceCapacity := state.getCapacity(state.tableConfig.SrcTable, state.srcDynamo)
8689
dstCapacity := state.getCapacity(state.tableConfig.DstTable, state.dstDynamo)
87-
var isSourceThroughputChanged = false
88-
var isDstThroughputChanged = false
8990
srcDynamo := state.srcDynamo
9091
dstDynamo := state.dstDynamo
9192

@@ -124,14 +125,14 @@ func (state *syncState) copyTable(key primaryKey) (error){
124125

125126
writerWG.Add(writeWorkers)
126127

127-
rlDst := rate.NewLimiter(rate.Limit(state.tableConfig.WriteQps), maxBatchSize)
128+
rl := rate.NewLimiter(rate.Limit(state.tableConfig.WriteQps), int(state.tableConfig.WriteQps))
128129
for i := 0; i < writeWorkers; i++ {
129130
logger.WithFields(logging.Fields{
130131
"Write Worker": i,
131132
"Source Table": key.sourceTable,
132133
"Destination Table": key.dstTable,
133134
}).Debug("Starting copy table write worker..")
134-
go state.writeTable(key, items, &writerWG, i, *rlDst)
135+
go state.writeTable(key, items, &writerWG, i, rl)
135136
}
136137
readerWG.Add(readWorkers)
137138
for i := 0; i < readWorkers; i++ {
@@ -213,18 +214,16 @@ func (sync *syncState) streamSync(key primaryKey, streamArn string) error {
213214
if lastEvaluatedShardId != "" {
214215
input.ExclusiveStartShardId = aws.String(lastEvaluatedShardId)
215216
}
216-
maxConnectRetries := sync.tableConfig.MaxConnectRetries
217217

218-
for i := 0; i < maxConnectRetries; i++ {
218+
for i := 1; i <= maxRetries; i++ {
219219
result, err = sync.stream.DescribeStream(input)
220-
if err != nil {
221-
if i == maxConnectRetries-1 {
222-
return err
223-
}
224-
backoff(i, "Describe Stream")
225-
} else {
220+
if err == nil {
226221
break
227222
}
223+
if i == maxRetries {
224+
return err
225+
}
226+
backoff(i, "Describe Stream")
228227
}
229228

230229
numShards += len(result.StreamDescription.Shards)

stream.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,11 @@ func (sync *syncState) shardSyncStart(key primaryKey,
6262
}
6363

6464
shardIteratorInput := sync.getShardIteratorInput(key, *shardId, streamArn)
65-
maxConnectRetries := sync.tableConfig.MaxConnectRetries
6665

67-
for i := 0; i < maxConnectRetries; i++ {
66+
for i := 1; i <= maxRetries; i++ {
6867
iterator, err = sync.stream.GetShardIterator(shardIteratorInput)
6968
if err != nil {
70-
if i == maxConnectRetries-1 {
69+
if i == maxRetries {
7170
logger.WithFields(logging.Fields{
7271
"Shard Id": *shardId,
7372
"Error": err,
@@ -87,19 +86,13 @@ func (sync *syncState) shardSyncStart(key primaryKey,
8786
// when nil, the shard has been closed, and the requested iterator
8887
// will not return any more data
8988
for shardIterator != nil {
90-
for i := 0; i < maxConnectRetries; i++ {
91-
logger.WithFields(logging.Fields{
92-
"Source Table": key.sourceTable,
93-
"Destination Table": key.dstTable,
94-
"Shard Id": *shardId}).Debug("Calling GetRecords")
89+
for i := 1; i <= maxRetries; i++ {
90+
9591
records, err = sync.stream.GetRecords(
9692
&dynamodbstreams.GetRecordsInput{ShardIterator: shardIterator})
97-
logger.WithFields(logging.Fields{
98-
"Source Table": key.sourceTable,
99-
"Destination Table": key.dstTable,
100-
"Shard Id": *shardId}).Debug("Returned from GetRecords")
93+
10194
if err != nil {
102-
if i == maxConnectRetries-1 {
95+
if i == maxRetries {
10396
logger.WithFields(logging.Fields{
10497
"Shard Id": *shardId,
10598
"Error": err,
@@ -209,13 +202,12 @@ func (sync *syncState) writeRecords(
209202
// Insert this record in the dst table
210203
func (sync *syncState) insertRecord(item map[string]*dynamodb.AttributeValue, key primaryKey) error {
211204
var err error
212-
maxConnectRetries := sync.tableConfig.MaxConnectRetries
213205

214206
input := &dynamodb.PutItemInput{
215207
Item: item,
216208
TableName: aws.String(sync.tableConfig.DstTable),
217209
}
218-
for i := 0; i < maxConnectRetries; i++ {
210+
for i := 1; i <= maxRetries; i++ {
219211
_, err = sync.dstDynamo.PutItem(input)
220212
if err == nil {
221213
return nil
@@ -229,13 +221,12 @@ func (sync *syncState) insertRecord(item map[string]*dynamodb.AttributeValue, ke
229221
// Remove this record from the dst table
230222
func (sync *syncState) removeRecord(item map[string]*dynamodb.AttributeValue, key primaryKey) error {
231223
var err error
232-
maxConnectRetries := sync.tableConfig.MaxConnectRetries
233224

234225
input := &dynamodb.DeleteItemInput{
235226
Key: item,
236227
TableName: aws.String(sync.tableConfig.DstTable),
237228
}
238-
for i := 0; i < maxConnectRetries; i++ {
229+
for i := 0; i < maxRetries; i++ {
239230
_, err = sync.dstDynamo.DeleteItem(input)
240231
if err == nil {
241232
return nil

sync_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,17 @@ func setupTest(sync *syncState, key primaryKey) {
135135

136136
func (sync *syncState) scanTable(key primaryKey, name string) {
137137
lastEvaluatedKey := make(map[string]*dynamodb.AttributeValue, 0)
138-
maxConnectRetries := sync.tableConfig.MaxConnectRetries
139138
items := make([]map[string]*dynamodb.AttributeValue, 0)
140139
input := &dynamodb.ScanInput{TableName: aws.String(name)}
141140

142141
for {
143142
if len(lastEvaluatedKey) > 0 {
144143
input.ExclusiveStartKey = lastEvaluatedKey
145144
}
146-
for i := 0; i < maxConnectRetries; i++ {
145+
for i := 1; i <= maxRetries; i++ {
147146
result, err := sync.srcDynamo.Scan(input)
148147
if err != nil {
149-
if i == maxConnectRetries-1 {
148+
if i == maxRetries {
150149
return
151150
}
152151
backoff(i, "Scan")
@@ -289,7 +288,6 @@ func TestAll(t *testing.T) {
289288
os.Setenv(paramCheckpointRegion, "us-west-2")
290289
os.Setenv(paramCheckpointTable, "local-dynamodb-sync.checkpoint")
291290
os.Setenv(paramCheckpointEndpoint, "http://localhost:8000")
292-
os.Setenv(paramMaxRetries, "3")
293291

294292
app := NewApp()
295293
checkpointDynamo := dynamodb.New(session.Must(
@@ -322,7 +320,6 @@ func TestAll(t *testing.T) {
322320
//main()
323321

324322
for i := 0; i < len(syncWorkers); i++ {
325-
syncWorkers[i].loadCheckpointTable()
326323
syncWorkers[i].testStreamSyncWait()
327324
syncWorkers[i].testExpireShards()
328325
}

0 commit comments

Comments
 (0)