|
1 | | -import adal |
2 | 1 | import pytest |
| 2 | +import os |
| 3 | +import json |
| 4 | +import requests |
| 5 | +from azure.identity import ClientAssertionCredential, ClientSecretCredential, ManagedIdentityCredential, CertificateCredential |
3 | 6 |
|
4 | | -from msrestazure.azure_active_directory import AADTokenCredentials |
| 7 | +# Function that returns aad token credentials for a given spn |
| 8 | +# Default behavior is to use managed identity, if use_cert_auth is set we attempt to use a certificate, if use_SPN_auth is set we fall back to SPN client secret auth |
| 9 | +def get_fed_token(): |
| 10 | + system_accesstoken = os.getenv('SYSTEM_ACCESSTOKEN') |
| 11 | + service_connection_id = os.getenv('SERVICE_CONNECTION_ID') |
| 12 | + system_oidc_request_uri = os.getenv('SYSTEM_OIDCREQUESTURI') |
5 | 13 |
|
| 14 | + if system_accesstoken and service_connection_id and system_oidc_request_uri: |
| 15 | + # Construct the OIDC_REQUEST_URL |
| 16 | + oidc_request_url = f"{system_oidc_request_uri}?api-version=7.1&serviceConnectionId={service_connection_id}" |
| 17 | + # Preparing headers for ADO Pipeline OIDC authentication |
| 18 | + headers = { |
| 19 | + "Content-Length": "0", |
| 20 | + "Content-Type": "application/json", |
| 21 | + "Authorization": f"Bearer {system_accesstoken}" |
| 22 | + } |
6 | 23 |
|
7 | | -# Function to fetch aad token from spn id and password |
8 | | -def fetch_aad_token(client_id, client_secret, authority_uri, resource_uri): |
9 | | - """ |
10 | | - Authenticate using service principal w/ key. |
11 | | - """ |
12 | | - try: |
13 | | - context = adal.AuthenticationContext(authority_uri, api_version=None) |
14 | | - return context.acquire_token_with_client_credentials(resource_uri, client_id, client_secret) |
15 | | - except Exception as e: |
16 | | - pytest.fail("Error occured while fetching aad token: " + str(e)) |
| 24 | + # Make the POST request |
| 25 | + response = requests.post(oidc_request_url, headers=headers) |
17 | 26 |
|
| 27 | + # Check the response and extract the OIDC token |
| 28 | + if response.status_code == 200: |
| 29 | + # Assuming the response is JSON and has an 'oidcToken' field |
| 30 | + arm_oidc_token = response.json().get('oidcToken') |
| 31 | + print("Return Fed token") |
| 32 | + return arm_oidc_token |
| 33 | + else: |
| 34 | + print("Failed to retrieve FED Token:", response.status_code, response.text) |
18 | 35 |
|
19 | | -# Function that returns aad token credentials for a given spn |
20 | | -def fetch_aad_token_credentials(client_id, client_secret, authority_uri, resource_uri): |
21 | | - mgmt_token = fetch_aad_token(client_id, client_secret, authority_uri, resource_uri) |
| 36 | + else: |
| 37 | + print(""" |
| 38 | + One or more variables (SYSTEM_ACCESSTOKEN, |
| 39 | + SERVICE_CONNECTION_ID, |
| 40 | + SYSTEM_OIDCREQUESTURI) are either not set or empty. |
| 41 | + """) |
| 42 | + |
| 43 | +def fetch_aad_token_credentials(tenant_id, client_id, client_secret, authority, use_cert_auth = False, use_SPN_auth = False, use_FIC_auth = False): |
22 | 44 | try: |
23 | | - return AADTokenCredentials(mgmt_token, client_id) |
| 45 | + if use_FIC_auth: |
| 46 | + return ClientAssertionCredential(tenant_id=tenant_id, client_id=client_id, func=get_fed_token, authority=authority) |
| 47 | + if use_SPN_auth: |
| 48 | + return ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret, authority=authority) |
| 49 | + if use_cert_auth: |
| 50 | + import base64 |
| 51 | + cert_bytes = base64.b64decode(client_secret) |
| 52 | + return CertificateCredential(tenant_id=tenant_id, client_id=client_id, certificate_data=cert_bytes, send_certificate_chain=True) |
| 53 | + else: |
| 54 | + return ManagedIdentityCredential(client_id=client_id) |
24 | 55 | except Exception as e: |
25 | 56 | pytest.fail("Error occured while fetching credentials: " + str(e)) |
0 commit comments