11# pylint: disable=E1131,C0103
22
3+ import copy
34import logging
45import os
56from dataclasses import dataclass
1011
1112from understack_workflows .bmc_password_standard import standard_password
1213from understack_workflows .helpers import credential
14+ from understack_workflows .helpers import setup_logger
1315
1416urllib3 .disable_warnings (urllib3 .exceptions .InsecureRequestWarning ) # type: ignore
1517logging .getLogger ("urllib3" ).setLevel (logging .WARNING )
1618
19+ logger = setup_logger (__name__ )
20+
1721HEADERS = {
1822 "Accept" : "application/json" ,
1923 "Content-Type" : "application/json; charset=utf-8" ,
@@ -24,6 +28,10 @@ class RedfishRequestError(Exception):
2428 """Handle Exceptions from Redfish handler."""
2529
2630
31+ class AuthException (Exception ):
32+ """Authentication Exception."""
33+
34+
2735@dataclass
2836class Bmc :
2937 """Represent DRAC/iLo and know how to perform low-level query on it."""
@@ -35,9 +43,16 @@ def __init__(
3543 self .ip_address = ip_address
3644 self .username = username
3745 self .password = password if password else ""
46+ self ._base_path : str | None = None
3847 self ._system_path : str | None = None
3948 self ._manager_path : str | None = None
4049
50+ @property
51+ def base_path (self ) -> str :
52+ """Read System path from BMC."""
53+ self ._system_path = self ._base_path or self .get_base_path ()
54+ return self ._system_path
55+
4156 @property
4257 def system_path (self ) -> str :
4358 """Read System path from BMC."""
@@ -58,6 +73,11 @@ def url(self):
5873 """Return base redfish URL."""
5974 return f"https://{ self .ip_address } "
6075
76+ def get_base_path (self ):
77+ """Get Base Path."""
78+ _result = "/redfish/v1"
79+ return _result
80+
6181 def get_system_path (self ):
6282 """Get System Path."""
6383 _result = self .redfish_request ("/redfish/v1/Systems/" )
@@ -68,24 +88,106 @@ def get_manager_path(self):
6888 _result = self .redfish_request ("/redfish/v1/Managers/" )
6989 return _result ["Members" ][0 ]["@odata.id" ].rstrip ("/" )
7090
91+ def get_user_accounts (self , token : str | None = None ) -> list [dict ]:
92+ """A vendor agnostic approach to crawling the API for BMC accounts."""
93+ path = self .base_path
94+ path = self .redfish_request (path , token = token )["AccountService" ]["@odata.id" ]
95+ path = self .redfish_request (path , token = token )["Accounts" ]["@odata.id" ]
96+ return self .redfish_request (path , token = token )["Members" ]
97+
98+ def set_bmc_creds (self , password : str , token : str | None = None ):
99+ """Change password for the account associated with the bmc."""
100+ accounts = self .get_user_accounts (token )
101+ matched_account = None
102+ for account in accounts :
103+ account_url = account ["@odata.id" ]
104+ a = self .redfish_request (path = account_url , token = token )
105+ if self .username == a ["UserName" ]:
106+ logger .debug ("found account: %s" , a )
107+ matched_account = a
108+ break
109+ if not matched_account :
110+ raise AuthException (f"Unable to find BMC account for { self .username } " )
111+ account_uri = matched_account ["@odata.id" ]
112+ _payload = {"Password" : password }
113+ self .redfish_request (
114+ method = "PATCH" , path = account_uri , token = token , payload = _payload
115+ )
116+
117+ def get_session (self , password : str ) -> tuple [str , str ] | tuple [None , None ]:
118+ """Request a new session."""
119+ _payload = {"UserName" : self .username , "Password" : password }
120+ token , session = self .session_request (
121+ method = "POST" ,
122+ path = "/redfish/v1/SessionService/Sessions" ,
123+ payload = _payload ,
124+ )
125+ if token and session :
126+ return token , session
127+ else :
128+ return None , None
129+
130+ def close_session (self , session : str , token : str | None = None ) -> None :
131+ """Close BMC token session."""
132+ self .redfish_request (method = "DELETE" , path = session , token = token )
133+
134+ def session_request (
135+ self ,
136+ path : str ,
137+ method : str = "POST" ,
138+ payload : dict | None = None ,
139+ verify : bool = False ,
140+ timeout : int = 30 ,
141+ ) -> tuple [str , str ] | tuple [None , None ]:
142+ """Request a session via Redfish against the Bmc."""
143+ _headers = copy .copy (HEADERS )
144+ url = f"{ self .url ()} { path } "
145+ r = requests .request (
146+ method ,
147+ url ,
148+ verify = verify ,
149+ timeout = timeout ,
150+ json = payload ,
151+ headers = _headers ,
152+ )
153+ if r .status_code >= 400 :
154+ raise RedfishRequestError (
155+ f"BMC communications failure HTTP { r .status_code } "
156+ + f"{ r .reason } from { url } - { r .text } "
157+ )
158+ if r .text :
159+ token = r .headers ["X-Auth-Token" ]
160+ if "Location" in r .headers :
161+ location = r .headers ["Location" ].split (self .ip_address )[1 ]
162+ else :
163+ location = r .json ()["@odata.id" ]
164+
165+ return (token , location )
166+ else :
167+ return (None , None )
168+
71169 def redfish_request (
72170 self ,
73171 path : str ,
74172 method : str = "GET" ,
75173 payload : dict | None = None ,
174+ token : str | None = None ,
76175 verify : bool = False ,
77176 timeout : int = 30 ,
78177 ) -> dict :
79178 """Request a path via Redfish against the Bmc."""
179+ _headers = copy .copy (HEADERS )
180+ if token :
181+ _headers .update ({"X-Auth-Token" : token })
80182 url = f"{ self .url ()} { path } "
81183 r = requests .request (
82184 method ,
83185 url ,
84- auth = (self .username , self .password ),
186+ auth = None if token else (self .username , self .password ),
85187 verify = verify ,
86188 timeout = timeout ,
87189 json = payload ,
88- headers = HEADERS ,
190+ headers = _headers ,
89191 )
90192 if r .status_code >= 400 :
91193 raise RedfishRequestError (
0 commit comments