|
1 | | -import getpass, json, logging, sys, msal |
| 1 | +import base64, getpass, json, logging, sys, msal |
2 | 2 |
|
3 | 3 |
|
4 | 4 | AZURE_CLI = "04b07795-8ddb-461a-bbee-02f9e1bf7b46" |
5 | 5 | VISUAL_STUDIO = "04f0c124-f2bc-4f59-8241-bf6df9866bbd" |
6 | 6 |
|
7 | 7 | def print_json(blob): |
8 | | - print(json.dumps(blob, indent=2)) |
| 8 | + print(json.dumps(blob, indent=2, sort_keys=True)) |
9 | 9 |
|
10 | 10 | def _input_boolean(message): |
11 | 11 | return input( |
@@ -134,6 +134,24 @@ def acquire_ssh_cert_interactive(app): |
134 | 134 | if result.get("token_type") != "ssh-cert": |
135 | 135 | logging.error("Unable to acquire an ssh-cert") |
136 | 136 |
|
| 137 | +POP_KEY_ID = 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA-AAAAAAAA' # Fake key with a certain format and length |
| 138 | +RAW_REQ_CNF = json.dumps({"kid": POP_KEY_ID, "xms_ksl": "sw"}) |
| 139 | +POP_DATA = { # Sampled from Azure CLI's plugin connectedk8s |
| 140 | + 'token_type': 'pop', |
| 141 | + 'key_id': POP_KEY_ID, |
| 142 | + "req_cnf": base64.urlsafe_b64encode(RAW_REQ_CNF.encode('utf-8')).decode('utf-8').rstrip('='), |
| 143 | + # Note: Sending RAW_REQ_CNF without base64 encoding would result in an http 500 error |
| 144 | +} # See also https://github.com/Azure/azure-cli-extensions/blob/main/src/connectedk8s/azext_connectedk8s/_clientproxyutils.py#L86-L92 |
| 145 | + |
| 146 | +def acquire_pop_token_interactive(app): |
| 147 | + """Acquire a POP token interactively - This typically only works with Azure CLI""" |
| 148 | + POP_SCOPE = ['6256c85f-0aad-4d50-b960-e6e9b21efe35/.default'] # KAP 1P Server App Scope, obtained from https://github.com/Azure/azure-cli-extensions/pull/4468/files#diff-a47efa3186c7eb4f1176e07d0b858ead0bf4a58bfd51e448ee3607a5b4ef47f6R116 |
| 149 | + result = _acquire_token_interactive(app, POP_SCOPE, data=POP_DATA) |
| 150 | + print_json(result) |
| 151 | + if result.get("token_type") != "pop": |
| 152 | + logging.error("Unable to acquire a pop token") |
| 153 | + |
| 154 | + |
137 | 155 | def remove_account(app): |
138 | 156 | """remove_account() - Invalidate account and/or token(s) from cache, so that acquire_token_silent() would be reset""" |
139 | 157 | account = _select_account(app) |
@@ -188,6 +206,7 @@ def main(): |
188 | 206 | acquire_token_by_username_password, |
189 | 207 | acquire_ssh_cert_silently, |
190 | 208 | acquire_ssh_cert_interactive, |
| 209 | + acquire_pop_token_interactive, |
191 | 210 | remove_account, |
192 | 211 | exit, |
193 | 212 | ], option_renderer=lambda f: f.__doc__, header="MSAL Python APIs:") |
|
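Review bot: `POP_KEY_ID` in the `@@ -134,6 +134,24 @@` hunk is a placeholder with no key pair behind it, so the `result.get("token_type") != "pop"` check in `acquire_pop_token_interactive` confirms only the label the service returned, not that the caller can prove possession of a key. The docstring should say so, and it has to stay on one line because `main()` renders `f.__doc__` as the menu entry: `"""Acquire a POP token interactively with a placeholder key id (no key pair is generated) - This typically only works with Azure CLI"""`. `print_json(result)` writes the whole response to stdout, which on success presumably includes the issued token, so output from this option should be treated as sensitive when it is pasted into issues or captured in CI logs. The reference URL in the `POP_DATA` comment points at `main` with line anchors and will drift as that file changes; a commit permalink would keep `#L86-L92` meaningful.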