|
| 1 | +import logging |
| 2 | +import os |
| 3 | +import json |
| 4 | + |
| 5 | +import requests |
| 6 | + |
| 7 | +import msal |
| 8 | +from tests import unittest |
| 9 | + |
| 10 | + |
| 11 | +logger = logging.getLogger(__name__) |
| 12 | +logging.basicConfig(level=logging.DEBUG) |
| 13 | + |
| 14 | +def get_lab_user(query): |
| 15 | + # Based on https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/LAB.aspx |
| 16 | + user = requests.get("https://api.msidlab.com/api/user", params=query).json() |
| 17 | + return { # Mapping lab API response to our expected configuration format |
| 18 | + "authority": user["Authority"][0] + user["Users"]["tenantId"], |
| 19 | + "client_id": user["AppID"], |
| 20 | + "username": user["Users"]["upn"], |
| 21 | + "password": "TBD", # TODO |
| 22 | + "scope": ["https://graph.microsoft.com/.default"], |
| 23 | + } |
| 24 | + |
| 25 | +def get_lab_app( |
| 26 | + env_client_id="LAB_APP_CLIENT_ID", |
| 27 | + env_client_secret="LAB_APP_CLIENT_SECRET", |
| 28 | + ): |
| 29 | + """Returns the lab app as an MSAL confidential client. |
| 30 | +
|
| 31 | + Get it from environment variables if defined, otherwise fall back to use MSI. |
| 32 | + """ |
| 33 | + if os.getenv(env_client_id) and os.getenv(env_client_secret): |
| 34 | + # A shortcut mainly for running tests on developer's local development machine |
| 35 | + # or it could be setup on Travis CI |
| 36 | + # https://docs.travis-ci.com/user/environment-variables/#defining-variables-in-repository-settings |
| 37 | + # Data came from here |
| 38 | + # https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/Rese.aspx#programmatic-access-info-for-lab-request-api |
| 39 | + logger.info("Using lap app defined by ENV variables %s and %s", |
| 40 | + env_client_id, env_client_secret) |
| 41 | + client_id = os.getenv(env_client_id) |
| 42 | + client_secret = os.getenv(env_client_secret) |
| 43 | + else: |
| 44 | + logger.info("ENV variables %s and/or %s are not defined. Fall back to MSI.", |
| 45 | + env_client_id, env_client_secret) |
| 46 | + # See also https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/Programmatically-accessing-LAB-API's.aspx |
| 47 | + raise NotImplementedError("MSI-based mechanism has not been implemented yet") |
| 48 | + return msal.ConfidentialClientApplication(client_id, client_secret, |
| 49 | + authority="https://login.microsoftonline.com/" |
| 50 | + "72f988bf-86f1-41af-91ab-2d7cd011db47", # Microsoft tenant ID |
| 51 | + ) |
| 52 | + |
| 53 | +def get_lab_user_secret(access_token, lab_name="msidlab4"): |
| 54 | + return requests.get( |
| 55 | + # Note: Short link won't work "https://aka.ms/GetLabUserSecret?Secret=%s" |
| 56 | + "https://request.msidlab.com/api/GetLabUserSecret?code=KpY5uCcoKo0aW8VOL/CUO3wnu9UF2XbSnLFGk56BDnmQiwD80MQ7HA==&Secret=%s" |
| 57 | + % lab_name, |
| 58 | + headers={"Authorization": "Bearer %s" % access_token}, |
| 59 | + ).json()["Value"] |
| 60 | + |
| 61 | + |
| 62 | +@unittest.skip("for now") |
| 63 | +class E2eTestCase(unittest.TestCase): |
| 64 | + """ |
| 65 | + lab_token = get_lab_app().acquire_token_for_client( |
| 66 | + "https://request.msidlab.com/.default" |
| 67 | + ) # BTW, this infrastructure tests the confidential client flow |
| 68 | + lab_password = get_lab_user_secret(lab_token["access_token"]) |
| 69 | + """ |
| 70 | + |
| 71 | + def setUp(self): |
| 72 | + pass |
| 73 | + # client_id, client_secret = get_lab_app() |
| 74 | + # self.lab_app = msal.ConfidentialClientApplication(client_id, client_secret) |
| 75 | + |
| 76 | + def test_bar(self): |
| 77 | + self.assertEqual("********", self.lab_password) |
| 78 | + |
| 79 | + |
| 80 | +class BaseMixin(object): |
| 81 | + |
| 82 | + def skipIfNotConfigured(self, fields): |
| 83 | + if not all(map(self.config.get, fields)): |
| 84 | + self.skipTest("Configuration not sufficient") |
| 85 | + for field in fields: |
| 86 | + if not self.config.get(field): |
| 87 | + self.skipTest('"%s" not defined in configuration' % field) |
| 88 | + |
| 89 | + def assertLoosely(self, response, assertion=None, |
| 90 | + skippable_errors=("invalid_grant", "interaction_required")): |
| 91 | + if response.get("error") in skippable_errors: |
| 92 | + logger.debug("Response = %s", response) |
| 93 | + # Some of these errors are configuration issues, not library issues |
| 94 | + raise unittest.SkipTest(response.get("error_description")) |
| 95 | + else: |
| 96 | + if assertion is None: |
| 97 | + assertion = lambda: self.assertIn( |
| 98 | + "access_token", response, |
| 99 | + "{error}: {error_description}".format( |
| 100 | + # Do explicit response.get(...) rather than **response |
| 101 | + error=response.get("error"), |
| 102 | + error_description=response.get("error_description"))) |
| 103 | + assertion() |
| 104 | + |
| 105 | + def assertCacheWorks(self, result_from_wire): |
| 106 | + result = result_from_wire |
| 107 | + # You can filter by predefined username, or let end user to choose one |
| 108 | + accounts = self.app.get_accounts(username=self.config.get("username")) |
| 109 | + self.assertNotEqual(0, len(accounts)) |
| 110 | + account = accounts[0] |
| 111 | + # Going to test acquire_token_silent(...) to locate an AT from cache |
| 112 | + result_from_cache = self.app.acquire_token_silent( |
| 113 | + self.config["scope"], account=account) |
| 114 | + self.assertIsNotNone(result_from_cache) |
| 115 | + self.assertEqual(result['access_token'], result_from_cache['access_token'], |
| 116 | + "We should get a cached AT") |
| 117 | + |
| 118 | + # Going to test acquire_token_silent(...) to obtain an AT by a RT from cache |
| 119 | + self.app.token_cache._cache["AccessToken"] = {} # A hacky way to clear ATs |
| 120 | + result_from_cache = self.app.acquire_token_silent( |
| 121 | + self.config["scope"], account=account) |
| 122 | + self.assertIsNotNone(result_from_cache, |
| 123 | + "We should get a result from acquire_token_silent(...) call") |
| 124 | + self.assertNotEqual(result['access_token'], result_from_cache['access_token'], |
| 125 | + "We should get a fresh AT (via RT)") |
| 126 | + |
| 127 | + |
| 128 | +class UsernamePasswordMixin(object): |
| 129 | + def test_username_password(self): |
| 130 | + self.skipIfNotConfigured([ |
| 131 | + "authority", "client_id", "username", "password", "scope"]) |
| 132 | + self.app = msal.PublicClientApplication( |
| 133 | + self.config["client_id"], authority=self.config["authority"]) |
| 134 | + result = self.app.acquire_token_by_username_password( |
| 135 | + self.config["username"], self.config["password"], |
| 136 | + scopes=self.config.get("scope")) |
| 137 | + self.assertLoosely(result) |
| 138 | + self.assertCacheWorks(result) |
| 139 | + |
| 140 | + |
| 141 | +DEFAULT_QUERY = {"mam": False, "mfa": False} |
| 142 | + |
| 143 | +# Note: the following semi-parameterized testing approach is inspired from |
| 144 | +# https://bugs.python.org/msg151444 |
| 145 | + |
| 146 | +@unittest.skip("for now") |
| 147 | +class AadManagedUserPassTestCase(BaseMixin, UsernamePasswordMixin, unittest.TestCase): |
| 148 | + def setUp(self): |
| 149 | + self.config = get_lab_user(dict(DEFAULT_QUERY, isFederated=False)) |
| 150 | + |
| 151 | +@unittest.skip("for now") |
| 152 | +class Adfs4FedUserPassTestCase(BaseMixin, UsernamePasswordMixin, unittest.TestCase): |
| 153 | + def setUp(self): |
| 154 | + self.config = get_lab_user(dict( |
| 155 | + DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv4")) |
| 156 | + |
| 157 | +@unittest.skip("for now") |
| 158 | +class Adfs4ManagedUserPassTestCase(BaseMixin, UsernamePasswordMixin, unittest.TestCase): |
| 159 | + def setUp(self): |
| 160 | + self.config = get_lab_user(dict( |
| 161 | + DEFAULT_QUERY, isFederated=False, federationProvider="ADFSv4")) |
| 162 | + |
| 163 | +@unittest.skip("for now") # TODO: Need to pick up the real password |
| 164 | +class Adfs3FedUserPassTestCase(BaseMixin, UsernamePasswordMixin, unittest.TestCase): |
| 165 | + def setUp(self): |
| 166 | + self.config = get_lab_user(dict( |
| 167 | + DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv3")) |
| 168 | + |
| 169 | +@unittest.skip("for now") # TODO: Need to pick up the real password |
| 170 | +class Adfs3ManagedUserPassTestCase(BaseMixin, UsernamePasswordMixin, unittest.TestCase): |
| 171 | + def setUp(self): |
| 172 | + self.config = get_lab_user(dict( |
| 173 | + DEFAULT_QUERY, isFederated=False, federationProvider="ADFSv3")) |
| 174 | + |
| 175 | +@unittest.skip("for now") # TODO: Need to pick up the real password |
| 176 | +class Adfs2FedUserPassTestCase(BaseMixin, UsernamePasswordMixin, unittest.TestCase): |
| 177 | + def setUp(self): |
| 178 | + self.config = get_lab_user(dict( |
| 179 | + DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv2")) |
| 180 | + |
| 181 | +@unittest.skip("Lab API returns nothing. We might need to switch to beta api") |
| 182 | +class Adfs2019FedUserPassTestCase(BaseMixin, UsernamePasswordMixin, unittest.TestCase): |
| 183 | + def setUp(self): |
| 184 | + self.config = get_lab_user(dict( |
| 185 | + DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv2019")) |
| 186 | + |
| 187 | +CONFIG = os.path.join(os.path.dirname(__file__), "config.json") |
| 188 | +@unittest.skipIf(not os.path.exists(CONFIG), "Optional %s not found" % CONFIG) |
| 189 | +class FileBasedTestCase(BaseMixin, UsernamePasswordMixin, unittest.TestCase): |
| 190 | + def setUp(self): |
| 191 | + with open(CONFIG) as f: |
| 192 | + self.config = json.load(f) |
| 193 | + |
0 commit comments