Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ proxmox:

# Balancing Parameters
parameters:
deviation: 4 # Percentage of allowable deviation of the RAM load of the node
mem_deviation: 4 # Percentage of allowable deviation of the RAM load of the node
threshold: 90 # Percentage of maximum load
cpu_deviation: 1 # Percentage of allowable deviation of the CPU load of the node
cpu_deviation_duration_seconds: 20 # Number of seconds a node's CPU load may stay outside the allowed deviation before balancing is triggered
lxc_migration: OFF # Container migration (LXCs are rebooted during migration!!!)
migration_timeout: 1000 # For the future
only_on_master: OFF # Only run PLB on the current cluster master
Expand Down
110 changes: 73 additions & 37 deletions plb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import yaml
import smtplib
import socket
from random import random
from email.message import EmailMessage
from time import time
from time import sleep
from itertools import permutations
from copy import deepcopy
from loguru import logger
from collections import defaultdict

try:
with open("config.yaml", "r", encoding='utf8') as yaml_file:
Expand All @@ -27,8 +28,10 @@
auth = dict(cfg["proxmox"]["auth"])

"""Parameters"""
CONFIG_DEVIATION = CD = cfg["parameters"]["deviation"] / 200
MEM_CONFIG_DEVIATION = MCD = cfg["parameters"]["mem_deviation"] / 100
THRESHOLD = cfg["parameters"]["threshold"] / 100
CPU_CONFIG_DEVIATION = CCD = cfg["parameters"]["cpu_deviation"] / 100
CPU_CONFIG_DEVIATION_DURATION_SECONDS = cfg["parameters"]["cpu_deviation_duration_seconds"]
LXC_MIGRATION = cfg["parameters"]["lxc_migration"]
MIGRATION_TIMEOUT = cfg["parameters"]["migration_timeout"]
ONLY_ON_MASTER = cfg["parameters"].get("only_on_master", False)
Expand Down Expand Up @@ -75,8 +78,9 @@
"""Globals"""
payload = dict() # PVEAuthCookie
header = dict() # CSRFPreventionToken
sum_of_deviations: float = 0
iteration = 0
mem_sum_of_deviations: float = 0
cpu_sum_of_deviations: float = 0
cpu_sliding_windows = defaultdict(lambda: None)


class Cluster:
Expand Down Expand Up @@ -282,7 +286,7 @@ def authentication(server: str, data: dict):
header = {'CSRFPreventionToken': (get_token.json()['data']['CSRFPreventionToken'])}


def cluster_load_verification(mem_load: float, cluster_obj: object) -> None:
def cluster_load_verification(mem_load: float, cluster_obj: Cluster) -> None:
"""Checking the RAM load of the balanced part of the cluster"""
logger.debug("Starting cluster_load_verification")
if len(cluster_obj.cl_nodes) - len(excluded_nodes) == 1:
Expand All @@ -295,29 +299,53 @@ def cluster_load_verification(mem_load: float, cluster_obj: object) -> None:
sys.exit(1)


def need_to_balance_checking(cluster_obj: object) -> bool:
"""Checking the need for balancing"""
logger.debug("Starting need_to_balance_checking")
global sum_of_deviations, iteration
def calculate_sum_of_deviations(cluster_obj: Cluster) -> None:
    """Compute per-node and total RAM/CPU deviations from the cluster averages.

    For every node in ``cluster_obj.included_nodes`` the absolute difference
    between the node's "mem_load" and ``cluster_obj.mem_load_included`` is
    stored under "mem_deviation", and the absolute difference between the
    node's "cpu" and ``cluster_obj.cl_cpu_load_include`` is stored under
    "cpu_deviation".  The totals over all included nodes are written to the
    module-level ``mem_sum_of_deviations`` and ``cpu_sum_of_deviations``.

    :param cluster_obj: cluster whose included nodes are evaluated; the node
        dictionaries are updated in place.
    """
    global mem_sum_of_deviations, cpu_sum_of_deviations
    logger.debug("Starting calculate_sum_of_deviations")
    nodes = cluster_obj.included_nodes
    mem_average = cluster_obj.mem_load_included
    cpu_average = cluster_obj.cl_cpu_load_include
    # One pass is enough: both deviations depend only on the node itself
    # and on the averages read above.
    for values in nodes.values():
        values["mem_deviation"] = abs(values["mem_load"] - mem_average)
        values["cpu_deviation"] = abs(values["cpu"] - cpu_average)
    mem_sum_of_deviations = sum(values["mem_deviation"] for values in nodes.values())
    cpu_sum_of_deviations = sum(values["cpu_deviation"] for values in nodes.values())


def need_to_balance_mem_checking(cluster_obj: Cluster) -> bool:
    """Check whether RAM balancing is needed.

    :param cluster_obj: cluster whose ``included_nodes`` values each carry a
        "mem_deviation" entry (set by ``calculate_sum_of_deviations``).
    :return: True if at least one included node has a "mem_deviation"
        greater than ``MEM_CONFIG_DEVIATION``, otherwise False (also when
        there are no included nodes).
    """
    # The message names this function; it previously reported
    # "cluster_load_verification", which made the debug log misleading.
    logger.debug("Starting need_to_balance_mem_checking")
    return any(
        values["mem_deviation"] > MEM_CONFIG_DEVIATION
        for values in cluster_obj.included_nodes.values()
    )


def temporary_dict(cluster_obj: object) -> object:
def need_to_balance_cpu_checking(cluster_obj: Cluster) -> bool:
    """Check whether CPU balancing is needed.

    A per-host entry in the module-level ``cpu_sliding_windows`` records when
    the host's "cpu_deviation" first exceeded ``CPU_CONFIG_DEVIATION`` and on
    which side of the cluster average it was (``(start_time, above_average)``).
    The entry is restarted when the side flips and cleared when the host is
    back within the allowed deviation.

    :param cluster_obj: cluster whose ``included_nodes`` values each carry
        "cpu" and "cpu_deviation" entries.
    :return: True if at least one host has stayed outside the allowed
        deviation, on the same side of the average, for more than
        ``CPU_CONFIG_DEVIATION_DURATION_SECONDS``; otherwise False.
    """
    logger.debug("Starting need_to_balance_cpu_checking")
    cpu_average = cluster_obj.cl_cpu_load_include
    sustained = False
    # Every host is processed before returning: leaving the loop at the first
    # hit would skip the window update of the remaining hosts, so a host that
    # has already recovered could keep a stale start time and trigger
    # immediately on its next deviation.
    for host, values in cluster_obj.included_nodes.items():
        if values["cpu_deviation"] > CPU_CONFIG_DEVIATION:
            above_average = values["cpu"] > cpu_average
            window = cpu_sliding_windows[host]
            if window is None or window[1] != above_average:
                # First observation, or the deviation changed side: restart.
                cpu_sliding_windows[host] = (time(), above_average)
            elif time() - window[0] > CPU_CONFIG_DEVIATION_DURATION_SECONDS:
                sustained = True
        else:
            cpu_sliding_windows[host] = None
    return sustained


def temporary_dict(cluster_obj: Cluster) -> object:
"""Preparation of information for subsequent processing"""
logger.debug("Running temporary_dict")
obj = {}
Expand All @@ -334,26 +362,34 @@ def temporary_dict(cluster_obj: object) -> object:
return obj


def calculating(hosts: object, cluster_obj: Cluster) -> list:
    """Select VM migrations that reduce both the RAM and the CPU imbalance.

    Every ordered pair (source, target) of included nodes is combined with
    every VM listed for the source in ``hosts``.  For each combination the
    total RAM and CPU deviation of the cluster is recomputed as if the VM had
    been moved.  A combination is kept only when both totals are strictly
    lower than the module-level ``mem_sum_of_deviations`` and
    ``cpu_sum_of_deviations``.

    As a side effect each examined VM dictionary receives a "cpu_used" entry
    (``maxcpu * cpu`` rounded to two decimals).

    :param hosts: mapping of node name to a mapping of its VMs; each VM
        provides "mem", "maxcpu", "cpu" and "vmid".
    :param cluster_obj: cluster providing ``included_nodes`` (with "mem",
        "maxmem", "cpu_used", "maxcpu", "mem_deviation", "cpu_deviation" per
        node) and the RAM/CPU averages.
    :return: list of ``(source, target, vmid, score)`` tuples sorted by
        ascending score, where score is the sum of the new/old ratios of the
        RAM and CPU deviation totals.
    """
    logger.debug("Starting calculating")
    variants: list = []
    nodes = cluster_obj.included_nodes
    mem_average = cluster_obj.mem_load_included
    cpu_average = cluster_obj.cl_cpu_load_include
    for host in permutations(nodes, 2):
        source, target = host
        # Deviation contributed by the nodes not involved in this migration.
        cpu_part_of_deviation = sum(
            values["cpu_deviation"] for node, values in nodes.items() if node not in host)
        mem_part_of_deviation = sum(
            values["mem_deviation"] for node, values in nodes.items() if node not in host)
        for vm in hosts[source].values():
            # RAM load of both nodes after the hypothetical migration.
            h0_mem_load = (nodes[source]["mem"] - vm["mem"]) / nodes[source]["maxmem"]
            h1_mem_load = (nodes[target]["mem"] + vm["mem"]) / nodes[target]["maxmem"]
            mem_temp_full_deviation = (
                mem_part_of_deviation
                + abs(h0_mem_load - mem_average)
                + abs(h1_mem_load - mem_average)
            )
            # CPU load of both nodes after the hypothetical migration.
            vm["cpu_used"] = round(vm["maxcpu"] * vm["cpu"], 2)
            h0_cpu_load = (nodes[source]["cpu_used"] - vm["cpu_used"]) / nodes[source]["maxcpu"]
            h1_cpu_load = (nodes[target]["cpu_used"] + vm["cpu_used"]) / nodes[target]["maxcpu"]
            cpu_temp_full_deviation = (
                cpu_part_of_deviation
                + abs(h0_cpu_load - cpu_average)
                + abs(h1_cpu_load - cpu_average)
            )
            # Both current totals are strictly greater than a non-negative
            # value here, so the divisions below cannot divide by zero.
            if (mem_temp_full_deviation < mem_sum_of_deviations
                    and cpu_temp_full_deviation < cpu_sum_of_deviations):
                score = mem_temp_full_deviation / mem_sum_of_deviations
                score += cpu_temp_full_deviation / cpu_sum_of_deviations
                variants.append((source, target, vm["vmid"], score))
    logger.info(f'Number of options = {len(variants)}')
    return sorted(variants, key=lambda variant: variant[-1])


Expand Down Expand Up @@ -459,7 +495,6 @@ def send_mail(message: str):

def main():
"""The main body of the program"""
global iteration
authentication(server_url, auth)
cluster = Cluster(server_url)
if ONLY_ON_MASTER:
Expand All @@ -471,10 +506,12 @@ def main():
sleep(300)
return
cluster_load_verification(cluster.mem_load_included, cluster)
need_to_balance = need_to_balance_checking(cluster)
logger.info(f'Need to balance: {need_to_balance}')
if need_to_balance:
iteration = 0
calculate_sum_of_deviations(cluster)
need_to_balance_cpu = need_to_balance_cpu_checking(cluster)
need_to_balance_mem = need_to_balance_mem_checking(cluster)
logger.info(f'Need to balance (CPU): {need_to_balance_cpu}')
logger.info(f'Need to balance (MEM): {need_to_balance_mem}')
if need_to_balance_cpu or need_to_balance_mem:
balance_cl = temporary_dict(cluster)
sorted_variants = calculating(balance_cl, cluster)
if sorted_variants:
Expand All @@ -486,7 +523,6 @@ def main():
pass # TODO Aggressive algorithm
else:
logger.info('The cluster is balanced. Waiting 300 seconds.')
iteration += 1
sleep(300)


Expand Down