25 changes: 23 additions & 2 deletions device_alloc/optimizer.py
@@ -262,6 +262,25 @@ def optimize_contention(
solver: str = 'COIN_CMD',
**kwargs
) -> tuple[dict[int, int], dict[int, int], float] | tuple[()]:
"""
Run optimization with automatic contention fallback.

Tries optimization without contention penalty first. If it fails,
retries with contention correction to spread VMs across clusters.

Args:
devices: Collection of devices to assign
clusters: Collection of clusters to optimize over
min_capacity: Minimum total system capacity
max_capacity: Maximum total system capacity
contention_corr: Contention penalty multiplier (default: 2.0)
solver: MILP solver to use (default: 'COIN_CMD')

Returns:
Tuple of (allocs, n_vms, objective) or empty tuple if optimization fails
"""
# First attempt: no contention allowed (strict, better performance)
# Removes last energy breakpoint → clusters cannot reach full capacity
opt = DeviceOptimizer(
devices=devices,
clusters=clusters,
@@ -272,8 +291,10 @@ def optimize_contention(
**kwargs
)
if result := opt.optimize():
return result

return result # Success: found solution without contention

# Second attempt: allow contention with penalty (relaxed, more flexible)
# Keeps last breakpoint but inflates energy consumption → clusters can reach full capacity
opt = DeviceOptimizer(
devices=devices,
clusters=clusters,
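A minimal caller sketch of the fallback contract documented above, assuming devices and clusters are built as in modules/optimizer_adapter.py; the empty tuple signals that both the strict and the contention-relaxed attempt failed:

    from device_alloc import optimize_contention

    # Attempt 1 is strict (no contention); attempt 2 retries with the penalty.
    result = optimize_contention(devices=devices, clusters=clusters)
    if result:
        # allocs: composite_id -> cluster_id; n_vms: cluster_id -> VM count
        allocs, n_vms, objective = result
    else:
        allocs, n_vms, objective = {}, {}, None  # both attempts failed
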
66 changes: 44 additions & 22 deletions modules/cluster_scaler.py
@@ -70,26 +70,23 @@ def call_scale_endpoint(endpoint: str) -> bool:
return False


def scale_cluster(cluster_id: int, target_cardinality: int) -> bool:
def scale_cluster(cluster_id: int, target_cardinality: int, flavour: str) -> bool:
"""Scale a single cluster to target cardinality.

Args:
cluster_id: The cluster ID to scale
target_cardinality: Target number of VMs for the cluster
flavour: Flavour to use for scaling the cluster

Returns:
True if scaling was successful, False otherwise
"""

logger.info(f"Scaling cluster {cluster_id} with flavour {flavour} to target cardinality {target_cardinality}")
cluster_template = get_cluster_template(cluster_id)
if not cluster_template:
return False

flavour = get_flavour_from_template(cluster_template)
if not flavour:
logger.warning(f"Could not determine flavour for cluster {cluster_id}")
return False

endpoint = construct_endpoint(cluster_template, flavour, target_cardinality)
if not endpoint:
logger.warning(f"Could not construct endpoint for cluster {cluster_id} (EDGE_CLUSTER_FRONTEND missing)")
@@ -101,39 +98,64 @@ def scale_cluster(cluster_id: int, target_cardinality: int) -> bool:
def scale_clusters_and_update_db(n_vms: dict[int, int], allocs: dict) -> int:
"""
Scale clusters in parallel and update DB for each successfully scaled cluster.
Each flavour within a cluster is scaled separately.

Args:
n_vms: Cluster ID to target cardinality mapping
allocs: Device ID to cluster ID mapping
allocs: Composite ID (device_id:::flavour) to cluster ID mapping

Returns:
Total number of devices updated in database
"""
n_clusters_to_scale = len(n_vms)
logger.info("=== CLUSTER SCALING ===")
logger.info(f"Scaling {n_clusters_to_scale} clusters in parallel")

if not n_vms:
logger.info("No clusters to scale")
return 0

# Group devices by (cluster_id, flavour) and use optimizer's n_vms for target cardinality
cluster_flavour_targets = {}
for composite_id, cluster_id in allocs.items():
if cluster_id in n_vms and ':::' in composite_id:
flavour = composite_id.split(':::', 1)[1]
key = (cluster_id, flavour)
# Use optimizer's calculated VM count, ensure minimum of 1
if key not in cluster_flavour_targets:
target_cardinality = max(1, n_vms[cluster_id])
cluster_flavour_targets[key] = target_cardinality
logger.info(f"Cluster {cluster_id} (flavour {flavour}): optimizer n_vms={n_vms[cluster_id]}, target_cardinality={target_cardinality}")

if not cluster_flavour_targets:
logger.info("No device assignments found for clusters to scale")
return 0

logger.info(f"Scaling {len(cluster_flavour_targets)} cluster-flavour combinations in parallel")

total_updated = 0
max_workers = n_clusters_to_scale
max_workers = max(1, len(cluster_flavour_targets))

def scale_with_id(cid: int, card: int) -> tuple[int, bool]:
"""Scale a single cluster and return the cluster ID and scaling result."""
return cid, scale_cluster(cid, card)
def scale_with_flavour(cid: int, flavour: str, card: int) -> tuple[int, str, bool]:
"""Scale a single cluster-flavour combination and return the result."""
return cid, flavour, scale_cluster(cid, card, flavour)

with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {
executor.submit(scale_with_id, cid, card): cid
for cid, card in n_vms.items()
executor.submit(scale_with_flavour, cid, flavour, target_cardinality): (cid, flavour)
for (cid, flavour), target_cardinality in cluster_flavour_targets.items()
}
scaled_clusters = set()
for future in as_completed(future_map):
cid, ok = future.result()
cid, flavour, ok = future.result()
if ok:
# Check which devices are assigned to the scaled cluster and update their corresponding cluster ID in the local database
cluster_allocs = {dev_id: cluster_id for dev_id, cluster_id in allocs.items() if cluster_id == cid}
updated = update_device_cluster_assignments(cluster_allocs)
total_updated += updated
if updated > 0:
logger.info(f"Cluster {cid} scaled successfully: {updated} devices updated")
scaled_clusters.add(cid)

# Update DB for all successfully scaled clusters
for cid in scaled_clusters:
cluster_allocs = {dev_id: cluster_id for dev_id, cluster_id in allocs.items() if cluster_id == cid}
updated = update_device_cluster_assignments(cluster_allocs)
total_updated += updated
if updated > 0:
logger.info(f"Cluster {cid} scaled successfully: {updated} devices updated")

logger.info(f"Cluster scaling completed: {total_updated} total devices updated")
return total_updated
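To make the (cluster_id, flavour) grouping in scale_clusters_and_update_db concrete, a small worked example with illustrative IDs (values are not from this PR):

    # Illustrative inputs: two flavours assigned to cluster 3
    allocs = {"dev-1:::small": 3, "dev-2:::small": 3, "dev-3:::large": 3}
    n_vms = {3: 2}

    # The grouping loop produces one scale call per (cluster, flavour) pair,
    # each using the optimizer's cardinality floored at 1:
    # cluster_flavour_targets == {(3, "small"): 2, (3, "large"): 2}
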
64 changes: 55 additions & 9 deletions modules/db_adapter.py
@@ -19,22 +19,68 @@

def get_device_assignments() -> list[dict]:
"""Retrieve all device assignments from database."""
return [
assignment
for device_id in _db.get_all_device_ids()
if (assignment := _db.get_device_assignment(device_id))
]
return _db.get_all_device_assignments()

def cleanup_old_records() -> int:
"""Trigger cleanup of old records in the database.

Returns:
Number of records deleted
"""
# Access the cleanup method through the singleton instance

from modules.logger import get_logger
logger = get_logger(__name__)

try:
# Get count before cleanup
before_count = len(_db.get_all_device_assignments())

# Trigger cleanup
_db.cleanup_old_records()

# Get count after cleanup
after_count = len(_db.get_all_device_assignments())

deleted_count = before_count - after_count

if deleted_count > 0:
logger.info(f"Cleaned up {deleted_count} old device assignments")

return deleted_count
except Exception as e:
logger.error(f"Error during cleanup: {e}")
return 0

def update_device_cluster_assignments(allocations: dict[str, int]) -> int:
"""Update device cluster assignments in database only when changed."""
"""Update device cluster assignments in database only when changed.

Args:
allocations: Dict mapping composite_id (device_id:::flavour) to cluster_id

Returns:
Number of updated assignments
"""
updated_count = 0
for device_id, new_cluster_id in allocations.items():
current = _db.get_device_assignment(device_id)
# Fetch all current assignments once and index them by composite ID
all_assignments = _db.get_all_device_assignments()
assignment_lookup = {f"{a['device_id']}:::{a['flavour']}": a for a in all_assignments}

for composite_id, new_cluster_id in allocations.items():
# Parse composite_id (format: device_id:::flavour)
if ':::' not in composite_id:
continue # Skip invalid composite IDs

device_id, flavour = composite_id.split(':::', 1)

# Get current assignment
current = assignment_lookup.get(composite_id)

if current and current['cluster_id'] != new_cluster_id:
_db.update_device_assignment(
device_id=device_id,
cluster_id=new_cluster_id,
flavour=current['flavour'],
flavour=flavour,
app_req_id=current['app_req_id'],
app_req_json=current['app_req_json']
)
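A usage sketch for the updated function; the stored row below is hypothetical, but the skip rules follow directly from the code above:

    # Suppose the DB holds an assignment for dev-1 with flavour "small" on cluster 3.
    updated = update_device_cluster_assignments({"dev-1:::small": 5})
    # -> moves dev-1/small to cluster 5 and returns 1. A key whose stored
    #    cluster_id already matches, a key without ':::', and a key with no
    #    stored assignment are all skipped.
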
7 changes: 7 additions & 0 deletions modules/opennebula_adapter.py
@@ -29,6 +29,7 @@ def get_cluster_pool() -> tuple[list, dict[int, dict[str, Any]]]:
def get_app_requirement(app_req_id: int) -> dict:
"""Retrieve app requirement from OpenNebula by ID using OnedServerProxy."""
from device_alloc import OnedServerProxy
from device_alloc.xmlrpc_client import OneXMLRPCExistenceError
from modules.logger import get_logger

logger = get_logger(__name__)
@@ -42,6 +43,12 @@ def get_app_requirement(app_req_id: int) -> dict:
# Parse the template which contains the app requirements
template = result[DOCUMENT_KEY].get(TEMPLATE_KEY, {})
return template
except OneXMLRPCExistenceError:
logger.debug(
f"App requirement {app_req_id} not found in OpenNebula database; "
"it was likely deleted when device_runtime stopped."
)
return {}
except Exception as e:
logger.error(f"Failed to fetch app requirement {app_req_id}: {e}")
return {}
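The practical effect is that a deleted requirement becomes a quiet skip rather than an error; a minimal sketch of a consuming filter (mirroring the loop in run_optimization_with_db_updates below):

    def filter_valid(assignments: list[dict]) -> list[dict]:
        # Keep only assignments whose app requirement still exists in OpenNebula;
        # get_app_requirement returns {} for deleted or unreachable requirements.
        return [a for a in assignments if get_app_requirement(a['app_req_id'])]
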
51 changes: 36 additions & 15 deletions modules/optimizer_adapter.py
@@ -22,24 +22,31 @@ def _format_cluster_attributes(cluster_id: int, template: dict[str, Any]) -> str
f"CARBON_INTENSITY={template.get('CARBON_INTENSITY')}")

def create_devices_from_assignments(assignments: list[dict]) -> list:
"""Create Device objects for the optimization algorithm from database assignments with per-device feasible clusters."""
"""Create Device objects for the optimization algorithm from database assignments with per-device feasible clusters.

Uses composite identifier (device_id:::flavour) to uniquely identify each device-flavour combination.
"""
from device_alloc import Device
from modules.opennebula_adapter import get_feasible_clusters_for_device

devices = []
for assignment in assignments:
device_id = assignment['device_id']
flavour = assignment['flavour']
load = assignment['estimated_load']
capacity_load = 1.0 # TODO: Check with colleagues
app_req_id = assignment['app_req_id']

# Create composite identifier: device_id:::flavour
# Using ::: as separator (unlikely to appear in device_id or flavour)
composite_id = f"{device_id}:::{flavour}"

# Get feasible clusters for this device based on app requirements
feasible_cluster_ids = get_feasible_clusters_for_device(app_req_id)

device = Device(
id=device_id,
id=composite_id,
load=load,
capacity_load=capacity_load,
capacity_load=load,
cluster_ids=feasible_cluster_ids
)
devices.append(device)
@@ -48,36 +55,47 @@ def create_devices_from_assignments(assignments: list[dict]) -> list:

def optimize_device_assignments(devices: list, clusters: list) -> tuple:
"""Run optimization algorithm on devices and clusters."""
from device_alloc import optimize

results = optimize(clusters=clusters, devices=devices, n_iter=5)
from device_alloc import optimize_contention

if results:
return results[0]
return ()
# We could run the optimizer for several iterations and pick the best result;
# for now, optimize_contention makes at most two attempts (strict, then with contention penalty)
return optimize_contention(devices=devices, clusters=clusters)

def run_optimization_with_db_updates() -> tuple | None:
"""Run complete optimization cycle with devices database updates."""
from modules.db_adapter import get_device_assignments
from modules.db_adapter import get_device_assignments, cleanup_old_records
from modules.opennebula_adapter import get_cluster_pool, get_app_requirement

try:
# Clean up old records (older than DB_CLEANUP_DAYS)
cleanup_old_records()

assignments = get_device_assignments()

logger.info("=== DEVICE REQUIREMENTS ===")
valid_assignments = []
for assignment in assignments:
device_id = assignment['device_id']
app_req_id = assignment['app_req_id']
app_req = get_app_requirement(app_req_id)
if not app_req:
logger.warning(f"{device_id}: Could not fetch app requirements (app_req_id={app_req_id})")
logger.info(f"Device {device_id}: Skipping device - app requirement {app_req_id} not found in OpenNebula")
continue
logger.info(_format_device_requirements(device_id, app_req))
valid_assignments.append(assignment)

if not valid_assignments:
logger.warning("No devices with valid app requirements found. Skipping optimization.")
return None

devices = create_devices_from_assignments(assignments)
devices = create_devices_from_assignments(valid_assignments)

# Filter cluster pool to only include clusters that are feasible for at least one device
all_feasible_cluster_ids = {cid for device in devices for cid in device.cluster_ids}

logger.info(f"=== FEASIBILITY CHECK ===")
logger.info(f"Total devices: {len(devices)}")
logger.info(f"All feasible cluster IDs across all devices: {sorted(all_feasible_cluster_ids)}")

clusters, cluster_lookup = get_cluster_pool()
filtered_clusters = [c for c in clusters if c.id in all_feasible_cluster_ids]
@@ -102,8 +120,11 @@ def run_optimization_with_db_updates() -> tuple | None:
if result:
allocs, n_vms, objective = result
logger.info("=== OPTIMIZATION RESULT ===")
for device_id, cluster_id in allocs.items():
logger.info(f"{device_id} -> Cluster {cluster_id}")
for composite_id, cluster_id in allocs.items():
# Parse composite_id (format: device_id:::flavour); flavour is None when the separator is absent
device_id, _, flavour_part = composite_id.partition(':::')
flavour = flavour_part or None
logger.info(f"Device {device_id} - Flavour {flavour} --> Cluster {cluster_id}")

scale_clusters_and_update_db(n_vms, allocs)

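Since the device_id:::flavour convention now appears in optimizer_adapter, cluster_scaler, and db_adapter, the build/parse pair could live in one shared helper; a sketch under that assumption (helper names are hypothetical, not part of this PR):

    FLAVOUR_SEP = ":::"  # chosen because it is unlikely to appear in device_id or flavour

    def make_composite_id(device_id: str, flavour: str) -> str:
        return f"{device_id}{FLAVOUR_SEP}{flavour}"

    def split_composite_id(composite_id: str) -> tuple[str, str | None]:
        # flavour is None when the separator is absent, matching the logging
        # fallback in run_optimization_with_db_updates
        device_id, _, flavour = composite_id.partition(FLAVOUR_SEP)
        return device_id, (flavour or None)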