198 changes: 133 additions & 65 deletions kinesumer.go
@@ -23,8 +23,9 @@ const (
syncInterval = 5*time.Second + jitter
syncTimeout = 5*time.Second - jitter

- defaultCommitTimeout  = 2 * time.Second
- defaultCommitInterval = 5 * time.Second
+ defaultCommitTimeout     = 2 * time.Second
+ defaultCommitInterval    = 5 * time.Second
+ defaultMarkQueueCapacity = 100

defaultScanLimit int64 = 2000

@@ -36,10 +37,11 @@ const (

// Error codes.
var (
- ErrEmptySequenceNumber    = errors.New("kinesumer: sequence number can't be empty")
- ErrInvalidStream          = errors.New("kinesumer: invalid stream")
- errEmptyCommitCheckpoints = errors.New("kinesumer: commit checkpoints can't be empty")
- errMarkNilRecord          = errors.New("kinesumer: nil record can't be marked")
+ ErrEmptySequenceNumber = errors.New("kinesumer: sequence number can't be empty")
+ ErrInvalidStream       = errors.New("kinesumer: invalid stream")
+ errMarkNilRecord       = errors.New("kinesumer: nil record can't be marked")
+ errMarkTimeout         = errors.New("kinesumer: mark timeout")
+ errCommitTimeout       = errors.New("kinesumer: commit timeout")
)

// Config defines configs for the Kinesumer client.
@@ -83,14 +85,18 @@ type CommitConfig struct {

// A Timeout config for commit per stream. (default is 2s)
Timeout time.Duration
+
+ // The capacity of the markRequest queue. (default is 100)
+ MarkRecordQueueCapacity uint64
}

// NewDefaultCommitConfig returns a new default offset management configuration.
func NewDefaultCommitConfig() *CommitConfig {
return &CommitConfig{
- Auto:     true,
- Interval: defaultCommitInterval,
- Timeout:  defaultCommitTimeout,
+ Auto:                    true,
+ Interval:                defaultCommitInterval,
+ Timeout:                 defaultCommitTimeout,
+ MarkRecordQueueCapacity: defaultMarkQueueCapacity,
}
}
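
For context, a minimal sketch of how a caller could tune the new knob. The `App`, `Region`, and `Commit` field names are taken from this diff (see `cfg.Commit.MarkRecordQueueCapacity` in the constructor below); the import path is assumed, and all other required configuration (streams, state store, AWS credentials) is omitted.

```go
package main

import (
	"log"

	"github.com/daangn/kinesumer" // import path assumed
)

func main() {
	// Start from the defaults, then raise the mark queue capacity for
	// consumers that mark records in large bursts.
	commit := kinesumer.NewDefaultCommitConfig()
	commit.MarkRecordQueueCapacity = 1000

	client, err := kinesumer.NewKinesumer(&kinesumer.Config{
		App:    "my-consumer-app",
		Region: "ap-northeast-2",
		Commit: commit,
		// ... state store and stream settings omitted ...
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}
```

Whether a deeper queue helps depends on how bursty marking is relative to the commit interval; the default of 100 comes from `defaultMarkQueueCapacity` above.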

@@ -124,6 +130,32 @@ type efoMeta struct {
consumerName string
}

+ type markRequest struct {
+ stream         string
+ shardID        string
+ sequenceNumber string
+ }
+
+ // offsets holds uncommitted sequence numbers.
+ // stream -> shardID -> sequenceNumber
+ type offsets map[string]map[string]string
+
+ func (o offsets) merge(stream, shardID, sequenceNumber string) {
+ offsetsByShard, ok := o[stream]
+ if !ok {
+ offsetsByShard = make(map[string]string)
+ o[stream] = offsetsByShard
+ }
+
+ offsetsByShard[shardID] = sequenceNumber
+ }
+
+ func (o offsets) clear() {
+ for stream := range o {
+ delete(o, stream)
+ }
+ }

// Kinesumer implements auto re-balancing consumer group for Kinesis.
// TODO(mingrammer): export prometheus metrics.
type Kinesumer struct {
@@ -157,8 +189,10 @@ type Kinesumer struct {
shards map[string]Shards
// To cache the last sequence numbers for each shard.
checkPoints map[string]*sync.Map
- // offsets holds uncommitted sequence numbers.
- offsets map[string]*sync.Map
+ // To manage the sequence numbers to be marked.
+ markRequestCh chan *markRequest
+ // To manage the sequence numbers to be committed.
+ commit chan struct{}
// To manage the next shard iterators for each shard.
nextIters map[string]*sync.Map

@@ -243,27 +277,28 @@ func NewKinesumer(cfg *Config) (*Kinesumer, error) {

buffer := recordsChanBuffer
kinesumer := &Kinesumer{
- id:           id,
- client:       kinesis.New(sess, cfgs...),
- app:          cfg.App,
- rgn:          cfg.Region,
- efoMode:      cfg.EFOMode,
- records:      make(chan *Record, buffer),
- errors:       make(chan error, 1),
- stateStore:   stateStore,
- shardCaches:  make(map[string][]string),
- shards:       make(map[string]Shards),
- checkPoints:  make(map[string]*sync.Map),
- offsets:      make(map[string]*sync.Map),
- nextIters:    make(map[string]*sync.Map),
- scanLimit:    defaultScanLimit,
- scanTimeout:  defaultScanTimeout,
- scanInterval: defaultScanInterval,
- started:      make(chan struct{}),
- wait:         sync.WaitGroup{},
- stop:         make(chan struct{}),
- mu:           &sync.Mutex{},
- close:        make(chan struct{}),
+ id:            id,
+ client:        kinesis.New(sess, cfgs...),
+ app:           cfg.App,
+ rgn:           cfg.Region,
+ efoMode:       cfg.EFOMode,
+ records:       make(chan *Record, buffer),
+ errors:        make(chan error, 1),
+ stateStore:    stateStore,
+ shardCaches:   make(map[string][]string),
+ shards:        make(map[string]Shards),
+ checkPoints:   make(map[string]*sync.Map),
+ markRequestCh: make(chan *markRequest, cfg.Commit.MarkRecordQueueCapacity),
+ commit:        make(chan struct{}, 1),
+ nextIters:     make(map[string]*sync.Map),
+ scanLimit:     defaultScanLimit,
+ scanTimeout:   defaultScanTimeout,
+ scanInterval:  defaultScanInterval,
+ started:       make(chan struct{}),
+ wait:          sync.WaitGroup{},
+ stop:          make(chan struct{}),
+ mu:            &sync.Mutex{},
+ close:         make(chan struct{}),
}

if cfg.ScanLimit > 0 {
@@ -287,6 +322,9 @@ func NewKinesumer(cfg *Config) (*Kinesumer, error) {
if err := kinesumer.init(); err != nil {
return nil, errors.WithStack(err)
}

+ go kinesumer.markAndCommit()

return kinesumer, nil
}

@@ -510,7 +548,6 @@ func (k *Kinesumer) consumePipe(stream string, shard *Shard) {
case e, ok := <-streamEvents:
if !ok {
k.Commit()
- k.cleanupOffsets(stream, shard)
return
}
if se, ok := e.(*kinesis.SubscribeToShardEvent); ok {
@@ -615,7 +652,6 @@ func (k *Kinesumer) consumeLoop(stream string, shard *Shard) {
time.Sleep(k.scanInterval)
records, closed := k.consumeOnce(stream, shard)
if closed {
- k.cleanupOffsets(stream, shard)
return // Close consume loop if shard is CLOSED and has no data.
}

@@ -724,6 +760,11 @@ func (k *Kinesumer) commitPeriodically() {

// MarkRecord marks the provided record as consumed.
func (k *Kinesumer) MarkRecord(record *Record) {
+ k.MarkRecordContext(context.Background(), record)
+ }
+
+ // MarkRecordContext marks the provided record as consumed. It reports
+ // errMarkTimeout on the error channel if ctx is done before the mark
+ // request can be enqueued.
+ func (k *Kinesumer) MarkRecordContext(ctx context.Context, record *Record) {
if record == nil {
k.sendOrDiscardError(errMarkNilRecord)
return
@@ -739,32 +780,68 @@ func (k *Kinesumer) MarkRecord(record *Record) {
k.sendOrDiscardError(ErrInvalidStream)
return
}
- k.offsets[record.Stream].Store(record.ShardID, seqNum)
+
+ req := &markRequest{
+ stream:         record.Stream,
+ shardID:        record.ShardID,
+ sequenceNumber: seqNum,
+ }
+ select {
+ case <-ctx.Done():
+ k.sendOrDiscardError(errMarkTimeout)
+ case k.markRequestCh <- req:
+ }
}
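
Because the mark queue is bounded (capacity `cfg.Commit.MarkRecordQueueCapacity`), `MarkRecord` can now block when the queue is full; `MarkRecordContext` bounds that wait. A sketch of a handler loop follows; note that on timeout, `errMarkTimeout` is delivered on the consumer's error channel via `sendOrDiscardError` rather than returned. The records channel and the surrounding wiring are assumptions.

```go
package consumer

import (
	"context"
	"time"

	"github.com/daangn/kinesumer" // import path assumed
)

// Handle drains the records channel, marking each record with a bounded wait.
func Handle(client *kinesumer.Kinesumer, records <-chan *kinesumer.Record) {
	for record := range records {
		// ... application-specific processing ...

		// Wait at most 100ms for space in the mark queue; if ctx expires first,
		// errMarkTimeout is delivered on the consumer's error channel instead.
		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		client.MarkRecordContext(ctx, record)
		cancel()
	}
}
```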

+ func (k *Kinesumer) markAndCommit() {
+ o := make(offsets)
+
+ for {
+ select {
+ case _, ok := <-k.commit:
+ if !ok {
+ return
+ }
+
+ var wg sync.WaitGroup
+ for stream, offset := range o {
+ wg.Add(1)
+ go func(stream string, offset map[string]string) {
+ defer wg.Done()
+
+ checkPoints := make([]*ShardCheckPoint, 0, len(offset))
+ for shardID, seqNum := range offset {
+ checkPoints = append(checkPoints, &ShardCheckPoint{
+ Stream:         stream,
+ ShardID:        shardID,
+ SequenceNumber: seqNum,
+ })
+ }
+
+ k.commitCheckPointsPerStream(stream, checkPoints)
+ }(stream, offset)
+ }
+ wg.Wait()
+
+ o.clear()
+ case req := <-k.markRequestCh:
+ o.merge(req.stream, req.shardID, req.sequenceNumber)
+ }
+ }
+ }
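
`markAndCommit` makes a single goroutine the sole owner of the uncommitted offsets, with marks and commit triggers fanned in over channels; that is what lets this PR drop the `map[string]*sync.Map` offsets field and the `cleanupOffsets` bookkeeping. It exits when `Close` closes `k.commit` (see the last hunk). A generic, library-independent sketch of that ownership pattern:

```go
package main

import "fmt"

// update mirrors markRequest, and flush mirrors the commit signal: a single
// owner goroutine serializes every map access, so no mutex or sync.Map is needed.
type update struct{ key, val string }

func owner(updates <-chan update, flush <-chan struct{}, commit func(map[string]string)) {
	state := make(map[string]string)
	for {
		select {
		case u := <-updates:
			state[u.key] = u.val // last write per key wins
		case <-flush:
			commit(state) // hand off the snapshot; the owner stops touching it
			state = make(map[string]string)
		}
	}
}

func main() {
	updates := make(chan update) // unbuffered, so sends are processed in order
	flush := make(chan struct{})
	done := make(chan struct{})

	go owner(updates, flush, func(m map[string]string) {
		fmt.Println(m) // map[shard-0000:seq-101]
		close(done)
	})

	updates <- update{"shard-0000", "seq-100"}
	updates <- update{"shard-0000", "seq-101"}
	flush <- struct{}{}
	<-done
}
```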

// Commit updates check point using current checkpoints.
func (k *Kinesumer) Commit() {
- var wg sync.WaitGroup
- for stream := range k.shards {
- wg.Add(1)
-
- var checkpoints []*ShardCheckPoint
- k.offsets[stream].Range(func(shardID, seqNum interface{}) bool {
- checkpoints = append(checkpoints, &ShardCheckPoint{
- Stream: stream,
- ShardID: shardID.(string),
- SequenceNumber: seqNum.(string),
- UpdatedAt: time.Now(),
- })
- return true
- })
-
- go func(stream string, checkpoints []*ShardCheckPoint) {
- defer wg.Done()
- k.commitCheckPointsPerStream(stream, checkpoints)
- }(stream, checkpoints)
- }
- wg.Wait()
+ k.CommitContext(context.Background())
+ }
+
+ // CommitContext triggers a commit of the currently marked offsets. It reports
+ // errCommitTimeout on the error channel if ctx is done before the commit can
+ // be triggered.
+ func (k *Kinesumer) CommitContext(ctx context.Context) {
+ select {
+ case <-ctx.Done():
+ k.sendOrDiscardError(errCommitTimeout)
+ case k.commit <- struct{}{}:
+ }
}
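
With `CommitConfig.Auto` disabled, the application drives commits itself. Note that `CommitContext` only enqueues a trigger on the capacity-1 commit channel, so it returns before the checkpoints are actually stored; `errCommitTimeout` is likewise reported on the error channel, not returned. A sketch of a manual commit loop; the cadence and the surrounding wiring are assumptions.

```go
package consumer

import (
	"context"
	"time"

	"github.com/daangn/kinesumer" // import path assumed
)

// CommitLoop triggers a commit every 5s, giving up after 1s if a commit is
// already in flight (errCommitTimeout then goes to the error channel).
func CommitLoop(client *kinesumer.Kinesumer, stop <-chan struct{}) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			client.CommitContext(ctx)
			cancel()
		case <-stop:
			return
		}
	}
}
```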

// commitCheckPointsPerStream updates checkpoints using sequence number.
Expand All @@ -782,17 +859,6 @@ func (k *Kinesumer) commitCheckPointsPerStream(stream string, checkpoints []*Sha
}
}

- // cleanupOffsets remove uninterested stream's shard.
- // TODO(proost): how to remove unused stream?
- func (k *Kinesumer) cleanupOffsets(stream string, shard *Shard) {
- if shard == nil {
- return
- }
- if offsets, ok := k.offsets[stream]; ok {
- offsets.Delete(shard.ID)
- }
- }

// Refresh refreshes the consuming streams.
func (k *Kinesumer) Refresh(streams []string) {
k.mu.Lock()
@@ -830,6 +896,8 @@ func (k *Kinesumer) Close() {

k.wait.Wait()

+ close(k.commit)

// Client should drain the remaining records.
close(k.records)
