Skip to content

Commit 36abd1d

Browse files
authored
Merge pull request #57 from rewardStyle/feature/limit-reset-timer
add reset timer
2 parents 37a22eb + 966f2a1 commit 36abd1d

File tree

15 files changed

+160
-28
lines changed

15 files changed

+160
-28
lines changed

kinesis.go

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ const (
2020
statusActive
2121
staticUpdating
2222

23+
kinesisWritesPerSec int = 1000
24+
kinesisReadsPerSec int = 5
25+
2326
// http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#API_GetRecords_RequestSyntax
2427
defaultLimit int = 10000
2528

26-
kinesisWritesPerSec int = 1000
27-
kinesisReadsPerSec int = 5
29+
limitResetDuration time.Duration = 1 * time.Minute
2830

2931
// Timeout TODO
3032
Timeout = 60
@@ -61,17 +63,19 @@ type kinesis struct {
6163
sequenceNumber string
6264
sequenceNumberMu sync.Mutex
6365

64-
limit int
65-
origLimit int
66-
limitMu sync.Mutex
67-
client gokinesis.KinesisClient
66+
limit int
67+
origLimit int
68+
limitMu sync.Mutex
69+
limitReset *time.Timer
70+
limitResetMu sync.Mutex
71+
72+
client gokinesis.KinesisClient
6873

6974
msgCount int64
7075
errCount int64
7176
}
7277

7378
func (k *kinesis) init(stream, shard, shardIteratorType, accessKey, secretKey, region string) (*kinesis, error) {
74-
7579
auth, err := authenticate(accessKey, secretKey)
7680
k = &kinesis{
7781
stream: stream,
@@ -98,7 +102,10 @@ func (k *kinesis) args() *gokinesis.RequestArgs {
98102
args.Add("StreamName", k.stream)
99103
args.Add("ShardId", k.shard)
100104
args.Add("ShardIterator", k.shardIterator)
105+
106+
k.limitMu.Lock()
101107
args.Add("Limit", k.limit)
108+
k.limitMu.Unlock()
102109

103110
if k.sequenceNumber != "" {
104111
args.Add("StartingSequenceNumber", k.sequenceNumber)
@@ -200,12 +207,41 @@ func getLock(sem chan bool) {
200207

201208
func (k *kinesis) decreaseRequestLimit() {
202209
k.limitMu.Lock()
210+
203211
k.limit = k.limit >> 1
212+
213+
if k.limit <= 0 {
214+
k.limit = 1
215+
}
216+
204217
k.limitMu.Unlock()
205218
}
206219

220+
func (k *kinesis) limitResetter() {
221+
<-k.limitReset.C
222+
k.resetRequestLimit()
223+
}
224+
207225
func (k *kinesis) resetRequestLimit() {
208226
k.limitMu.Lock()
209227
k.limit = k.origLimit
210228
k.limitMu.Unlock()
211229
}
230+
231+
func (k *kinesis) startRequestLimitReset(dur time.Duration) {
232+
k.limitResetMu.Lock()
233+
234+
// Init or reset the limitReset ticker
235+
if k.limitReset == nil {
236+
k.limitReset = time.NewTimer(dur)
237+
} else {
238+
if !k.limitReset.Stop() {
239+
<-k.limitReset.C
240+
}
241+
k.limitReset.Reset(dur)
242+
}
243+
244+
go k.limitResetter()
245+
246+
k.limitResetMu.Unlock()
247+
}

listener.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ type Listener struct {
4343
consumingMu sync.Mutex
4444
messageMu sync.Mutex
4545

46+
paused bool
47+
pauseMu sync.Mutex
48+
4649
errors chan error
4750
messages chan *Message
4851
interrupts chan os.Signal
@@ -208,6 +211,28 @@ retry:
208211
l.messageMu.Unlock()
209212
}
210213

214+
func (l *Listener) pause(duration time.Duration) {
215+
l.pauseMu.Lock()
216+
217+
if l.paused == false {
218+
go func() {
219+
<-time.After(duration)
220+
l.pauseMu.Lock()
221+
l.paused = false
222+
l.pauseMu.Unlock()
223+
}()
224+
}
225+
226+
l.paused = true
227+
l.pauseMu.Unlock()
228+
}
229+
230+
func (l *Listener) isPaused() bool {
231+
l.pauseMu.Lock()
232+
defer l.pauseMu.Unlock()
233+
return l.paused
234+
}
235+
211236
func (l *Listener) getRecords(GsiCounter int, GsiTimer time.Time) (*gokinesis.GetRecordsResp, error) {
212237
// args() will give us the shard iterator and type as well as the shard id
213238
response, err := l.client.GetRecords(l.args())
@@ -220,9 +245,19 @@ func (l *Listener) getRecords(GsiCounter int, GsiTimer time.Time) (*gokinesis.Ge
220245
// the AWS General Reference.
221246
if strings.Contains(err.Error(), `ProvisionedThroughputExceededException`) {
222247
log.Println("Received ProvisionedThroughputExceededException. Temporarily decreasing request limit and retrying.")
248+
223249
log.Println("Previous Request Limit: ", l.limit)
224250
l.decreaseRequestLimit()
225251
log.Println("New Request Limit: ", l.limit)
252+
253+
// The maximum size of data that GetRecords can return is 10 MB.
254+
// If a call returns this amount of data, subsequent calls made
255+
// within the next 5 seconds throw ProvisionedThroughputExceededException.
256+
// If there is insufficient provisioned throughput on the stream,
257+
/// subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException.
258+
l.pause(5 * time.Second)
259+
260+
go l.startRequestLimitReset(limitResetDuration)
226261
}
227262
}
228263

@@ -282,7 +317,6 @@ func (l *Listener) consume() {
282317

283318
if response != nil {
284319
l.setShardIterator(response.NextShardIterator)
285-
l.resetRequestLimit()
286320

287321
if len(response.Records) > 0 {
288322
for _, record := range response.Records {
@@ -427,6 +461,9 @@ func (l *Listener) handleError(err error) {
427461
// Kinesis allows five read ops per second per shard
428462
// http://docs.aws.amazon.com/kinesis/latest/dev/service-sizes-and-limits.html
429463
func (l *Listener) throttle(counter *int, timer *time.Time) {
464+
for l.isPaused() {
465+
runtime.Gosched()
466+
}
430467
// If a second has passed since the last timer start, reset the timer
431468
if time.Now().After(timer.Add(1 * time.Second)) {
432469
*timer = time.Now()

listener_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,64 @@ func TestRetrieveMessage(t *testing.T) {
135135
listener.Close()
136136
}
137137

138+
func TestLimitReset(t *testing.T) {
139+
l, _ := new(Listener).InitC("your-stream", "0", ShardIterTypes[3], "accesskey", "secretkey", "us-east-1", 10)
140+
l.NewEndpoint(testEndpoint, "your-stream")
141+
142+
testLimitResetDuration := 2 * time.Second
143+
144+
Convey("If the request limit has not been decreased", t, func() {
145+
Convey("It should be equal to the defaultLimit", func() {
146+
So(l.limit, ShouldEqual, defaultLimit)
147+
148+
Convey("If the request limit has been decreased", func() {
149+
l.decreaseRequestLimit()
150+
Convey("It should not be reset until the timer has elapsed", func() {
151+
152+
go l.startRequestLimitReset(testLimitResetDuration)
153+
<-time.After(1 * time.Second)
154+
l.limitMu.Lock()
155+
So(l.limit, ShouldBeLessThan, defaultLimit)
156+
l.limitMu.Unlock()
157+
158+
go l.startRequestLimitReset(testLimitResetDuration)
159+
<-time.After(1 * time.Second)
160+
l.limitMu.Lock()
161+
So(l.limit, ShouldBeLessThan, defaultLimit)
162+
l.limitMu.Unlock()
163+
164+
go l.startRequestLimitReset(testLimitResetDuration)
165+
<-time.After(1 * time.Second)
166+
l.limitMu.Lock()
167+
So(l.limit, ShouldBeLessThan, defaultLimit)
168+
l.limitMu.Unlock()
169+
170+
<-time.After(testLimitResetDuration + 1*time.Second)
171+
l.limitMu.Lock()
172+
So(l.limit, ShouldEqual, defaultLimit)
173+
l.limitMu.Unlock()
174+
})
175+
})
176+
})
177+
})
178+
}
179+
180+
func TestLimitGreaterThanZero(t *testing.T) {
181+
l, _ := new(Listener).InitC("your-stream", "0", ShardIterTypes[3], "accesskey", "secretkey", "us-east-1", 10)
182+
l.NewEndpoint(testEndpoint, "your-stream")
183+
184+
Convey("If the listener limit is decreased", t, func() {
185+
186+
for idx := 0; idx < 100; idx++ {
187+
l.decreaseRequestLimit()
188+
}
189+
190+
Convey("it should always be greater than zero", func() {
191+
So(l.limit, ShouldBeGreaterThan, 0)
192+
})
193+
})
194+
}
195+
138196
var cases = []struct {
139197
message []byte
140198
}{

vendor/github.com/smartystreets/assertions/internal/oglematchers/all_of_test.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/smartystreets/assertions/internal/oglematchers/contains_test.go

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/smartystreets/assertions/internal/oglematchers/deep_equals_test.go

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/smartystreets/assertions/internal/oglematchers/elements_are_test.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/smartystreets/assertions/internal/oglematchers/error_test.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/smartystreets/assertions/internal/oglematchers/has_substr_test.go

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/smartystreets/assertions/internal/oglematchers/identical_to_test.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)