Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions caso/extract/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
from caso.extract.openstack import CinderExtractor
from caso.extract.openstack import NeutronExtractor
from caso.extract.openstack import NovaExtractor
from caso.extract.prometheus import PrometheusExtractor
Copy link
Member

@alvarolopez alvarolopez Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot Let's not call it "PrometheusExtractor" but "EnergyConsumptionExtractor"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to EnergyConsumptionExtractor in commit 6e355c5.


# Extractor classes made available directly from the ``caso.extract`` package;
# each name listed here is imported at the top of this module.
__all__ = [
    "NovaExtractor",
    "CinderExtractor",
    "NeutronExtractor",
    "PrometheusExtractor",
]
219 changes: 219 additions & 0 deletions caso/extract/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# -*- coding: utf-8 -*-

# Copyright 2014 Spanish National Research Council (CSIC)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Module containing the Prometheus extractor for energy consumption metrics."""

import math
import uuid

import requests
from oslo_config import cfg
from oslo_log import log

from caso.extract.openstack import base
from caso import record

# Global oslo.config configuration object shared by the whole application.
CONF = cfg.CONF

# Options controlling how the Prometheus server is reached and queried. They
# are registered below under the "prometheus" configuration group.
opts = [
    cfg.StrOpt(
        "prometheus_endpoint",
        default="http://localhost:9090",
        help="Prometheus server endpoint URL.",
    ),
    cfg.StrOpt(
        "prometheus_query",
        # NOTE(review): the default looks like it turns a 5 minute rate of a
        # joules counter back into joules (* 300 s) and then into kWh
        # (/ 3600000 J per kWh), i.e. the energy of the last five minutes only
        # - confirm this is the intended accounting window.
        default="sum(rate(libvirt_domain_info_energy_consumption_joules_total"
        '{uuid=~"{{uuid}}"}[5m])) * 300 / 3600000',
        help="Prometheus query to retrieve energy consumption in kWh. "
        "The query can use {{uuid}} as a template variable for the VM UUID.",
    ),
    cfg.IntOpt(
        "prometheus_timeout",
        default=30,
        help="Timeout for Prometheus API requests in seconds.",
    ),
]

# "site_name" is declared in caso.extract.base; importing the option makes
# CONF.site_name usable from this module.
CONF.import_opt("site_name", "caso.extract.base")
CONF.register_opts(opts, group="prometheus")

# Module-level logger.
LOG = log.getLogger(__name__)


class PrometheusExtractor(base.BaseOpenStackExtractor):
    """A Prometheus extractor for energy consumption metrics in cASO.

    The extractor lists the servers of a project through Nova and, for each
    of them, runs the configured PromQL query (``prometheus_query`` option of
    the ``prometheus`` group) against the Prometheus HTTP API. Every usable
    sample returned by Prometheus is turned into an ``EnergyRecord``.
    """

    def __init__(self, project, vo):
        """Initialize a Prometheus extractor for a given project.

        :param project: project to extract records for (handled by the base
            class)
        :param vo: VO associated to the project (handled by the base class)
        """
        super(PrometheusExtractor, self).__init__(project, vo)
        self.nova = self._get_nova_client()

    def _query_prometheus(self, query, timestamp=None):
        """Run an instant query against the Prometheus HTTP API.

        This is a best-effort call: any failure is logged and reported to the
        caller as ``None`` instead of being raised.

        :param query: PromQL query string
        :param timestamp: Optional evaluation time for the query (datetime
            object); when omitted no ``time`` parameter is sent
        :returns: The ``result`` list of the response, or None if the request
            failed or Prometheus did not report success
        """
        # Tolerate a trailing slash in the configured endpoint, otherwise the
        # request would be sent to "<endpoint>//api/v1/query".
        endpoint = CONF.prometheus.prometheus_endpoint.rstrip("/")
        url = f"{endpoint}/api/v1/query"

        params = {"query": query}
        if timestamp is not None:
            params["time"] = int(timestamp.timestamp())

        try:
            response = requests.get(
                url, params=params, timeout=CONF.prometheus.prometheus_timeout
            )
            response.raise_for_status()
            data = response.json()

            if data.get("status") != "success":
                error_msg = data.get("error", "Unknown error")
                LOG.error(f"Prometheus query failed: {error_msg}")
                return None

            return data.get("data", {}).get("result", [])
        except requests.exceptions.RequestException as e:
            LOG.error(f"Failed to query Prometheus: {e}")
            return None
        except Exception as e:
            # Deliberately broad: a malformed payload (invalid JSON, unexpected
            # structure) must not abort the extraction of the remaining VMs.
            LOG.error(f"Unexpected error querying Prometheus: {e}")
            return None

    def _get_servers(self, extract_from):
        """Get all the servers of the project changed since a given date.

        :param extract_from: date passed to Nova as the ``changes-since``
            filter
        :returns: A list with all the servers returned by Nova
        """
        servers = []
        limit = 200
        marker = None
        # Use a marker and iter over results until we do not have more to get
        while True:
            aux = self.nova.servers.list(
                search_opts={
                    "changes-since": extract_from,
                    "project_id": self.project_id,
                    "all_tenants": True,
                },
                limit=limit,
                marker=marker,
            )
            servers.extend(aux)

            # A short page means that there is nothing left to fetch.
            if len(aux) < limit:
                break
            marker = aux[-1].id

        return servers

    @staticmethod
    def _parse_energy_value(result):
        """Get the usable energy value carried by a Prometheus sample.

        :param result: one element of the result list of an instant query
        :returns: The sample value as a float, or None if the sample carries
            no value, the value cannot be converted to a number, or it is not
            finite (NaN or infinite)
        """
        value = result.get("value", [])

        # value is [timestamp, value_string]
        if len(value) < 2:
            return None

        try:
            energy_value = float(value[1])
        except (TypeError, ValueError):
            return None

        # Prometheus encodes NaN and +/-Inf as strings that float() accepts,
        # but such values are meaningless as an energy consumption.
        if not math.isfinite(energy_value):
            return None

        return energy_value

    def _build_energy_record(self, vm_uuid, vm_name, energy_value, measurement_time):
        """Build an energy consumption record for a VM.

        :param vm_uuid: VM UUID (currently not stored in the record)
        :param vm_name: VM name (currently not stored in the record)
        :param energy_value: Energy consumption value in kWh
        :param measurement_time: Time of measurement
        :returns: EnergyRecord object
        """
        r = record.EnergyRecord(
            uuid=uuid.uuid4(),
            measurement_time=measurement_time,
            site_name=CONF.site_name,
            user_id=None,
            group_id=self.project_id,
            user_dn=None,
            fqan=self.vo,
            energy_consumption=energy_value,
            energy_unit="kWh",
            compute_service=CONF.service_name,
        )

        return r

    def extract(self, extract_from, extract_to):
        """Extract energy consumption records from Prometheus.

        This method queries Prometheus for energy consumption metrics
        for each VM in the project. VMs for which the query fails, and
        samples without a usable (finite, numeric) value, are skipped with a
        warning.

        :param extract_from: datetime.datetime object indicating the date to
                             extract records from
        :param extract_to: datetime.datetime object indicating the date to
                           extract records to
        :returns: A list of energy records
        """
        # Keep the caller's (possibly timezone-aware) end date for everything
        # that gets converted to an epoch. datetime.timestamp() treats naive
        # values as local time, so dropping the tzinfo first would shift both
        # the Prometheus evaluation time and the record's measurement time on
        # hosts whose local timezone is not the one the date is expressed in.
        measurement_time = extract_to

        # Remove timezone as Nova doesn't expect it
        extract_from = extract_from.replace(tzinfo=None)

        records = []

        # Get all servers for the project
        LOG.debug(f"Getting servers for project {self.project}")
        servers = self._get_servers(extract_from)

        LOG.info(
            f"Found {len(servers)} VMs for project {self.project}, "
            f"querying Prometheus for energy metrics"
        )

        # Query Prometheus for each server
        query_template = CONF.prometheus.prometheus_query

        for server in servers:
            vm_uuid = str(server.id)
            vm_name = server.name

            # Replace template variables in the query
            query = query_template.replace("{{uuid}}", vm_uuid)

            LOG.debug(
                f"Querying Prometheus for VM {vm_name} ({vm_uuid}) "
                f"with query: {query}"
            )

            results = self._query_prometheus(query, measurement_time)

            if results is None:
                LOG.warning(
                    f"No results returned from Prometheus for VM "
                    f"{vm_name} ({vm_uuid})"
                )
                continue

            # Process results and create records
            for result in results:
                energy_value = self._parse_energy_value(result)

                if energy_value is None:
                    LOG.warning(
                        f"Skipping unusable Prometheus sample for VM "
                        f"{vm_name} ({vm_uuid}): {result}"
                    )
                    continue

                LOG.debug(
                    f"Creating energy record: {energy_value} kWh "
                    f"for VM {vm_name} ({vm_uuid})"
                )

                energy_record = self._build_energy_record(
                    vm_uuid, vm_name, energy_value, measurement_time
                )
                records.append(energy_record)

        LOG.info(f"Extracted {len(records)} energy records for project {self.project}")

        return records
2 changes: 2 additions & 0 deletions caso/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import caso.extract.base
import caso.extract.manager
import caso.extract.openstack.nova
import caso.extract.prometheus
import caso.keystone_client
import caso.manager
import caso.messenger.logstash
Expand All @@ -43,5 +44,6 @@ def list_opts():
("benchmark", caso.extract.openstack.nova.benchmark_opts),
("keystone_auth", caso.keystone_client.opts),
("logstash", caso.messenger.logstash.opts),
("prometheus", caso.extract.prometheus.opts),
("ssm", caso.messenger.ssm.opts),
]
78 changes: 78 additions & 0 deletions caso/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,81 @@ def ssm_message(self):
populate_by_name=True,
extra="forbid",
)


def map_energy_fields(field: str) -> str:
    """Translate an object field name into its Energy Usage Record name.

    Field names that have no translation are returned unchanged.
    """
    # NOTE(review): the pull request discussion asks for a record exposing
    # ExecUnitID (the VM uuid), StartExecTime, EndExecTime, EnergyWh, Work,
    # Efficiency, WallClockTime_s, CpuDuration_s, SuspendDuration_s,
    # CPUNormalizationFactor, ExecUnitFinished, Status and Owner; the mapping
    # below still describes the earlier set of fields.
    translations = dict(
        measurement_time_epoch="MeasurementTime",
        site_name="SiteName",
        cloud_type="CloudType",
        user_id="LocalUser",
        group_id="LocalGroup",
        fqan="FQAN",
        user_dn="GlobalUserName",
        energy_consumption="EnergyConsumption",
        energy_unit="EnergyUnit",
        compute_service="CloudComputeService",
    )
    try:
        return translations[field]
    except KeyError:
        return field


class EnergyRecord(_BaseRecord):
    """Record carrying the energy consumption measured at a given time.

    Records of this kind report energy consumption figures obtained from
    external monitoring systems such as Prometheus.
    """

    version: str = pydantic.Field("0.1", exclude=True)

    uuid: m_uuid.UUID

    user_id: typing.Optional[str]
    user_dn: typing.Optional[str]
    group_id: str
    fqan: str

    # The measurement time lives in a private attribute and is exposed through
    # properties: the accounting infrastructure wants times as integers, while
    # a datetime object is easier to handle internally.
    _measurement_time: datetime.datetime

    energy_consumption: float
    energy_unit: str = "kWh"

    def __init__(self, measurement_time: datetime.datetime, *args, **kwargs):
        """Build the record and store the time of the measurement."""
        super().__init__(*args, **kwargs)

        self._measurement_time = measurement_time

    @property
    def measurement_time(self) -> datetime.datetime:
        """Return the time of the measurement as a datetime object."""
        return self._measurement_time

    @measurement_time.setter
    def measurement_time(self, measurement_time: datetime.datetime) -> None:
        """Replace the time of the measurement."""
        self._measurement_time = measurement_time

    @pydantic.computed_field()  # type: ignore[misc]
    @property
    def measurement_time_epoch(self) -> int:
        """Return the time of the measurement as an integer epoch."""
        epoch = self._measurement_time.timestamp()
        return int(epoch)

    def ssm_message(self):
        """Serialize the record as the JSON message expected by SSM.

        Fields are dumped under their aliases and unset (None) values are
        left out.
        """
        return self.model_dump_json(by_alias=True, exclude_none=True)

    model_config = {
        "alias_generator": map_energy_fields,
        "populate_by_name": True,
        "extra": "forbid",
    }
48 changes: 48 additions & 0 deletions caso/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,3 +757,51 @@ def expected_message_storage() -> str:
"</sr:StorageUsageRecords>"
)
return message


# Energy record fixtures

# Keyword arguments used to build EnergyRecord objects in the tests.
valid_energy_records_fields = [
    {
        "uuid": "e3c5aeef-37b8-4332-ad9f-9d068f156dc2",
        "measurement_time": now,
        "site_name": "TEST-Site",
        "user_id": "a4519d7d-f60a-4908-9d63-7d9e17422188",
        "group_id": "03b6a6c4-cf2b-48b9-82f1-69c52b9f30af",
        "user_dn": "User 1 DN",
        "fqan": "VO 1 FQAN",
        "energy_consumption": 125.5,
        "energy_unit": "kWh",
        "compute_service": "Fake Cloud Service",
        "cloud_type": cloud_type,
    },
]

# Dict form of the records above, keyed by the aliased field names.
valid_energy_records_dict = [
    dict(
        CloudComputeService="Fake Cloud Service",
        FQAN="VO 1 FQAN",
        GlobalUserName="User 1 DN",
        EnergyConsumption=125.5,
        EnergyUnit="kWh",
        LocalGroup="03b6a6c4-cf2b-48b9-82f1-69c52b9f30af",
        LocalUser="a4519d7d-f60a-4908-9d63-7d9e17422188",
        MeasurementTime=1685051946,
        SiteName="TEST-Site",
        uuid="e3c5aeef-37b8-4332-ad9f-9d068f156dc2",
        CloudType=cloud_type,
    ),
]


@pytest.fixture()
def energy_record() -> caso.record.EnergyRecord:
    """Provide an Energy record built from the first set of valid fields."""
    fields = valid_energy_records_fields[0]
    return caso.record.EnergyRecord(**fields)


@pytest.fixture()
def valid_energy_record() -> dict:
    """Provide the first valid Energy record in its dict form."""
    expected = valid_energy_records_dict[0]
    return expected
Loading
Loading