From f174e99eec7036d6ea18e05d4c790abd2280ec76 Mon Sep 17 00:00:00 2001 From: Dan Maas Date: Sun, 19 Aug 2018 18:52:06 -0700 Subject: [PATCH] do not scale all the way down to zero unless there are no invisible messages remaining --- sqs/sqs.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/sqs/sqs.py b/sqs/sqs.py index 22a51fd..ea0cad8 100644 --- a/sqs/sqs.py +++ b/sqs/sqs.py @@ -19,26 +19,34 @@ def __init__(self, options): self.last_scale_up_time = time() self.last_scale_down_time = time() - def message_count(self): + def message_counts(self): response = self.sqs_client.get_queue_attributes( QueueUrl=self.options.sqs_queue_url, - AttributeNames=['ApproximateNumberOfMessages'] + AttributeNames=['ApproximateNumberOfMessages','ApproximateNumberOfMessagesNotVisible'] ) - return int(response['Attributes']['ApproximateNumberOfMessages']) + message_count = int(response['Attributes']['ApproximateNumberOfMessages']) + invisible_message_count = int(response['Attributes']['ApproximateNumberOfMessagesNotVisible']) + return message_count, invisible_message_count def poll(self): - message_count = self.message_count() + message_count, invisible_message_count = self.message_counts() + deployment = self.deployment() t = time() if message_count >= self.options.scale_up_messages: if t - self.last_scale_up_time > self.options.scale_up_cool_down: - self.scale_up() + self.scale_up(deployment) self.last_scale_up_time = t else: logger.debug("Waiting for scale up cooldown") if message_count <= self.options.scale_down_messages: - if t - self.last_scale_down_time > self.options.scale_down_cool_down: - self.scale_down() + # special case - do not scale to zero unless there are no invisible messages remaining + if deployment.spec.replicas <= 1 and \ + deployment.spec.replicas > self.options.min_pods and \ + invisible_message_count > 0: + logger.debug("Not scaling to zero because {} message(s) are still in-flight".format(invisible_message_count)) + elif t - self.last_scale_down_time > self.options.scale_down_cool_down: + self.scale_down(deployment) self.last_scale_down_time = t else: logger.debug("Waiting for scale down cooldown") @@ -46,25 +54,23 @@ def poll(self): # code for scale to use msg_count sleep(self.options.poll_period) - def scale_up(self): - deployment = self.deployment() + def scale_up(self, deployment): if deployment.spec.replicas < self.options.max_pods: logger.info("Scaling up") deployment.spec.replicas += 1 self.update_deployment(deployment) elif deployment.spec.replicas > self.options.max_pods: - self.scale_down() + self.scale_down(deployment) else: logger.info("Max pods reached") - def scale_down(self): - deployment = self.deployment() + def scale_down(self, deployment): if deployment.spec.replicas > self.options.min_pods: logger.info("Scaling Down") deployment.spec.replicas -= 1 self.update_deployment(deployment) elif deployment.spec.replicas < self.options.min_pods: - self.scale_up() + self.scale_up(deployment) else: logger.info("Min pods reached")