|
| 1 | +from decimal import Decimal |
| 2 | +from datetime import datetime, timedelta, timezone |
| 3 | +from dataclasses import dataclass |
| 4 | +import functools |
| 5 | +import logging |
| 6 | +import os |
| 7 | +import tempfile |
| 8 | +from typing import Optional |
| 9 | + |
| 10 | +from coldfront_plugin_cloud import attributes |
| 11 | +from coldfront.core.utils.common import import_from_settings |
| 12 | +from coldfront_plugin_cloud import usage_models |
| 13 | +from coldfront_plugin_cloud.usage_models import UsageInfo, validate_date_str |
| 14 | +from coldfront_plugin_cloud import utils |
| 15 | + |
| 16 | +import boto3 |
| 17 | +from django.core.management.base import BaseCommand |
| 18 | +from coldfront.core.resource.models import Resource |
| 19 | +from coldfront.core.allocation.models import Allocation |
| 20 | +from coldfront.core.utils import mail |
| 21 | +import pandas |
| 22 | +import pyarrow |
| 23 | +from pandas.core.groupby.generic import DataFrameGroupBy |
| 24 | + |
| 25 | + |
| 26 | +logger = logging.getLogger(__name__) |
| 27 | +logger.setLevel(logging.INFO) |
| 28 | + |
| 29 | +RESOURCES_DAILY_ENABLED = ["NERC-OCP", "NERC-EDU", "NERC"] |
| 30 | +RESOURCE_NAME_TO_FILE = { |
| 31 | + "NERC": "NERC OpenStack", |
| 32 | + "NERC-OCP": "ocp-prod", |
| 33 | + "NERC-OCP-EDU": "academic", |
| 34 | +} |
| 35 | +STORAGE_FILE = "NERC Storage" |
| 36 | +ALLOCATION_STATES_TO_PROCESS = ["Active", "Active (Needs Renewal)"] |
| 37 | + |
| 38 | +INVOICE_COLUMN_ALLOCATION_ID = "Project - Allocation ID" |
| 39 | +INVOICE_COLUMN_SU_TYPE = "SU Type" |
| 40 | +INVOICE_COLUMN_COST = "Cost" |
| 41 | + |
| 42 | +S3_KEY_ID = os.getenv("S3_INVOICING_ACCESS_KEY_ID") |
| 43 | +S3_SECRET = os.getenv("S3_INVOICING_SECRET_ACCESS_KEY") |
| 44 | +S3_ENDPOINT = os.getenv( |
| 45 | + "S3_INVOICING_ENDPOINT_URL", "https://s3.us-east-005.backblazeb2.com" |
| 46 | +) |
| 47 | +S3_BUCKET = os.getenv("S3_INVOICING_BUCKET", "nerc-invoicing") |
| 48 | + |
| 49 | +CENTER_BASE_URL = import_from_settings("CENTER_BASE_URL") |
| 50 | +EMAIL_SENDER = import_from_settings("EMAIL_SENDER") |
| 51 | +EMAIL_ENABLED = import_from_settings("EMAIL_ENABLED") |
| 52 | +EMAIL_TEMPLATE = """Dear New England Research Cloud user, |
| 53 | +
|
| 54 | +Your {resource.name} {resource.type} Allocation in project {allocation.project.title} has reached your preset Alert value. |
| 55 | +
|
| 56 | +- As of midnight last night, your Allocation reached or exceeded your preset Alert value of {alert_value}. |
| 57 | +- To view your Allocation information visit {url}/allocation/{allocation.id} |
| 58 | +
|
| 59 | +Thank you, |
| 60 | +New England Research Cloud (NERC) |
| 61 | +https://nerc.mghpcc.org/ |
| 62 | +""" |
| 63 | + |
| 64 | + |
| 65 | +@dataclass() |
| 66 | +class TotalByDate(object): |
| 67 | + date: str |
| 68 | + total: Decimal |
| 69 | + |
| 70 | + def __str__(self): |
| 71 | + return f"{self.date}: {self.total} USD" |
| 72 | + |
| 73 | + |
| 74 | +class Command(BaseCommand): |
| 75 | + help = "Fetch daily billable usage." |
| 76 | + |
| 77 | + @property |
| 78 | + def previous_day(self): |
| 79 | + return datetime.now(timezone.utc) - timedelta(days=1) |
| 80 | + |
| 81 | + @property |
| 82 | + def previous_day_string(self): |
| 83 | + return self.previous_day.strftime("%Y-%m-%d") |
| 84 | + |
| 85 | + def add_arguments(self, parser): |
| 86 | + parser.add_argument( |
| 87 | + "--date", type=str, default=self.previous_day_string, help="Date." |
| 88 | + ) |
| 89 | + |
| 90 | + def handle(self, *args, **options): |
| 91 | + date = options["date"] |
| 92 | + validate_date_str(date) |
| 93 | + |
| 94 | + allocations = self.get_allocations_for_daily_billing() |
| 95 | + |
| 96 | + for allocation in allocations: |
| 97 | + resource = allocation.resources.first() |
| 98 | + allocation_project_id = allocation.get_attribute( |
| 99 | + attributes.ALLOCATION_PROJECT_ID |
| 100 | + ) |
| 101 | + |
| 102 | + if not allocation_project_id: |
| 103 | + logger.warning( |
| 104 | + f"Allocation {allocation.id} is in an active state without a Project ID attribute. Skipping." |
| 105 | + ) |
| 106 | + continue |
| 107 | + |
| 108 | + previous_total = self.get_total_from_attribute(allocation) |
| 109 | + |
| 110 | + try: |
| 111 | + # We must ensure both the cluster charges for the allocation and the storage |
| 112 | + # charges are both processed otherwise the value will be misleading. |
| 113 | + cluster_usage = self.get_allocation_usage( |
| 114 | + resource.name, date, allocation_project_id |
| 115 | + ) |
| 116 | + storage_usage = self.get_allocation_usage( |
| 117 | + STORAGE_FILE, date, allocation_project_id |
| 118 | + ) |
| 119 | + new_usage = usage_models.merge_models(cluster_usage, storage_usage) |
| 120 | + except Exception as e: |
| 121 | + logger.error( |
| 122 | + f"Unable to get daily billable usage from {resource.name}, skipping {allocation_project_id}: {e}" |
| 123 | + ) |
| 124 | + continue |
| 125 | + |
| 126 | + # Only update the latest value if the processed date is newer or same date. |
| 127 | + if not previous_total or date >= previous_total.date: |
| 128 | + new_total = TotalByDate(date, new_usage.total_charges) |
| 129 | + |
| 130 | + self.set_total_on_attribute(allocation, new_total) |
| 131 | + self.handle_alerting(allocation, previous_total, new_total) |
| 132 | + |
| 133 | + @staticmethod |
| 134 | + def get_daily_location_for_prefix(prefix: str, date: str): |
| 135 | + """Formats the S3 location for a given prefix and date. |
| 136 | +
|
| 137 | + For example, the service invoices for the Resource of type OpenStack and name |
| 138 | + NERC are located in /Invoices/<YYYY-MM>/Service Invoices/NERC OpenStack <YYYY-MM>""" |
| 139 | + return f"Invoices/{usage_models.get_invoice_month_from_date(date)}/Service Invoices/{prefix} {date}.csv" |
| 140 | + |
| 141 | + @staticmethod |
| 142 | + def get_allocations_for_daily_billing(): |
| 143 | + """Fetches all allocations of the production resources that are in the two Active states.""" |
| 144 | + return Allocation.objects.filter( |
| 145 | + resources__name__in=RESOURCES_DAILY_ENABLED, |
| 146 | + status__name__in=ALLOCATION_STATES_TO_PROCESS, |
| 147 | + ) |
| 148 | + |
| 149 | + @staticmethod |
| 150 | + def set_total_on_attribute(allocation, total_by_date: TotalByDate): |
| 151 | + """Adds the cumulative charges attribute to a resource.""" |
| 152 | + attribute_value = str(total_by_date) |
| 153 | + utils.set_attribute_on_allocation( |
| 154 | + allocation, attributes.ALLOCATION_CUMULATIVE_CHARGES, attribute_value |
| 155 | + ) |
| 156 | + |
| 157 | + @staticmethod |
| 158 | + def get_total_from_attribute(allocation: Allocation) -> Optional[TotalByDate]: |
| 159 | + """Load the total and date from the allocation attribute. |
| 160 | +
|
| 161 | + The format is <YYYY-MM-DD>: <Total> USD""" |
| 162 | + total = allocation.get_attribute(attributes.ALLOCATION_CUMULATIVE_CHARGES) |
| 163 | + if not total: |
| 164 | + return None |
| 165 | + |
| 166 | + try: |
| 167 | + date, total = total.rstrip(" USD").split(": ") |
| 168 | + return TotalByDate(date=date, total=Decimal(total)) |
| 169 | + except ValueError as e: |
| 170 | + logger.warning( |
| 171 | + f"Unable to parse total from attribute for allocation {allocation.id}: {e}" |
| 172 | + ) |
| 173 | + return None |
| 174 | + |
| 175 | + @functools.cached_property |
| 176 | + def s3_client(self): |
| 177 | + if not S3_KEY_ID or not S3_SECRET: |
| 178 | + raise Exception( |
| 179 | + "Must provide S3_INVOICING_ACCESS_KEY_ID and" |
| 180 | + " S3_INVOICING_SECRET_ACCESS_KEY environment variables." |
| 181 | + ) |
| 182 | + |
| 183 | + s3 = boto3.client( |
| 184 | + "s3", |
| 185 | + endpoint_url=S3_ENDPOINT, |
| 186 | + aws_access_key_id=S3_KEY_ID, |
| 187 | + aws_secret_access_key=S3_SECRET, |
| 188 | + ) |
| 189 | + return s3 |
| 190 | + |
| 191 | + @staticmethod |
| 192 | + @functools.cache |
| 193 | + def load_csv(location) -> DataFrameGroupBy: |
| 194 | + df = pandas.read_csv( |
| 195 | + location, |
| 196 | + dtype={INVOICE_COLUMN_COST: pandas.ArrowDtype(pyarrow.decimal128(12, 2))}, |
| 197 | + ) |
| 198 | + return df.groupby(INVOICE_COLUMN_ALLOCATION_ID) |
| 199 | + |
| 200 | + @functools.cache |
| 201 | + def load_service_invoice(self, resource: str, date_str: str) -> DataFrameGroupBy: |
| 202 | + """Fetches the dataframe of an invoice from S3.""" |
| 203 | + if resource in RESOURCE_NAME_TO_FILE: |
| 204 | + resource = RESOURCE_NAME_TO_FILE[resource] |
| 205 | + |
| 206 | + key = self.get_daily_location_for_prefix(resource, date_str) |
| 207 | + with tempfile.TemporaryDirectory() as tmpdir: |
| 208 | + filename = os.path.basename(key) |
| 209 | + download_location = os.path.join(tmpdir, filename) |
| 210 | + logger.info(f"Downloading invoice {key} to {download_location}.") |
| 211 | + self.s3_client.download_file(S3_BUCKET, key, download_location) |
| 212 | + return self.load_csv(download_location) |
| 213 | + |
| 214 | + def get_allocation_usage( |
| 215 | + self, resource: str, date_str: str, allocation_id |
| 216 | + ) -> UsageInfo: |
| 217 | + """Loads the service invoice and parse UsageInfo for a specific allocation.""" |
| 218 | + invoice = self.load_service_invoice(resource, date_str) |
| 219 | + |
| 220 | + try: |
| 221 | + df = invoice.get_group(allocation_id)[ |
| 222 | + [INVOICE_COLUMN_SU_TYPE, INVOICE_COLUMN_COST] |
| 223 | + ] |
| 224 | + except KeyError: |
| 225 | + logger.debug(f"No usage for allocation {allocation_id}.") |
| 226 | + return UsageInfo({}) |
| 227 | + |
| 228 | + return UsageInfo( |
| 229 | + df.set_index(INVOICE_COLUMN_SU_TYPE)[INVOICE_COLUMN_COST].to_dict() |
| 230 | + ) |
| 231 | + |
| 232 | + @classmethod |
| 233 | + def handle_alerting( |
| 234 | + cls, allocation, previous_total: TotalByDate, new_total: TotalByDate |
| 235 | + ): |
| 236 | + allocation_alerting_value = allocation.get_attribute( |
| 237 | + attributes.ALLOCATION_ALERT |
| 238 | + ) |
| 239 | + already_alerted = False |
| 240 | + |
| 241 | + if allocation_alerting_value is None: |
| 242 | + # Allocation alerting value attribute is not present on this allocation. |
| 243 | + utils.set_attribute_on_allocation( |
| 244 | + allocation, attributes.ALLOCATION_ALERT, 0 |
| 245 | + ) |
| 246 | + return |
| 247 | + |
| 248 | + if allocation_alerting_value <= 0: |
| 249 | + # 0 is the default and does not send any alerts. |
| 250 | + return |
| 251 | + |
| 252 | + if previous_total and previous_total.total > allocation_alerting_value: |
| 253 | + if usage_models.is_date_same_month(previous_total.date, new_total.date): |
| 254 | + # Alerting value has already been exceeded, do not alert again. |
| 255 | + already_alerted = True |
| 256 | + |
| 257 | + if new_total.total > allocation_alerting_value: |
| 258 | + logger.info( |
| 259 | + f"{allocation.id} of {allocation.project.title} exceeded" |
| 260 | + f"alerting value of {allocation_alerting_value}." |
| 261 | + ) |
| 262 | + if not already_alerted and EMAIL_ENABLED: |
| 263 | + try: |
| 264 | + cls.send_alert_email( |
| 265 | + allocation, |
| 266 | + allocation.resources.first().name, |
| 267 | + allocation_alerting_value, |
| 268 | + ) |
| 269 | + logger.info( |
| 270 | + f"Sent alert email to PI of {allocation.id} of {allocation.project.title}" |
| 271 | + f"for exceeding alert value." |
| 272 | + ) |
| 273 | + except Exception as e: |
| 274 | + logger.error( |
| 275 | + f"Unable to send alert email to PI of {allocation.id} of {allocation.project.title}: {e}" |
| 276 | + ) |
| 277 | + |
| 278 | + @staticmethod |
| 279 | + def send_alert_email(allocation: Allocation, resource: Resource, alert_value): |
| 280 | + mail.send_mail( |
| 281 | + subject="Allocation Usage Alert", |
| 282 | + message=EMAIL_TEMPLATE.format( |
| 283 | + allocation=allocation, |
| 284 | + resource=resource, |
| 285 | + alert_value=alert_value, |
| 286 | + url=CENTER_BASE_URL, |
| 287 | + ), |
| 288 | + from_email=EMAIL_SENDER, |
| 289 | + recipient_list=[allocation.project.pi.email], |
| 290 | + ) |
0 commit comments