-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Labels
enhancementNew feature or requestNew feature or request
Description
CLI Script
cwms-cli
Feature Proposal
use of API keys in intended for m2m communication
Below if example python of how a python script can prompt the user (though it can also open the browser) and handle the directory to establish the appropriate credentials (access and refresh token).
NOTE: I suspect cwms-python will need to also be updated to except other than just the api key.
#import cwms.locations.physical_locations
#import cwms.timeseries.timerseries_identifier
import requests,http.server, socketserver,pkce,urllib, json, base64
"""
as-is requires:
pip install pkce cwms-python
replace cwms_python with appropriate test URL for given application, or just remove to print only the tokens.
Without additional CWMS user config you will get 403 or 401 errors
"""
import cwms.timeseries
import cwms.locations
code_verifier, code_challenge = pkce.generate_pkce_pair()
done = False
session_state = None
code = None
CLIENT_ID="cwms"
CDA_URL="https://water.dev.cwbi.us/cwms-data"
OIDC_URL="https://identity-test.cwbi.us/auth/realms/cwbi/protocol/openid-connect"
OIDC_URL_CAC="https://identityc-test.cwbi.us/auth/realms/cwbi/protocol/openid-connect"
REDIRECT_PORT=5000
REDIRECT_URL=f"http://localhost:{REDIRECT_PORT}"
KC_IDP_HINT="&login.gov" # or &federation-eams
TRUST = None # "/home/mike/projects/cwms-data-api/compose_files/pki/certs/rootca.pem"
class Server(socketserver.TCPServer):
# Avoid "address already used" error when frequently restarting the script
allow_reuse_address = True
class SingleRequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
global done,session_state,code
parsed_path = urllib.parse.urlparse(self.path)
query_params = urllib.parse.parse_qs(parsed_path.query)
print(repr(query_params))
session_state = query_params["session_state"][0]
code = query_params["code"][0]
self.send_error(204)
if __name__ == "__main__":
print(f"Go to: {OIDC_URL}/auth?client_id={CLIENT_ID}&response_type=code&scope=openid%20profile&redirect_uri={REDIRECT_URL}&code_challenge_method=S256&code_challenge={code_challenge}{KC_IDP_HINT}")
with Server(("127.0.0.1",REDIRECT_PORT), SingleRequestHandler) as httpd:
httpd.handle_request()
print("State: " + repr(session_state))
print("Code: " + repr(code))
r = requests.post(f"{OIDC_URL}/token", {
"client_id": CLIENT_ID,
"grant_type": "authorization_code",
"code_verifier": code_verifier,
"scope": "openid profile",
"code": code,
"response_type": "id_token token",
"response_mode": "fragment",
"session_state": session_state,
"redirect_uri": REDIRECT_URL
}, verify=TRUST)
print(r.request.body)
print("============================1")
print(r.cookies)
print(r.status_code)
print(r.headers)
data=json.loads(r.text)
print(json.dumps(data,indent=2))
access_token = data["access_token"]
id_token = data["id_token"]
access_claims_str = access_token.split(".")[1]
access_claims = json.loads(base64.urlsafe_b64decode(access_claims_str+"=="))
print("----------- ACCESS claims --------------")
print(json.dumps(access_claims, indent=2))
id_claims_str = id_token.split(".")[1]
id_claims = json.loads(base64.urlsafe_b64decode(id_claims_str+"=="))
print("------------- ID claims -------------")
print(json.dumps(id_claims,indent=2))
refresh = data["refresh_token"]
r = requests.post(f"{OIDC_URL}/token", {
"grant_type":"refresh_token",
"client_id": "cwms",
"refresh_token": refresh
}, verify=TRUST)
print(r.text)
r = requests.get(f"{CDA_URL}/auth/keys", headers= {"Authorization": "Bearer " + access_token}, verify=TRUST)
print(r.text)
REDACTED relevant log output
Related Context or Links
No response
krowvin
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or request