Skip to content

Commit d32b83b

Browse files
magiusdarrigomrmonaghancalebstewartspentakotavmwjc
authored
[MTCES-1264] Fetch upstream updates (#3)
* fixing infinite worker loop Signed-off-by: Mike Monaghan <mike_monaghan@live.ca> * Automatically resolve default KinesisEndpoint This commit fixes vmware-archive#5 by returning `aws.EndpointNotFoundError` from the endpoint resolver when no `KinesisEndpoint` is defined, which will resolve the default AWS endpoint. This is the same process used by the DynamoDB checkpointer to resolve the default endpoint. Signed-off-by: Caleb Stewart <caleb.stewart94@gmail.com> * fix: catch DynamoDB Scan error when trying to scan nonexistent table/index in syncLeases() Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * chore: Adding periods to copyright comment to satisfy gofmt Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * feat: Sending renewed lease metric Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * fix: add DeleteMetricMillisBehindLatest for error case Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * Refactor in prep for testing rate limiting improvements Signed-off-by: John Calixto <jcalixto@vmware.com> * fix: add getRecords TPS rate limiting Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * fix: add hard cap maxRetries for getRecord errors Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * fix: add maxBytes per second getRecord check Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * chore: log RemoveLeaseOwner errors with debug instead of error Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * fix: add check for GetRecords error within callGetRecordsAPI Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * fix: use nanosecond precision in lease comparisons Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * chore: add info logs in sleep case for kinesis backoff errors Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * fix: Check token bucket corner cases correctly. Signed-off-by: John Calixto <jcalixto@vmware.com> * Bump github.com/prometheus/client_golang from 1.11.0 to 1.11.1 Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.11.0 to 1.11.1. - [Release notes](https://github.com/prometheus/client_golang/releases) - [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md) - [Commits](prometheus/client_golang@v1.11.0...v1.11.1) --- updated-dependencies: - dependency-name: github.com/prometheus/client_golang dependency-type: direct:production ... Signed-off-by: dependabot[bot] <support@github.com> * Bump golang.org/x/sys from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0 Bumps [golang.org/x/sys](https://github.com/golang/sys) from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0. - [Release notes](https://github.com/golang/sys/releases) - [Commits](https://github.com/golang/sys/commits/v0.1.0) --- updated-dependencies: - dependency-name: golang.org/x/sys dependency-type: indirect ... Signed-off-by: dependabot[bot] <support@github.com> * fix: add shutdown and leaseExpired error cases for checkpoint function Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * feat: make lease renewal async Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * fix: return err log in case of ErrLeaseNotAcquired Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * chore: Remove extraneous err check After checking the scan result above this line, checking err here no longer has any effect. Signed-off-by: John Calixto <jcalixto@vmware.com> * fix: pass in ctx with cancel for renewLease Signed-off-by: Shiva Pentakota <spentakota@vmware.com> * ran go mod tidy --------- Signed-off-by: Mike Monaghan <mike_monaghan@live.ca> Signed-off-by: Caleb Stewart <caleb.stewart94@gmail.com> Signed-off-by: Shiva Pentakota <spentakota@vmware.com> Signed-off-by: John Calixto <jcalixto@vmware.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: Mike Monaghan <mike_monaghan@live.ca> Co-authored-by: Caleb Stewart <caleb.stewart94@gmail.com> Co-authored-by: Shiva Pentakota <spentakota@vmware.com> Co-authored-by: spentakota <120056013+spentakota@users.noreply.github.com> Co-authored-by: John Calixto <jcalixto@vmware.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: cmckelvey-vmware <85366153+cmckelvey-vmware@users.noreply.github.com> Co-authored-by: vmwjc <108959326+vmwjc@users.noreply.github.com>
1 parent c89ce36 commit d32b83b

File tree

16 files changed

+689
-114
lines changed

16 files changed

+689
-114
lines changed

clientlibrary/checkpoint/checkpointer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
// Package checkpoint
2121
// The implementation is derived from https://github.com/patrobinson/gokini
2222
//
23-
// # Copyright 2018 Patrick robinson
23+
// Copyright 2018 Patrick robinson.
2424
//
2525
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
2626
//
@@ -79,6 +79,9 @@ type Checkpointer interface {
7979
// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
8080
RemoveLeaseOwner(string) error
8181

82+
// GetLeaseOwner to get current owner of lease for shard
83+
GetLeaseOwner(string) (string, error)
84+
8285
// ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods)
8386
ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)
8487

clientlibrary/checkpoint/dynamodb-checkpointer.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
// Package checkpoint
2121
// The implementation is derived from https://github.com/patrobinson/gokini
2222
//
23-
// # Copyright 2018 Patrick robinson
23+
// Copyright 2018 Patrick robinson.
2424
//
2525
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
2626
//
@@ -51,6 +51,10 @@ const (
5151
NumMaxRetries = 10
5252
)
5353

54+
var (
55+
NoLeaseOwnerErr = errors.New("no LeaseOwner in checkpoints table")
56+
)
57+
5458
// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
5559
type DynamoCheckpoint struct {
5660
log logger.Logger
@@ -129,7 +133,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
129133
// GetLease attempts to gain a lock on the given shard
130134
func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error {
131135
newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
132-
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339)
136+
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339Nano)
133137
currentCheckpoint, err := checkpointer.getItem(shard.ID)
134138
if err != nil {
135139
return err
@@ -161,7 +165,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
161165
assignedTo := assignedVar.(*types.AttributeValueMemberS).Value
162166
leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value
163167

164-
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout)
168+
currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout)
165169
if err != nil {
166170
return err
167171
}
@@ -246,7 +250,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
246250

247251
// CheckpointSequence writes a checkpoint at the designated sequence ID
248252
func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error {
249-
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339)
253+
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339Nano)
250254
marshalledCheckpoint := map[string]types.AttributeValue{
251255
LeaseKeyKey: &types.AttributeValueMemberS{
252256
Value: shard.ID,
@@ -290,7 +294,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
290294

291295
// Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
292296
if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" {
293-
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value)
297+
currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout.(*types.AttributeValueMemberS).Value)
294298
if err != nil {
295299
return err
296300
}
@@ -336,6 +340,23 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
336340
return err
337341
}
338342

343+
// GetLeaseOwner returns current lease owner of given shard in checkpoints table
344+
func (checkpointer *DynamoCheckpoint) GetLeaseOwner(shardID string) (string, error) {
345+
currentCheckpoint, err := checkpointer.getItem(shardID)
346+
if err != nil {
347+
return "", err
348+
}
349+
350+
assignedVar, assignedToOk := currentCheckpoint[LeaseOwnerKey]
351+
352+
if !assignedToOk {
353+
return "", NoLeaseOwnerErr
354+
}
355+
356+
return assignedVar.(*types.AttributeValueMemberS).Value, nil
357+
358+
}
359+
339360
// ListActiveWorkers returns a map of workers and their shards
340361
func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) {
341362
err := checkpointer.syncLeases(shardStatus)
@@ -370,7 +391,7 @@ func (checkpointer *DynamoCheckpoint) ClaimShard(shard *par.ShardStatus, claimID
370391
if err != nil && err != ErrSequenceIDNotFound {
371392
return err
372393
}
373-
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339)
394+
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339Nano)
374395

375396
conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)`
376397
expressionAttributeValues := map[string]types.AttributeValue{
@@ -441,6 +462,12 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha
441462
}
442463

443464
scanOutput, err := checkpointer.svc.Scan(context.TODO(), input)
465+
466+
if err != nil {
467+
log.Debugf("Error performing DynamoDB Scan. Error: %+v ", err)
468+
return err
469+
}
470+
444471
results := scanOutput.Items
445472
for _, result := range results {
446473
shardId, foundShardId := result[LeaseKeyKey]
@@ -456,10 +483,6 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha
456483
}
457484
}
458485

459-
if err != nil {
460-
log.Debugf("Error performing SyncLeases. Error: %+v ", err)
461-
return err
462-
}
463486
log.Debugf("Lease sync completed. Next lease sync will occur in %s", time.Duration(checkpointer.kclConfig.LeaseSyncingTimeIntervalMillis)*time.Millisecond)
464487
return nil
465488
}

clientlibrary/checkpoint/mock-dynamodb_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
// The implementation is derived from https://github.com/patrobinson/gokini
2121
//
22-
// Copyright 2018 Patrick robinson
22+
// Copyright 2018 Patrick robinson.
2323
//
2424
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
2525
//

clientlibrary/config/config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ const (
6969
// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
7070
DefaultLeaseRefreshPeriodMillis = 5000
7171

72+
// DefaultLeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
73+
DefaultLeaseRefreshWaitTime = 2500
74+
7275
// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
7376
DefaultMaxRecords = 10000
7477

@@ -136,6 +139,9 @@ const (
136139

137140
// DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with lease table (dynamodDB)
138141
DefaultLeaseSyncingIntervalMillis = 60000
142+
143+
// DefaultMaxRetryCount The default maximum number of retries in case of error
144+
DefaultMaxRetryCount = 5
139145
)
140146

141147
type (
@@ -213,6 +219,9 @@ type (
213219
// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
214220
LeaseRefreshPeriodMillis int
215221

222+
// LeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
223+
LeaseRefreshWaitTime int
224+
216225
// MaxRecords Max records to read per Kinesis getRecords() call
217226
MaxRecords int
218227

@@ -283,6 +292,9 @@ type (
283292

284293
// LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB)
285294
LeaseSyncingTimeIntervalMillis int
295+
296+
// MaxRetryCount The maximum number of retries in case of error
297+
MaxRetryCount int
286298
}
287299
)
288300

clientlibrary/config/kcl-config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
102102
LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis,
103103
LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis,
104104
LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis,
105+
LeaseRefreshWaitTime: DefaultLeaseRefreshWaitTime,
106+
MaxRetryCount: DefaultMaxRetryCount,
105107
Logger: logger.GetDefaultLogger(),
106108
}
107109
}
@@ -148,6 +150,12 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres
148150
return c
149151
}
150152

153+
func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration {
154+
checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime)
155+
c.LeaseRefreshWaitTime = leaseRefreshWaitTime
156+
return c
157+
}
158+
151159
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
152160
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
153161
c.ShardSyncIntervalMillis = shardSyncIntervalMillis
@@ -211,6 +219,13 @@ func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *Kinesi
211219
return c
212220
}
213221

222+
// WithMaxRetryCount sets the max retry count in case of error.
223+
func (c *KinesisClientLibConfiguration) WithMaxRetryCount(maxRetryCount int) *KinesisClientLibConfiguration {
224+
checkIsValuePositive("maxRetryCount", maxRetryCount)
225+
c.MaxRetryCount = maxRetryCount
226+
return c
227+
}
228+
214229
// WithMonitoringService sets the monitoring service to use to publish metrics.
215230
func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration {
216231
// Nil case is handled downward (at worker creation) so no need to do it here.

clientlibrary/metrics/cloudwatch/cloudwatch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
// Package cloudwatch
2121
// The implementation is derived from https://github.com/patrobinson/gokini
2222
//
23-
// # Copyright 2018 Patrick robinson
23+
// Copyright 2018 Patrick robinson.
2424
//
2525
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
2626
//

clientlibrary/metrics/interfaces.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
// Package metrics
2121
// The implementation is derived from https://github.com/patrobinson/gokini
2222
//
23-
// Copyright 2018 Patrick robinson
23+
// Copyright 2018 Patrick robinson.
2424
//
2525
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
2626
//
@@ -35,6 +35,7 @@ type MonitoringService interface {
3535
IncrRecordsProcessed(shard string, count int)
3636
IncrBytesProcessed(shard string, count int64)
3737
MillisBehindLatest(shard string, milliSeconds float64)
38+
DeleteMetricMillisBehindLatest(shard string)
3839
LeaseGained(shard string)
3940
LeaseLost(shard string)
4041
LeaseRenewed(shard string)
@@ -53,6 +54,7 @@ func (NoopMonitoringService) Shutdown() {}
5354
func (NoopMonitoringService) IncrRecordsProcessed(_ string, _ int) {}
5455
func (NoopMonitoringService) IncrBytesProcessed(_ string, _ int64) {}
5556
func (NoopMonitoringService) MillisBehindLatest(_ string, _ float64) {}
57+
func (NoopMonitoringService) DeleteMetricMillisBehindLatest(_ string) {}
5658
func (NoopMonitoringService) LeaseGained(_ string) {}
5759
func (NoopMonitoringService) LeaseLost(_ string) {}
5860
func (NoopMonitoringService) LeaseRenewed(_ string) {}

clientlibrary/metrics/prometheus/prometheus.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
// Package prometheus
2121
// The implementation is derived from https://github.com/patrobinson/gokini
2222
//
23-
// # Copyright 2018 Patrick robinson
23+
// Copyright 2018 Patrick robinson.
2424
//
2525
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
2626
//
@@ -147,6 +147,10 @@ func (p *MonitoringService) MillisBehindLatest(shard string, millSeconds float64
147147
p.behindLatestMillis.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Set(millSeconds)
148148
}
149149

150+
func (p *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
151+
p.behindLatestMillis.Delete(prom.Labels{"shard": shard, "kinesisStream": p.streamName})
152+
}
153+
150154
func (p *MonitoringService) LeaseGained(shard string) {
151155
p.leasesHeld.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName, "workerID": p.workerID}).Inc()
152156
}

clientlibrary/partition/partition.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
// Package partition
2121
// The implementation is derived from https://github.com/patrobinson/gokini
2222
//
23-
// # Copyright 2018 Patrick robinson
23+
// Copyright 2018 Patrick robinson.
2424
//
2525
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
2626
//

clientlibrary/worker/common-shard-consumer.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package worker
2222

2323
import (
24+
"context"
2425
"sync"
2526
"time"
2627

@@ -40,29 +41,36 @@ type shardConsumer interface {
4041
getRecords() error
4142
}
4243

44+
type KinesisSubscriberGetter interface {
45+
SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error)
46+
GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error)
47+
GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error)
48+
}
49+
4350
// commonShardConsumer implements common functionality for regular and enhanced fan-out consumers
4451
type commonShardConsumer struct {
4552
shard *par.ShardStatus
46-
kc *kinesis.Client
53+
kc KinesisSubscriberGetter
4754
checkpointer chk.Checkpointer
4855
recordProcessor kcl.IRecordProcessor
4956
kclConfig *config.KinesisClientLibConfiguration
5057
mService metrics.MonitoringService
5158
}
5259

5360
// Cleanup the internal lease cache
54-
func (sc *commonShardConsumer) releaseLease() {
61+
func (sc *commonShardConsumer) releaseLease(shard string) {
5562
log := sc.kclConfig.Logger
5663
log.Infof("Release lease for shard %s", sc.shard.ID)
5764
sc.shard.SetLeaseOwner("")
5865

5966
// Release the lease by wiping out the lease owner for the shard
6067
// Note: we don't need to do anything in case of error here and shard lease will eventually be expired.
6168
if err := sc.checkpointer.RemoveLeaseOwner(sc.shard.ID); err != nil {
62-
log.Errorf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err)
69+
log.Debugf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err)
6370
}
6471

6572
// reporting lease lose metrics
73+
sc.mService.DeleteMetricMillisBehindLatest(shard)
6674
sc.mService.LeaseLost(sc.shard.ID)
6775
}
6876

@@ -165,7 +173,6 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
165173
input.CacheEntryTime = &getRecordsStartTime
166174
input.CacheExitTime = &processRecordsStartTime
167175
sc.recordProcessor.ProcessRecords(input)
168-
169176
processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds()
170177
sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming))
171178
}

0 commit comments

Comments
 (0)