-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathcoldfront_fetch_processor.py
More file actions
185 lines (161 loc) · 7.16 KB
/
coldfront_fetch_processor.py
File metadata and controls
185 lines (161 loc) · 7.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import os
import sys
import functools
import logging
import json
from dataclasses import dataclass, field
import requests
import pandas
from process_report.loader import loader
from process_report.settings import invoice_settings
from process_report.invoices import invoice
from process_report.processors import (
processor,
validate_billable_pi_processor,
validate_cluster_name_processor,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE: configures root logging at INFO level as an import-time side effect
# (basicConfig does nothing if the root logger already has handlers).
logging.basicConfig(level=logging.INFO)
# Keys looked up in each Coldfront allocation's "attributes" dict.
CF_ATTR_ALLOCATED_PROJECT_NAME = "Allocated Project Name"
CF_ATTR_ALLOCATED_PROJECT_ID = "Allocated Project ID"
CF_ATTR_INSTITUTION_SPECIFIC_CODE = "Institution-Specific Code"
CF_ATTR_IS_COURSE = "Is Course?"
# Column names read from each row of the supplemental API data frame.
SUPPLEMENTAL_PROJECT_ID = "Project - Allocation Name"
SUPPLEMENTAL_PROJECT_NAME = "Project - Title"
SUPPLEMENTAL_PI = "Manager (PI)"
SUPPLEMENTAL_CLUSTER_NAME = "Cluster Name"
@dataclass
class ColdfrontFetchProcessor(processor.Processor):
    """Fill invoice rows with project metadata taken from Coldfront.

    Allocation records come either from the Coldfront allocations API or,
    when ``coldfront_data_filepath`` is set, from a local JSON file. They are
    merged with the supplemental data frame, written onto the matching rows
    of ``self.data`` and finally checked so that every billable
    (project, cluster) pair was found.
    """

    nonbillable_projects: pandas.DataFrame = field(
        default_factory=loader.get_nonbillable_projects
    )
    # When truthy, allocation data is read from this JSON file instead of
    # being fetched from the Coldfront API.
    coldfront_data_filepath: str = invoice_settings.coldfront_api_filepath
    # The lambda resolves ``loader.get_supplement_api_data`` at instantiation
    # time rather than at class-definition time.
    supplement_api_data: pandas.DataFrame = field(
        default_factory=lambda: loader.get_supplement_api_data()
    )

    @functools.cached_property
    def coldfront_client(self):
        """Return a ``requests`` session carrying a Keycloak bearer token.

        The token is obtained once through the client-credentials grant and
        the resulting session is cached on the instance.

        Raises:
            KeyError: if ``KEYCLOAK_CLIENT_ID`` or ``KEYCLOAK_CLIENT_SECRET``
                is not set in the environment.
            SystemExit: if Keycloak answers with an HTTP error status.
        """
        keycloak_url = os.environ.get("KEYCLOAK_URL", "https://keycloak.mss.mghpcc.org")

        # Authenticate with Keycloak
        token_url = f"{keycloak_url}/auth/realms/mss/protocol/openid-connect/token"
        r = requests.post(
            token_url,
            data={"grant_type": "client_credentials"},
            auth=requests.auth.HTTPBasicAuth(
                os.environ["KEYCLOAK_CLIENT_ID"],
                os.environ["KEYCLOAK_CLIENT_SECRET"],
            ),
        )
        try:
            r.raise_for_status()
        except requests.HTTPError:
            sys.exit(f"Keycloak authentication failed:\n{r.status_code} {r.text}")

        client_token = r.json()["access_token"]

        session = requests.session()
        headers = {
            "Authorization": f"Bearer {client_token}",
            "Content-Type": "application/json",
        }
        session.headers.update(headers)
        return session

    def _get_billable_projects_clusters(self) -> set[tuple[str, str]]:
        """Return the set of billable (project, cluster name) tuples.

        The row selection is delegated to
        ``validate_billable_pi_processor.find_billable_projects``; its result
        is used to index ``self.data`` (presumably a boolean mask).
        """
        project_mask = validate_billable_pi_processor.find_billable_projects(
            self.data, self.nonbillable_projects
        )
        billable_rows = self.data[project_mask][
            [invoice.PROJECT_FIELD, invoice.CLUSTER_NAME_FIELD]
        ]
        return set(billable_rows.itertuples(index=False, name=None))

    def _fetch_coldfront_allocation_api(self):
        """Fetch all allocations from the Coldfront API and return the JSON.

        Raises:
            requests.HTTPError: if Coldfront answers with a 4xx/5xx status.
        """
        coldfront_api_url = os.environ.get(
            "COLDFRONT_URL", "https://coldfront.mss.mghpcc.org/api/allocations"
        )
        r = self.coldfront_client.get(f"{coldfront_api_url}?all=true")
        # Fail here with the HTTP status instead of letting an error payload
        # be parsed and iterated as if it were allocation data.
        r.raise_for_status()
        return r.json()

    def _get_coldfront_api_data(self):
        """Return allocation data from the local JSON file or the API.

        The file takes precedence whenever ``coldfront_data_filepath`` is
        truthy.
        """
        if not self.coldfront_data_filepath:
            return self._fetch_coldfront_allocation_api()

        logger.info(
            "Using Coldfront data from %s instead of API",
            self.coldfront_data_filepath,
        )
        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with open(self.coldfront_data_filepath, "r", encoding="utf-8") as f:
            return json.load(f)

    def _get_allocation_data(self, coldfront_api_data):
        """Return a mapping of (project ID, cluster name) tuples to a dict of
        project name, PI name, institution code, cluster name and course flag.

        Coldfront allocations lacking any required key are skipped. Entries
        from ``self.supplement_api_data`` are added afterwards, so they
        override a Coldfront entry with the same key.
        """
        cluster_name_map = (
            validate_cluster_name_processor.ValidateClusterNameProcessor.CLUSTER_NAME_MAP
        )
        allocation_data = {}

        for project_dict in coldfront_api_data:
            # Only the required lookups sit in the try block; an allocation
            # missing any of them is skipped rather than treated as an error.
            try:
                attributes = project_dict["attributes"]
                project_id = attributes[CF_ATTR_ALLOCATED_PROJECT_ID]
                project_name = attributes[CF_ATTR_ALLOCATED_PROJECT_NAME]
                pi_name = project_dict["project"]["pi"]
                cluster_name = project_dict["resource"]["name"]
            except KeyError as missing_key:
                logger.debug(
                    "Skipping Coldfront allocation without required key %s",
                    missing_key,
                )
                continue

            # Allow allocation to not have institute code
            institute_code = attributes.get(CF_ATTR_INSTITUTION_SPECIFIC_CODE, "N/A")
            # Translate the Coldfront resource name through CLUSTER_NAME_MAP,
            # keeping it unchanged when there is no mapping.
            cluster_name = cluster_name_map.get(cluster_name, cluster_name)
            is_course = attributes.get(CF_ATTR_IS_COURSE, "No").lower() == "yes"

            allocation_data[(project_id, cluster_name)] = {
                invoice.PROJECT_FIELD: project_name,
                invoice.PI_FIELD: pi_name,
                invoice.INSTITUTION_ID_FIELD: institute_code,
                invoice.CLUSTER_NAME_FIELD: cluster_name,
                invoice.IS_COURSE_FIELD: is_course,
            }

        for _, row in self.supplement_api_data.iterrows():
            project_id = row[SUPPLEMENTAL_PROJECT_ID]
            cluster_name = row[SUPPLEMENTAL_CLUSTER_NAME]
            allocation_data[(project_id, cluster_name)] = {
                invoice.PROJECT_FIELD: row[SUPPLEMENTAL_PROJECT_NAME],
                invoice.PI_FIELD: row[SUPPLEMENTAL_PI],
                invoice.INSTITUTION_ID_FIELD: "N/A",
                invoice.CLUSTER_NAME_FIELD: cluster_name,
                invoice.IS_COURSE_FIELD: False,  # (TODO) Quan Assuming supplemental data does not contain course info?
            }

        return allocation_data

    def _validate_allocation_data(self, allocation_data):
        """Check that every billable (project, cluster) pair has allocation data.

        Raises:
            ValueError: listing, in sorted order, the billable pairs from
                ``self.data`` that are absent from ``allocation_data``.
        """
        allocation_project_names = {
            (data[invoice.PROJECT_FIELD], data[invoice.CLUSTER_NAME_FIELD])
            for data in allocation_data.values()
        }
        # Sorted so the error message is deterministic (tests rely on it).
        missing_projects = sorted(
            self._get_billable_projects_clusters() - allocation_project_names
        )
        if missing_projects:
            raise ValueError(
                f"Projects {missing_projects} not found in Coldfront and are billable! Please check the project names"
            )

    def _apply_allocation_data(self, allocation_data):
        """Write allocation metadata onto the matching rows of ``self.data``.

        The course flag is first reset to False for every row. Then, for each
        (project ID, cluster name) key, rows whose project ID and cluster
        name both match get their project name, PI, institution code and
        course flag overwritten.
        """
        self.data[invoice.IS_COURSE_FIELD] = False
        for (project_id, cluster_name), data in allocation_data.items():
            mask = (self.data[invoice.PROJECT_ID_FIELD] == project_id) & (
                self.data[invoice.CLUSTER_NAME_FIELD] == cluster_name
            )
            self.data.loc[mask, invoice.PROJECT_FIELD] = data[invoice.PROJECT_FIELD]
            self.data.loc[mask, invoice.PI_FIELD] = data[invoice.PI_FIELD]
            self.data.loc[mask, invoice.INSTITUTION_ID_FIELD] = data[
                invoice.INSTITUTION_ID_FIELD
            ]
            self.data.loc[mask, invoice.IS_COURSE_FIELD] = data[invoice.IS_COURSE_FIELD]

    def _process(self):
        """Load allocation data, apply it to ``self.data``, then validate.

        Validation runs after the apply step because it reads
        ``invoice.PROJECT_FIELD`` from ``self.data``, which the apply step
        has just overwritten for matched rows.
        """
        api_data = self._get_coldfront_api_data()
        allocation_data = self._get_allocation_data(api_data)
        self._apply_allocation_data(allocation_data)
        self._validate_allocation_data(allocation_data)