11# pylint: disable=E1131,C0103
22
3+ import copy
34import logging
45import os
56from dataclasses import dataclass
1011
1112from understack_workflows .bmc_password_standard import standard_password
1213from understack_workflows .helpers import credential
14+ from understack_workflows .helpers import setup_logger
1315
1416urllib3 .disable_warnings (urllib3 .exceptions .InsecureRequestWarning ) # type: ignore
1517logging .getLogger ("urllib3" ).setLevel (logging .WARNING )
1618
19+ logger = setup_logger (__name__ )
20+
1721HEADERS = {
1822 "Accept" : "application/json" ,
1923 "Content-Type" : "application/json; charset=utf-8" ,
@@ -24,6 +28,14 @@ class RedfishRequestError(Exception):
2428 """Handle Exceptions from Redfish handler."""
2529
2630
31+ class AuthException (Exception ):
32+ """Authentication Exception."""
33+
34+
35+ class AccountServiceException (Exception ):
36+ """AccountService Query Exception."""
37+
38+
2739@dataclass
2840class Bmc :
2941 """Represent DRAC/iLo and know how to perform low-level query on it."""
@@ -35,9 +47,16 @@ def __init__(
3547 self .ip_address = ip_address
3648 self .username = username
3749 self .password = password if password else ""
50+ self ._base_path : str | None = None
3851 self ._system_path : str | None = None
3952 self ._manager_path : str | None = None
4053
54+ @property
55+ def base_path (self ) -> str :
56+ """Read System path from BMC."""
57+ self ._system_path = self ._base_path or self .get_base_path ()
58+ return self ._system_path
59+
4160 @property
4261 def system_path (self ) -> str :
4362 """Read System path from BMC."""
@@ -58,6 +77,11 @@ def url(self):
5877 """Return base redfish URL."""
5978 return f"https://{ self .ip_address } "
6079
80+ def get_base_path (self ):
81+ """Get Base Path."""
82+ _result = "/redfish/v1/"
83+ return _result
84+
6185 def get_system_path (self ):
6286 """Get System Path."""
6387 _result = self .redfish_request ("/redfish/v1/Systems/" )
@@ -68,24 +92,143 @@ def get_manager_path(self):
6892 _result = self .redfish_request ("/redfish/v1/Managers/" )
6993 return _result ["Members" ][0 ]["@odata.id" ].rstrip ("/" )
7094
95+ def get_user_accounts (self , token : str | None = None ) -> list [dict ]:
96+ """A vendor agnostic approach to crawling the API for BMC accounts."""
97+ try :
98+ # get account service
99+ r = (
100+ self .redfish_request (path = self .base_path , token = token )
101+ if token
102+ else self .redfish_request (path = self .base_path )
103+ )
104+ account_service_uri = r ["AccountService" ]["@odata.id" ]
105+ logger .debug ("account_service_url: %s" , account_service_uri )
106+
107+ # get account collection uri
108+ r = (
109+ self .redfish_request (path = account_service_uri , token = token )
110+ if token
111+ else self .redfish_request (path = account_service_uri )
112+ )
113+ accounts_uri = r ["Accounts" ]["@odata.id" ]
114+ logger .debug ("accounts_url: %s" , accounts_uri )
115+
116+ # get accounts
117+ r = (
118+ self .redfish_request (path = accounts_uri , token = token )
119+ if token
120+ else self .redfish_request (path = accounts_uri )
121+ )
122+ accounts = r ["Members" ]
123+ logger .debug ("accounts: %s" , accounts )
124+
125+ return accounts
126+ except AccountServiceException :
127+ logger .exception ("Can't fetch accounts from Redfish account service." )
128+ raise
129+
130+ def set_bmc_creds (self , password : str , token : str | None = None ):
131+ """Change password for the account associated with the bmc."""
132+ accounts = self .get_user_accounts (token )
133+ matched_account = None
134+ for account in accounts :
135+ account_url = account ["@odata.id" ]
136+ if token :
137+ a = self .redfish_request (path = account_url , token = token )
138+ else :
139+ a = self .redfish_request (path = account_url )
140+ if self .username == a ["UserName" ]:
141+ logger .debug ("found account: %s" , a )
142+ matched_account = a
143+ break
144+ if not matched_account :
145+ raise AuthException (f"Unable to find BMC account for { self .username } " )
146+ account_uri = matched_account ["@odata.id" ]
147+ _payload = {"Password" : password }
148+ if token :
149+ self .redfish_request (
150+ method = "PATCH" , path = account_uri , token = token , payload = _payload
151+ )
152+ else :
153+ self .redfish_request (method = "PATCH" , path = account_uri , payload = _payload )
154+
155+ def get_session (self , password : str ) -> tuple [str | None , str | None ]:
156+ """Request a new session."""
157+ _payload = {"UserName" : self .username , "Password" : password }
158+ token , session = self .session_request (
159+ method = "POST" ,
160+ path = "/redfish/v1/SessionService/Sessions" ,
161+ payload = _payload ,
162+ )
163+ if token and session :
164+ return token , session
165+ else :
166+ return None , None
167+
168+ def close_session (self , session : str , token : str | None = None ) -> None :
169+ """Close BMC token session."""
170+ if token :
171+ self .redfish_request (method = "DELETE" , path = session , token = token )
172+ else :
173+ self .redfish_request (method = "DELETE" , path = session )
174+
175+ def session_request (
176+ self ,
177+ path : str ,
178+ method : str = "POST" ,
179+ payload : dict | None = None ,
180+ verify : bool = False ,
181+ timeout : int = 30 ,
182+ ) -> tuple [str | None , str | None ]:
183+ """Request a session via Redfish against the Bmc."""
184+ _headers = copy .copy (HEADERS )
185+ url = f"{ self .url ()} { path } "
186+ r = requests .request (
187+ method ,
188+ url ,
189+ verify = verify ,
190+ timeout = timeout ,
191+ json = payload ,
192+ headers = _headers ,
193+ )
194+ if r .status_code >= 400 :
195+ raise RedfishRequestError (
196+ f"BMC communications failure HTTP { r .status_code } "
197+ + f"{ r .reason } from { url } - { r .text } "
198+ )
199+ if r .text :
200+ token = r .headers ["X-Auth-Token" ]
201+ if "Location" in r .headers :
202+ location = r .headers ["Location" ].split (self .ip_address )[1 ]
203+ else :
204+ location = r .json ()["@odata.id" ]
205+
206+ return (token , location )
207+ else :
208+ return (None , None )
209+
71210 def redfish_request (
72211 self ,
73212 path : str ,
74213 method : str = "GET" ,
75214 payload : dict | None = None ,
215+ token : str | None = None ,
76216 verify : bool = False ,
77217 timeout : int = 30 ,
78218 ) -> dict :
79219 """Request a path via Redfish against the Bmc."""
220+ _headers = copy .copy (HEADERS )
221+ if token :
222+ _headers .update ({"X-Auth-Token" : token })
80223 url = f"{ self .url ()} { path } "
81224 r = requests .request (
82225 method ,
83226 url ,
84- auth = (self .username , self .password ),
227+ auth = None if token else (self .username , self .password ),
85228 verify = verify ,
86229 timeout = timeout ,
87230 json = payload ,
88- headers = HEADERS ,
231+ headers = _headers ,
89232 )
90233 if r .status_code >= 400 :
91234 raise RedfishRequestError (
0 commit comments