Skip to content

Commit de48bf1

Browse files
committed
getting properly failing test on refreshs
1 parent 1287195 commit de48bf1

File tree

1 file changed

+85
-0
lines changed

1 file changed

+85
-0
lines changed

apps/dot_ext/tests/test_authorization.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,91 @@ def test_post_with_invalid_non_standard_scheme_granttype_authcode_clienttype_con
234234
response = self.client.post(reverse('oauth2_provider:authorize'), data=payload)
235235
self.assertEqual(response.status_code, 400)
236236

237+
# FIXME: This should be merged somehow with test_refresh_token and also include version checking.
238+
# Currently, this is expected to fail because the fhir_ids somehow aren't being updated on refresh.
239+
def test_refresh_token_fhir_id_storing(self):
240+
redirect_uri = 'http://localhost'
241+
# create a user
242+
self._create_user('anna', '123456')
243+
capability_a = self._create_capability('Capability A', [])
244+
capability_b = self._create_capability('Capability B', [])
245+
# create an application and add capabilities
246+
application = self._create_application(
247+
'an app',
248+
grant_type=Application.GRANT_AUTHORIZATION_CODE,
249+
client_type=Application.CLIENT_CONFIDENTIAL,
250+
redirect_uris=redirect_uri)
251+
application.scope.add(capability_a, capability_b)
252+
# user logs in
253+
request = HttpRequest()
254+
self.client.login(request=request, username='anna', password='123456')
255+
# post the authorization form with only one scope selected
256+
payload = {
257+
'client_id': application.client_id,
258+
'response_type': 'code',
259+
'redirect_uri': redirect_uri,
260+
'scope': ['capability-a'],
261+
'expires_in': 86400,
262+
'allow': True,
263+
"state": "0123456789abcdef",
264+
}
265+
response = self.client.post(reverse('oauth2_provider:authorize'), data=payload)
266+
self.client.logout()
267+
self.assertEqual(response.status_code, 302)
268+
# now extract the authorization code and use it to request an access_token
269+
query_dict = parse_qs(urlparse(response['Location']).query)
270+
authorization_code = query_dict.pop('code')
271+
token_request_data = {
272+
'grant_type': 'authorization_code',
273+
'code': authorization_code,
274+
'redirect_uri': redirect_uri,
275+
'client_id': application.client_id,
276+
'client_secret': application.client_secret_plain,
277+
}
278+
c = Client()
279+
if switch_is_active('v3_endpoints'):
280+
response = c.post('/v3/o/token/', data=token_request_data)
281+
else:
282+
response = c.post('/v2/o/token/', data=token_request_data)
283+
self.assertEqual(response.status_code, 200)
284+
# Now we have a token and refresh token
285+
tkn = response.json()['access_token']
286+
refresh_tkn = response.json()['refresh_token']
287+
refresh_request_data = {
288+
'grant_type': 'refresh_token',
289+
'refresh_token': refresh_tkn,
290+
'redirect_uri': redirect_uri,
291+
'client_id': application.client_id,
292+
'client_secret': application.client_secret_plain,
293+
}
294+
response = self.client.post(reverse('oauth2_provider:token'), data=refresh_request_data)
295+
self.assertEqual(response.status_code, 200)
296+
self.assertNotEqual(response.json()['access_token'], tkn)
297+
# Capture rotated refresh token (server may rotate refresh tokens)
298+
new_refresh = response.json().get('refresh_token')
299+
if new_refresh:
300+
refresh_request_data['refresh_token'] = new_refresh
301+
user = User.objects.get(username='anna')
302+
crosswalk = Crosswalk.objects.get(user=user)
303+
print(f'what is in crosswalk {crosswalk.fhir_id_v3}')
304+
# Verify both fhir_id_v2 and fhir_id_v3 are populated
305+
self.assertIsNotNone(crosswalk.fhir_id_v2)
306+
self.assertIsNotNone(crosswalk.fhir_id_v3)
307+
self.assertTrue(len(crosswalk.fhir_id_v2) > 0)
308+
self.assertTrue(len(crosswalk.fhir_id_v3) > 0)
309+
# Changing the fhir ids to test that they get updated on refresh
310+
crosswalk.fhir_id_v2 = 'old_fhir_id_v2'
311+
crosswalk.fhir_id_v3 = 'old_fhir_id_v3'
312+
crosswalk.save()
313+
response = self.client.post(reverse('oauth2_provider:token'), data=refresh_request_data)
314+
print(f'Refresh response: {response.json()}')
315+
self.assertEqual(response.status_code, 200)
316+
self.assertNotEqual(response.json()['access_token'], tkn)
317+
crosswalk.refresh_from_db()
318+
# Verify both fhir_id_v2 and fhir_id_v3 are updated
319+
self.assertNotEqual(crosswalk.fhir_id_v2, 'old_fhir_id_v2')
320+
self.assertNotEqual(crosswalk.fhir_id_v3, 'old_fhir_id_v3')
321+
237322
def test_refresh_token(self):
238323
redirect_uri = 'http://localhost'
239324
# create a user

0 commit comments

Comments
 (0)