Skip to content

Commit ed63d5f

Browse files
authored
Merge pull request #29 from rewardStyle/fix_refresh_iterator_rate_limit
Fix refresh iterator rate limit
2 parents 3c4f36f + 7a28a70 commit ed63d5f

File tree

7 files changed

+43
-42
lines changed

7 files changed

+43
-42
lines changed

.gitignore

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,17 @@ _testmain.go
2323
*.exe
2424
*.test
2525
*.prof
26+
27+
# Created by https://www.gitignore.io/api/vim
28+
29+
### Vim ###
30+
# swap
31+
[._]*.s[a-w][a-z]
32+
[._]s[a-w][a-z]
33+
# session
34+
Session.vim
35+
# temporary
36+
.netrwhist
37+
*~
38+
# auto-generated tag files
39+
tags

firehose.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,6 @@ func (p *Producer) sendFirehoseRecords(args *gokinesis.RequestArgs) {
143143
}
144144

145145
if conf.Debug.Verbose && p.getMsgCount()%100 == 0 {
146-
log.Println("Messages sent so far: " + strconv.Itoa(p.getMsgCount()))
146+
log.Println("Messages sent so far: " + strconv.FormatInt(p.getMsgCount(), 10))
147147
}
148148
}

kinesis.go

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kinetic
33
import (
44
"errors"
55
"sync"
6+
"sync/atomic"
67
"time"
78

89
gokinesis "github.com/rewardStyle/go-kinesis"
@@ -57,11 +58,8 @@ type kinesis struct {
5758

5859
client gokinesis.KinesisClient
5960

60-
msgCount int
61-
msgCountMu sync.Mutex
62-
63-
errCount int
64-
errCountMu sync.Mutex
61+
msgCount int64
62+
errCount int64
6563
}
6664

6765
func (k *kinesis) init(stream, shard, shardIteratorType, accessKey, secretKey, region string) (*kinesis, error) {
@@ -157,39 +155,27 @@ func (k *kinesis) refreshClient(accessKey, secretKey, region string) {
157155
}
158156

159157
func (k *kinesis) decMsgCount() {
160-
k.msgCountMu.Lock()
161-
k.msgCount--
162-
k.msgCountMu.Unlock()
158+
atomic.AddInt64(&k.msgCount, -1)
163159
}
164160

165161
func (k *kinesis) incMsgCount() {
166-
k.msgCountMu.Lock()
167-
k.msgCount++
168-
k.msgCountMu.Unlock()
162+
atomic.AddInt64(&k.msgCount, 1)
169163
}
170164

171-
func (k *kinesis) getMsgCount() int {
172-
k.msgCountMu.Lock()
173-
defer k.msgCountMu.Unlock()
174-
return k.msgCount
165+
func (k *kinesis) getMsgCount() int64 {
166+
return atomic.LoadInt64(&k.msgCount)
175167
}
176168

177169
func (k *kinesis) decErrCount() {
178-
k.errCountMu.Lock()
179-
k.errCount--
180-
k.errCountMu.Unlock()
170+
atomic.AddInt64(&k.errCount, -1)
181171
}
182172

183173
func (k *kinesis) incErrCount() {
184-
k.errCountMu.Lock()
185-
k.errCount++
186-
k.errCountMu.Unlock()
174+
atomic.AddInt64(&k.errCount, 1)
187175
}
188176

189-
func (k *kinesis) getErrCount() int {
190-
k.errCountMu.Lock()
191-
defer k.errCountMu.Unlock()
192-
return k.errCount
177+
func (k *kinesis) getErrCount() int64 {
178+
return atomic.LoadInt64(&k.errCount)
193179
}
194180

195181
func getLock(sem chan bool) {

listener.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -206,11 +206,19 @@ retry:
206206
func (l *Listener) consume() {
207207
l.setConsuming(true)
208208

209-
counter := 0
210-
timer := time.Now()
209+
readCounter := 0
210+
readTimer := time.Now()
211+
212+
GsiCounter := 0
213+
GsiTimer := time.Now()
211214

212215
for {
213-
l.throttle(&counter, &timer)
216+
if !l.shouldConsume() {
217+
l.setConsuming(false)
218+
break
219+
}
220+
221+
l.throttle(&readCounter, &readTimer)
214222

215223
// args() will give us the shard iterator and type as well as the shard id
216224
response, err := l.client.GetRecords(l.args())
@@ -241,9 +249,7 @@ func (l *Listener) consume() {
241249

242250
// If we received an error we should wait and attempt to
243251
// refresh the shard iterator again
244-
<-time.After(1 * time.Second)
245-
246-
log.Println("Retrying after waiting one second.")
252+
l.throttle(&GsiCounter, &GsiTimer)
247253

248254
goto refresh_iterator
249255
}
@@ -259,11 +265,6 @@ func (l *Listener) consume() {
259265
}
260266
}
261267
}
262-
263-
if !l.shouldConsume() {
264-
l.setConsuming(false)
265-
break
266-
}
267268
}
268269
}
269270

listener_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestListenerError(t *testing.T) {
4949
// Let the error propagate
5050
<-time.After(1 * time.Second)
5151

52-
So(listener.errCount, ShouldNotEqual, 0)
52+
So(listener.getErrCount(), ShouldNotEqual, 0)
5353
So(listener.IsListening(), ShouldEqual, true)
5454
})
5555
})

producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ stop:
218218
p.incMsgCount()
219219

220220
if conf.Debug.Verbose && p.getMsgCount()%100 == 0 {
221-
log.Println("Received message to send. Total messages received: " + strconv.Itoa(p.getMsgCount()))
221+
log.Println("Received message to send. Total messages received: " + strconv.FormatInt(p.getMsgCount(), 10))
222222
}
223223

224224
kargs := p.args()
@@ -320,7 +320,7 @@ func (p *Producer) sendRecords(args *gokinesis.RequestArgs) {
320320
}
321321

322322
if conf.Debug.Verbose && p.getMsgCount()%100 == 0 {
323-
log.Println("Messages sent so far: " + strconv.Itoa(p.getMsgCount()))
323+
log.Println("Messages sent so far: " + strconv.FormatInt(p.getMsgCount(), 10))
324324
}
325325
}
326326

producer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ func TestProducerError(t *testing.T) {
4343
Convey("It should handle errors successfully", func() {
4444
producer.errors <- errors.New("All your base are belong to us!")
4545
// Let the error propagate
46-
<-time.After(1 * time.Second)
47-
So(producer.errCount, ShouldEqual, 1)
46+
<-time.After(3 * time.Second)
47+
So(producer.getErrCount(), ShouldEqual, 1)
4848
So(producer.IsProducing(), ShouldEqual, true)
4949
})
5050
})

0 commit comments

Comments
 (0)