Skip to content

Commit 6938099

Browse files
Copilotalvarolopez
andcommitted
feat: Add Prometheus extractor and EnergyRecord for energy consumption metrics
Co-authored-by: alvarolopez <[email protected]>
1 parent a035867 commit 6938099

File tree

7 files changed

+460
-0
lines changed

7 files changed

+460
-0
lines changed

caso/extract/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
from caso.extract.openstack import CinderExtractor
2020
from caso.extract.openstack import NeutronExtractor
2121
from caso.extract.openstack import NovaExtractor
22+
from caso.extract.prometheus import PrometheusExtractor
2223

2324
__all__ = [
2425
"NovaExtractor",
2526
"CinderExtractor",
2627
"NeutronExtractor",
28+
"PrometheusExtractor",
2729
]

caso/extract/prometheus.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
# -*- coding: utf-8 -*-
2+
3+
# Copyright 2014 Spanish National Research Council (CSIC)
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License. You may obtain
7+
# a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
# License for the specific language governing permissions and limitations
15+
# under the License.
16+
17+
"""Module containing the Prometheus extractor for energy consumption metrics."""
18+
19+
import uuid
20+
21+
import requests
22+
from oslo_config import cfg
23+
from oslo_log import log
24+
25+
from caso.extract import base
26+
from caso import record
27+
28+
CONF = cfg.CONF
29+
30+
opts = [
31+
cfg.StrOpt(
32+
"prometheus_endpoint",
33+
default="http://localhost:9090",
34+
help="Prometheus server endpoint URL.",
35+
),
36+
cfg.StrOpt(
37+
"prometheus_query",
38+
default="sum(rate(node_energy_joules_total[5m])) * 300 / 3600000",
39+
help="Prometheus query to retrieve energy consumption in kWh. "
40+
"The query should return energy consumption metrics.",
41+
),
42+
cfg.IntOpt(
43+
"prometheus_timeout",
44+
default=30,
45+
help="Timeout for Prometheus API requests in seconds.",
46+
),
47+
]
48+
49+
CONF.import_opt("site_name", "caso.extract.base")
50+
CONF.register_opts(opts, group="prometheus")
51+
52+
LOG = log.getLogger(__name__)
53+
54+
55+
class PrometheusExtractor(base.BaseProjectExtractor):
56+
"""A Prometheus extractor for energy consumption metrics in cASO."""
57+
58+
def __init__(self, project, vo):
59+
"""Initialize a Prometheus extractor for a given project."""
60+
super(PrometheusExtractor, self).__init__(project)
61+
self.vo = vo
62+
self.project_id = project
63+
64+
def _query_prometheus(self, query, timestamp=None):
65+
"""Query Prometheus API and return results.
66+
67+
:param query: PromQL query string
68+
:param timestamp: Optional timestamp for query (datetime object)
69+
:returns: Query results
70+
"""
71+
endpoint = CONF.prometheus.prometheus_endpoint
72+
url = f"{endpoint}/api/v1/query"
73+
74+
params = {"query": query}
75+
if timestamp:
76+
params["time"] = int(timestamp.timestamp())
77+
78+
try:
79+
response = requests.get(
80+
url, params=params, timeout=CONF.prometheus.prometheus_timeout
81+
)
82+
response.raise_for_status()
83+
data = response.json()
84+
85+
if data.get("status") != "success":
86+
error_msg = data.get("error", "Unknown error")
87+
LOG.error(f"Prometheus query failed: {error_msg}")
88+
return None
89+
90+
return data.get("data", {}).get("result", [])
91+
except requests.exceptions.RequestException as e:
92+
LOG.error(f"Failed to query Prometheus: {e}")
93+
return None
94+
except Exception as e:
95+
LOG.error(f"Unexpected error querying Prometheus: {e}")
96+
return None
97+
98+
def _build_energy_record(self, energy_value, measurement_time):
99+
"""Build an energy consumption record.
100+
101+
:param energy_value: Energy consumption value in kWh
102+
:param measurement_time: Time of measurement
103+
:returns: EnergyRecord object
104+
"""
105+
r = record.EnergyRecord(
106+
uuid=uuid.uuid4(),
107+
measurement_time=measurement_time,
108+
site_name=CONF.site_name,
109+
user_id=None,
110+
group_id=self.project_id,
111+
user_dn=None,
112+
fqan=self.vo,
113+
energy_consumption=energy_value,
114+
energy_unit="kWh",
115+
compute_service=CONF.service_name,
116+
)
117+
118+
return r
119+
120+
def extract(self, extract_from, extract_to):
121+
"""Extract energy consumption records from Prometheus.
122+
123+
This method queries Prometheus for energy consumption metrics
124+
in the specified time range.
125+
126+
:param extract_from: datetime.datetime object indicating the date to
127+
extract records from
128+
:param extract_to: datetime.datetime object indicating the date to
129+
extract records to
130+
:returns: A list of energy records
131+
"""
132+
records = []
133+
134+
# Query Prometheus at the extract_to timestamp
135+
query = CONF.prometheus.prometheus_query
136+
LOG.debug(
137+
f"Querying Prometheus for project {self.project} " f"with query: {query}"
138+
)
139+
140+
results = self._query_prometheus(query, extract_to)
141+
142+
if results is None:
143+
LOG.warning(
144+
f"No results returned from Prometheus for project {self.project}"
145+
)
146+
return records
147+
148+
# Process results and create records
149+
for result in results:
150+
value = result.get("value", [])
151+
152+
if len(value) < 2:
153+
continue
154+
155+
# value is [timestamp, value_string]
156+
energy_value = float(value[1])
157+
158+
LOG.debug(
159+
f"Creating energy record: {energy_value} kWh "
160+
f"for project {self.project}"
161+
)
162+
163+
energy_record = self._build_energy_record(energy_value, extract_to)
164+
records.append(energy_record)
165+
166+
LOG.info(f"Extracted {len(records)} energy records for project {self.project}")
167+
168+
return records

caso/record.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,3 +555,81 @@ def ssm_message(self):
555555
populate_by_name=True,
556556
extra="forbid",
557557
)
558+
559+
560+
def map_energy_fields(field: str) -> str:
561+
"""Map object fields to accounting Energy Usage Record fields."""
562+
d = {
563+
"measurement_time_epoch": "MeasurementTime",
564+
"site_name": "SiteName",
565+
"cloud_type": "CloudType",
566+
"user_id": "LocalUser",
567+
"group_id": "LocalGroup",
568+
"fqan": "FQAN",
569+
"user_dn": "GlobalUserName",
570+
"energy_consumption": "EnergyConsumption",
571+
"energy_unit": "EnergyUnit",
572+
"compute_service": "CloudComputeService",
573+
}
574+
return d.get(field, field)
575+
576+
577+
class EnergyRecord(_BaseRecord):
578+
"""The EnergyRecord class holds energy consumption information.
579+
580+
This class is used to report energy consumption metrics gathered from
581+
external monitoring systems like Prometheus.
582+
"""
583+
584+
version: str = pydantic.Field("0.1", exclude=True)
585+
586+
uuid: m_uuid.UUID
587+
588+
user_id: typing.Optional[str]
589+
user_dn: typing.Optional[str]
590+
group_id: str
591+
fqan: str
592+
593+
# Make these fields private, and deal with them as properties. This is done as all
594+
# the accounting infrastructure needs start and end times as integers, but it is
595+
# easier for us to maintain them as datetime objects internally.
596+
_measurement_time: datetime.datetime
597+
598+
energy_consumption: float
599+
energy_unit: str = "kWh"
600+
601+
def __init__(self, measurement_time: datetime.datetime, *args, **kwargs):
602+
"""Initialize the record."""
603+
super(EnergyRecord, self).__init__(*args, **kwargs)
604+
605+
self._measurement_time = measurement_time
606+
607+
@property
608+
def measurement_time(self) -> datetime.datetime:
609+
"""Get measurement time."""
610+
return self._measurement_time
611+
612+
@measurement_time.setter
613+
def measurement_time(self, measurement_time: datetime.datetime) -> None:
614+
"""Set measurement time."""
615+
self._measurement_time = measurement_time
616+
617+
@pydantic.computed_field() # type: ignore[misc]
618+
@property
619+
def measurement_time_epoch(self) -> int:
620+
"""Get measurement time as epoch."""
621+
return int(self._measurement_time.timestamp())
622+
623+
def ssm_message(self):
624+
"""Render record as the expected SSM message."""
625+
opts = {
626+
"by_alias": True,
627+
"exclude_none": True,
628+
}
629+
return self.model_dump_json(**opts)
630+
631+
model_config = dict(
632+
alias_generator=map_energy_fields,
633+
populate_by_name=True,
634+
extra="forbid",
635+
)

caso/tests/conftest.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,3 +757,51 @@ def expected_message_storage() -> str:
757757
"</sr:StorageUsageRecords>"
758758
)
759759
return message
760+
761+
762+
# Energy record fixtures
763+
764+
valid_energy_records_fields = [
765+
dict(
766+
uuid="e3c5aeef-37b8-4332-ad9f-9d068f156dc2",
767+
measurement_time=now,
768+
site_name="TEST-Site",
769+
user_id="a4519d7d-f60a-4908-9d63-7d9e17422188",
770+
group_id="03b6a6c4-cf2b-48b9-82f1-69c52b9f30af",
771+
user_dn="User 1 DN",
772+
fqan="VO 1 FQAN",
773+
energy_consumption=125.5,
774+
energy_unit="kWh",
775+
compute_service="Fake Cloud Service",
776+
cloud_type=cloud_type,
777+
),
778+
]
779+
780+
valid_energy_records_dict = [
781+
{
782+
"CloudComputeService": "Fake Cloud Service",
783+
"FQAN": "VO 1 FQAN",
784+
"GlobalUserName": "User 1 DN",
785+
"EnergyConsumption": 125.5,
786+
"EnergyUnit": "kWh",
787+
"LocalGroup": "03b6a6c4-cf2b-48b9-82f1-69c52b9f30af",
788+
"LocalUser": "a4519d7d-f60a-4908-9d63-7d9e17422188",
789+
"MeasurementTime": 1685051946,
790+
"SiteName": "TEST-Site",
791+
"uuid": "e3c5aeef-37b8-4332-ad9f-9d068f156dc2",
792+
"CloudType": cloud_type,
793+
},
794+
]
795+
796+
797+
@pytest.fixture()
798+
def energy_record() -> caso.record.EnergyRecord:
799+
"""Get a fixture for an Energy record."""
800+
record = caso.record.EnergyRecord(**valid_energy_records_fields[0])
801+
return record
802+
803+
804+
@pytest.fixture()
805+
def valid_energy_record() -> dict:
806+
"""Get a fixture for a valid Energy record as a dict."""
807+
return valid_energy_records_dict[0]

0 commit comments

Comments
 (0)