|
3 | 3 | # Licensed under the MIT License. See License.txt in the project root for license information. |
4 | 4 | # -------------------------------------------------------------------------------------------- |
5 | 5 |
|
6 | | -import requests |
7 | 6 | from knack.log import get_logger |
8 | | -from knack.util import CLIError |
9 | | - |
10 | | -from .util import resource_to_scopes |
11 | 7 |
|
12 | 8 | logger = get_logger(__name__) |
13 | 9 |
|
14 | 10 |
|
15 | 11 | class CredentialAdaptor: |
16 | | - def __init__(self, credential, resource=None, auxiliary_credentials=None): |
17 | | - """ |
18 | | - Adaptor to both |
19 | | - - Track 1: msrest.authentication.Authentication, which exposes signed_session |
20 | | - - Track 2: azure.core.credentials.TokenCredential, which exposes get_token |
| 12 | + def __init__(self, credential, auxiliary_credentials=None): |
| 13 | + """Cross-tenant credential adaptor. It takes a main credential and auxiliary credentials. |
| 14 | +
|
| 15 | + It implements Track 2 SDK's azure.core.credentials.TokenCredential by exposing get_token. |
21 | 16 |
|
22 | 17 | :param credential: Main credential from .msal_authentication |
23 | | - :param resource: AAD resource for Track 1 only |
24 | 18 | :param auxiliary_credentials: Credentials from .msal_authentication for cross tenant authentication. |
25 | 19 | Details about cross tenant authentication: |
26 | 20 | https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/authenticate-multi-tenant |
27 | 21 | """ |
28 | 22 |
|
29 | 23 | self._credential = credential |
30 | 24 | self._auxiliary_credentials = auxiliary_credentials |
31 | | - self._resource = resource |
32 | | - |
33 | | - def _get_token(self, scopes=None, **kwargs): |
34 | | - external_tenant_tokens = [] |
35 | | - # If scopes is not provided, use CLI-managed resource |
36 | | - scopes = scopes or resource_to_scopes(self._resource) |
37 | | - try: |
38 | | - token = self._credential.get_token(*scopes, **kwargs) |
39 | | - if self._auxiliary_credentials: |
40 | | - external_tenant_tokens = [cred.get_token(*scopes) for cred in self._auxiliary_credentials] |
41 | | - return token, external_tenant_tokens |
42 | | - except requests.exceptions.SSLError as err: |
43 | | - from azure.cli.core.util import SSLERROR_TEMPLATE |
44 | | - raise CLIError(SSLERROR_TEMPLATE.format(str(err))) |
45 | | - |
46 | | - def signed_session(self, session=None): |
47 | | - logger.debug("CredentialAdaptor.signed_session") |
48 | | - session = session or requests.Session() |
49 | | - token, external_tenant_tokens = self._get_token() |
50 | | - header = "{} {}".format('Bearer', token.token) |
51 | | - session.headers['Authorization'] = header |
52 | | - if external_tenant_tokens: |
53 | | - aux_tokens = ';'.join(['{} {}'.format('Bearer', tokens2.token) for tokens2 in external_tenant_tokens]) |
54 | | - session.headers['x-ms-authorization-auxiliary'] = aux_tokens |
55 | | - return session |
56 | 25 |
|
57 | 26 | def get_token(self, *scopes, **kwargs): |
| 27 | + """Get an access token from the main credential.""" |
58 | 28 | logger.debug("CredentialAdaptor.get_token: scopes=%r, kwargs=%r", scopes, kwargs) |
59 | 29 |
|
60 | 30 | # Discard unsupported kwargs: tenant_id, enable_cae |
61 | 31 | filtered_kwargs = {} |
62 | 32 | if 'data' in kwargs: |
63 | 33 | filtered_kwargs['data'] = kwargs['data'] |
64 | 34 |
|
65 | | - token, _ = self._get_token(scopes, **filtered_kwargs) |
66 | | - return token |
| 35 | + return self._credential.get_token(*scopes, **filtered_kwargs) |
67 | 36 |
|
68 | 37 | def get_auxiliary_tokens(self, *scopes, **kwargs): |
| 38 | + """Get access tokens from auxiliary credentials.""" |
69 | 39 | # To test cross-tenant authentication, see https://github.com/Azure/azure-cli/issues/16691 |
70 | 40 | if self._auxiliary_credentials: |
71 | 41 | return [cred.get_token(*scopes, **kwargs) for cred in self._auxiliary_credentials] |
|
0 commit comments