Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions sqs/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,52 +19,58 @@ def __init__(self, options):
self.last_scale_up_time = time()
self.last_scale_down_time = time()

def message_count(self):
def message_counts(self):
response = self.sqs_client.get_queue_attributes(
QueueUrl=self.options.sqs_queue_url,
AttributeNames=['ApproximateNumberOfMessages']
AttributeNames=['ApproximateNumberOfMessages','ApproximateNumberOfMessagesNotVisible']
)
return int(response['Attributes']['ApproximateNumberOfMessages'])
message_count = int(response['Attributes']['ApproximateNumberOfMessages'])
invisible_message_count = int(response['Attributes']['ApproximateNumberOfMessagesNotVisible'])
return message_count, invisible_message_count


def poll(self):
message_count = self.message_count()
message_count, invisible_message_count = self.message_counts()
deployment = self.deployment()
t = time()
if message_count >= self.options.scale_up_messages:
if t - self.last_scale_up_time > self.options.scale_up_cool_down:
self.scale_up()
self.scale_up(deployment)
self.last_scale_up_time = t
else:
logger.debug("Waiting for scale up cooldown")
if message_count <= self.options.scale_down_messages:
if t - self.last_scale_down_time > self.options.scale_down_cool_down:
self.scale_down()
# special case - do not scale to zero unless there are no invisible messages remaining
if deployment.spec.replicas <= 1 and \
deployment.spec.replicas > self.options.min_pods and \
invisible_message_count > 0:
logger.debug("Not scaling to zero because {} message(s) are still in-flight".format(invisible_message_count))
elif t - self.last_scale_down_time > self.options.scale_down_cool_down:
self.scale_down(deployment)
self.last_scale_down_time = t
else:
logger.debug("Waiting for scale down cooldown")

# code for scale to use msg_count
sleep(self.options.poll_period)

def scale_up(self):
deployment = self.deployment()
def scale_up(self, deployment):
if deployment.spec.replicas < self.options.max_pods:
logger.info("Scaling up")
deployment.spec.replicas += 1
self.update_deployment(deployment)
elif deployment.spec.replicas > self.options.max_pods:
self.scale_down()
self.scale_down(deployment)
else:
logger.info("Max pods reached")

def scale_down(self):
deployment = self.deployment()
def scale_down(self, deployment):
if deployment.spec.replicas > self.options.min_pods:
logger.info("Scaling Down")
deployment.spec.replicas -= 1
self.update_deployment(deployment)
elif deployment.spec.replicas < self.options.min_pods:
self.scale_up()
self.scale_up(deployment)
else:
logger.info("Min pods reached")

Expand Down