Skip to content

Commit fb56287

Browse files
committed
Combine multiple sub-classes back to one test class
1 parent 64819e2 commit fb56287

File tree

1 file changed

+67
-82
lines changed

1 file changed

+67
-82
lines changed

tests/test_e2e.py

Lines changed: 67 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,6 @@
1313

1414

1515
class E2eTestCase(unittest.TestCase):
16-
config = {}
17-
18-
def skipIfNotConfigured(self, fields):
19-
for field in fields:
20-
if not self.config.get(field):
21-
self.skipTest('"%s" not found in configuration' % field)
2216

2317
def assertLoosely(self, response, assertion=None,
2418
skippable_errors=("invalid_grant", "interaction_required")):
@@ -36,39 +30,36 @@ def assertLoosely(self, response, assertion=None,
3630
error_description=response.get("error_description")))
3731
assertion()
3832

39-
def assertCacheWorks(self, result_from_wire):
33+
def assertCacheWorks(self, result_from_wire, username, scope):
4034
result = result_from_wire
4135
# You can filter by predefined username, or let end user to choose one
42-
accounts = self.app.get_accounts(username=self.config.get("username"))
36+
accounts = self.app.get_accounts(username=username)
4337
self.assertNotEqual(0, len(accounts))
4438
account = accounts[0]
4539
# Going to test acquire_token_silent(...) to locate an AT from cache
46-
result_from_cache = self.app.acquire_token_silent(
47-
self.config["scope"], account=account)
40+
result_from_cache = self.app.acquire_token_silent(scope, account=account)
4841
self.assertIsNotNone(result_from_cache)
4942
self.assertEqual(result['access_token'], result_from_cache['access_token'],
5043
"We should get a cached AT")
5144

5245
# Going to test acquire_token_silent(...) to obtain an AT by a RT from cache
5346
self.app.token_cache._cache["AccessToken"] = {} # A hacky way to clear ATs
54-
result_from_cache = self.app.acquire_token_silent(
55-
self.config["scope"], account=account)
47+
result_from_cache = self.app.acquire_token_silent(scope, account=account)
5648
self.assertIsNotNone(result_from_cache,
5749
"We should get a result from acquire_token_silent(...) call")
5850
self.assertNotEqual(result['access_token'], result_from_cache['access_token'],
5951
"We should get a fresh AT (via RT)")
6052

61-
def test_username_password(self):
62-
self.skipIfNotConfigured([
63-
"authority", "client_id", "username", "password", "scope"])
64-
self.app = msal.PublicClientApplication(
65-
self.config["client_id"], authority=self.config["authority"])
53+
def _test_username_password(self,
54+
authority=None, client_id=None, username=None, password=None, scope=None,
55+
**ignored):
56+
assert authority and client_id and username and password and scope
57+
self.app = msal.PublicClientApplication(client_id, authority=authority)
6658
result = self.app.acquire_token_by_username_password(
67-
self.config["username"], self.config["password"],
68-
scopes=self.config.get("scope"))
59+
username, password, scopes=scope)
6960
self.assertLoosely(result)
7061
# self.assertEqual(None, result.get("error"), str(result))
71-
self.assertCacheWorks(result)
62+
self.assertCacheWorks(result, username, scope)
7263

7364

7465
CONFIG = os.path.join(os.path.dirname(__file__), "config.json")
@@ -78,10 +69,14 @@ def setUp(self):
7869
with open(CONFIG) as f:
7970
self.config = json.load(f)
8071

72+
def test_username_password(self):
73+
self._test_username_password(**self.config)
8174

82-
def get_lab_user(query): # This API requires no authorization
75+
def get_lab_user(mam=False, mfa=False, isFederated=False, federationProvider=None):
8376
# Based on https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/LAB.aspx
84-
user = requests.get("https://api.msidlab.com/api/user", params=query).json()
77+
user = requests.get("https://api.msidlab.com/api/user", params=dict( # Publicly available
78+
mam=mam, mfa=mfa, isFederated=isFederated, federationProvider=federationProvider,
79+
)).json()
8580
return { # Mapping lab API response to our simplified configuration format
8681
"authority": user["Authority"][0] + user["Users"]["tenantId"],
8782
"client_id": user["AppID"],
@@ -112,25 +107,35 @@ def get_lab_app(
112107
logger.info("ENV variables %s and/or %s are not defined. Fall back to MSI.",
113108
env_client_id, env_client_secret)
114109
# See also https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/Programmatically-accessing-LAB-API's.aspx
115-
raise NotImplementedError("MSI-based mechanism has not been implemented yet")
110+
raise unittest.SkipTest("MSI-based mechanism has not been implemented yet")
116111
return msal.ConfidentialClientApplication(client_id, client_secret,
117112
authority="https://login.microsoftonline.com/"
118113
"72f988bf-86f1-41af-91ab-2d7cd011db47", # Microsoft tenant ID
119114
)
120115

121-
def get_session(lab_app): # BTW, this infrastructure tests the confidential client flow
116+
def get_session(lab_app, scopes): # BTW, this infrastructure tests the confidential client flow
122117
logger.info("Creating session")
123-
lab_token = lab_app.acquire_token_for_client("https://request.msidlab.com/.default")
118+
lab_token = lab_app.acquire_token_for_client(scopes)
124119
session = requests.Session()
125120
session.headers.update({"Authorization": "Bearer %s" % lab_token["access_token"]})
126121
session.hooks["response"].append(lambda r, *args, **kwargs: r.raise_for_status())
127122
return session
128123

129124

130125
class LabBasedTestCase(E2eTestCase):
131-
session = get_session(get_lab_app()) # It will run even all test cases are skipped
132126
_secrets = {}
133127

128+
@classmethod
129+
def setUpClass(cls):
130+
cls.session = get_session(get_lab_app(), [
131+
"https://request.msidlab.com/.default", # Existing user & password API
132+
# "https://user.msidlab.com/.default", # New user API
133+
])
134+
135+
@classmethod
136+
def tearDownClass(cls):
137+
cls.session.close()
138+
134139
@classmethod
135140
def get_lab_user_secret(cls, lab_name="msidlab4"):
136141
lab_name = lab_name.lower()
@@ -146,7 +151,7 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
146151
return cls._secrets[lab_name]
147152

148153
@classmethod
149-
def get_lab_user(cls, query): # The query format is in lab team's Aug 9 email
154+
def get_lab_user(cls, query): # Experimental: The query format is in lab team's Aug 9 email
150155
resp = cls.session.get("https://user.msidlab.com/api/user", params=query)
151156
result = resp.json()[0]
152157
return { # Mapping lab API response to our simplified configuration format
@@ -157,59 +162,39 @@ def get_lab_user(cls, query): # The query format is in lab team's Aug 9 email
157162
"scope": ["https://graph.microsoft.com/.default"],
158163
}
159164

160-
DEFAULT_QUERY = {"mam": False, "mfa": False}
161-
162-
class AadManagedUserTestCase(LabBasedTestCase):
163-
@classmethod
164-
def setUpClass(cls):
165-
cls.config = get_lab_user(dict(DEFAULT_QUERY,
166-
isFederated=False, # Supposed to find a pure managed user,
167-
# but lab still gives us a [email protected]
168-
))
169-
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])
170-
171-
class Adfs4FedUserTestCase(LabBasedTestCase):
172-
@classmethod
173-
def setUpClass(cls):
174-
cls.config = get_lab_user(dict(
175-
DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv4"))
176-
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])
177-
178-
class Adfs4ManagedUserTestCase(LabBasedTestCase): # a.k.a. the hybrid
179-
@classmethod
180-
def setUpClass(cls):
181-
cls.config = get_lab_user(dict(
182-
DEFAULT_QUERY, isFederated=False, federationProvider="ADFSv4"))
183-
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])
184-
185-
class Adfs3FedUserTestCase(LabBasedTestCase):
186-
@classmethod
187-
def setUpClass(cls):
188-
cls.config = get_lab_user(dict(
189-
DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv3"))
190-
#cls.config = cls.get_lab_user({
191-
# "MFA": "none", "UserType": "federated", "FederationProvider": "adfsv3"})
192-
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])
193-
194-
class Adfs3ManagedUserTestCase(LabBasedTestCase): # a.k.a. the hybrid
195-
@classmethod
196-
def setUpClass(cls):
197-
cls.config = get_lab_user(dict(
198-
DEFAULT_QUERY, isFederated=False, federationProvider="ADFSv3"))
199-
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])
200-
201-
class Adfs2FedUserTestCase(LabBasedTestCase):
202-
@classmethod
203-
def setUpClass(cls):
204-
cls.config = get_lab_user(dict(
205-
DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv2"))
206-
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])
207-
208-
@unittest.skip("Lab API returns nothing. We might need to switch to beta api")
209-
class Adfs2019FedUserTestCase(LabBasedTestCase):
210-
@classmethod
211-
def setUpClass(cls):
212-
cls.config = get_lab_user(dict(
213-
DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv2019"))
214-
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])
165+
def test_aad_managed_user(self): # Pure cloud or hybrid
166+
config = get_lab_user(isFederated=False)
167+
self._test_username_password(
168+
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)
169+
170+
def test_adfs4_fed_user(self):
171+
config = get_lab_user(isFederated=True, federationProvider="ADFSv4")
172+
self._test_username_password(
173+
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)
174+
175+
def test_adfs4_managed_user(self): # Conceptually the hybrid
176+
config = get_lab_user(isFederated=False, federationProvider="ADFSv4")
177+
self._test_username_password(
178+
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)
179+
180+
def test_adfs3_fed_user(self):
181+
config = get_lab_user(isFederated=True, federationProvider="ADFSv3")
182+
self._test_username_password(
183+
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)
184+
185+
def test_adfs3_managed_user(self):
186+
config = get_lab_user(isFederated=False, federationProvider="ADFSv3")
187+
self._test_username_password(
188+
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)
189+
190+
def test_adfs2_fed_user(self):
191+
config = get_lab_user(isFederated=True, federationProvider="ADFSv2")
192+
self._test_username_password(
193+
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)
194+
195+
@unittest.skip("Old Lab API returns nothing. We will switch to new api later")
196+
def test_adfs2019_fed_user(self):
197+
config = get_lab_user(isFederated=True, federationProvider="ADFSv2019")
198+
self._test_username_password(
199+
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)
215200

0 commit comments

Comments
 (0)