Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions golang/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) {

// Update the scale up alarm.
// Set the state of the scale up alarm to INSUFFICIENT_DATA.
_, err = updateAlarm(scaleUpAlarmName, evaluationPeriodScaleUp, datapointsRequiredScaleUp, upThreshold, cloudwatch.ComparisonOperatorGreaterThanOrEqualToThreshold, streamName, alarmActions, newShardCount, false, 0)
_, err = updateAlarm(scaleUpAlarmName, periodMins, evaluationPeriodScaleUp, datapointsRequiredScaleUp, upThreshold, cloudwatch.ComparisonOperatorGreaterThanOrEqualToThreshold, streamName, alarmActions, newShardCount, false, 0)
if err != nil {
logMessage := fmt.Sprintf("Kinesis stream (%s) has scaled and been tagged with the timestamp but couldn't update the scale-up alarm (%s). Log CloudWatch PutMetricAlarm API error.", streamName, scaleUpAlarmName)
logger.WithError(err).Error(logMessage)
Expand All @@ -300,7 +300,7 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) {

// Update the scale down alarm.
// Set the state of the scale down alarm to INSUFFICIENT_DATA.
_, err = updateAlarm(scaleDownAlarmName, evaluationPeriodScaleDown, datapointsRequiredScaleDown, downThreshold, cloudwatch.ComparisonOperatorLessThanThreshold, streamName, alarmActions, newShardCount, true, scaleDownMinIterAgeMins)
_, err = updateAlarm(scaleDownAlarmName, periodMins, evaluationPeriodScaleDown, datapointsRequiredScaleDown, downThreshold, cloudwatch.ComparisonOperatorLessThanThreshold, streamName, alarmActions, newShardCount, true, scaleDownMinIterAgeMins)
if err != nil {
logMessage := fmt.Sprintf("Kinesis stream (%s) has scaled and been tagged with the timestamp but couldn't update the scale-down alarm (%s). Log CloudWatch PutMetricAlarm API error.", streamName, scaleDownAlarmName)
logger.WithError(err).Error(logMessage)
Expand Down Expand Up @@ -357,12 +357,11 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) {
// newShardCount: The new shard count of the Kinesis Data Stream
// isScaledown: true if the alarm is for scale down, false if for scale up
// scaleDownMinIterAgeMins: used for scaleDown only metrics
func updateAlarm(alarmName string, evaluationPeriod int64, datapointsRequired int64, threshold float64, comparisonOperator string, streamName string, alarmActions []*string, newShardCount int64, isScaleDown bool, scaleDownMinIterAgeMins int64) (*cloudwatch.PutMetricAlarmOutput, error) {
func updateAlarm(alarmName string, periodMins int64, evaluationPeriod int64, datapointsRequired int64, threshold float64, comparisonOperator string, streamName string, alarmActions []*string, newShardCount int64, isScaleDown bool, scaleDownMinIterAgeMins int64) (*cloudwatch.PutMetricAlarmOutput, error) {
var putMetricAlarmResponse *cloudwatch.PutMetricAlarmOutput
var err error
// Initialize the seed function to get a different random number every execution.
rand.Seed(time.Now().UnixNano())
var periodMins int64 = 5 // Data is evaluated every 5 minutes
var retryCount int64 = 0
var isDone bool

Expand Down