Skip to content

Commit de8010f

Browse files
soerfacen2ygk
authored andcommitted
Add tests which describe desired behaviour for REFRESH_TOKEN_REUSE_PROTECTION (#1404)
1 parent 9c18de2 commit de8010f

File tree

1 file changed

+104
-0
lines changed

1 file changed

+104
-0
lines changed

tests/test_authorization_code.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,53 @@ def test_refresh_fail_repeating_requests(self):
985985
response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers)
986986
self.assertEqual(response.status_code, 400)
987987

988+
def test_refresh_repeating_requests_revokes_old_token(self):
989+
"""
990+
If a refresh token is reused, the server should invalidate *all* access tokens that have a relation
991+
to the re-used token. This forces a malicious actor to be logged out.
992+
The server can't determine whether the first or the second client was legitimate, so it needs to
993+
revoke both.
994+
See https://datatracker.ietf.org/doc/html/draft-ietf-oauth-security-topics-29#name-recommendations
995+
"""
996+
self.oauth2_settings.REFRESH_TOKEN_REUSE_PROTECTION = True
997+
self.client.login(username="test_user", password="123456")
998+
authorization_code = self.get_auth()
999+
1000+
token_request_data = {
1001+
"grant_type": "authorization_code",
1002+
"code": authorization_code,
1003+
"redirect_uri": "http://example.org",
1004+
}
1005+
auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET)
1006+
1007+
response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers)
1008+
content = json.loads(response.content.decode("utf-8"))
1009+
self.assertTrue("refresh_token" in content)
1010+
1011+
token_request_data = {
1012+
"grant_type": "refresh_token",
1013+
"refresh_token": content["refresh_token"],
1014+
"scope": content["scope"],
1015+
}
1016+
response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers)
1017+
self.assertEqual(response.status_code, 200)
1018+
new_tokens = json.loads(response.content.decode("utf-8"))
1019+
1020+
# Second request fails
1021+
response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers)
1022+
self.assertEqual(response.status_code, 400)
1023+
1024+
# Previously returned tokens are now invalid
1025+
new_token_request_data = {
1026+
"grant_type": "refresh_token",
1027+
"refresh_token": new_tokens["refresh_token"],
1028+
"scope": new_tokens["scope"],
1029+
}
1030+
response = self.client.post(
1031+
reverse("oauth2_provider:token"), data=new_token_request_data, **auth_headers
1032+
)
1033+
self.assertEqual(response.status_code, 400)
1034+
9881035
def test_refresh_repeating_requests(self):
9891036
"""
9901037
Trying to refresh an access token with the same refresh token more than
@@ -1024,6 +1071,63 @@ def test_refresh_repeating_requests(self):
10241071
response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers)
10251072
self.assertEqual(response.status_code, 400)
10261073

1074+
def test_refresh_repeating_requests_grace_period_with_reuse_protection(self):
1075+
"""
1076+
Trying to refresh an access token with the same refresh token more than
1077+
once succeeds. Should work within the grace period, but should revoke previous tokens
1078+
"""
1079+
self.oauth2_settings.REFRESH_TOKEN_GRACE_PERIOD_SECONDS = 120
1080+
self.oauth2_settings.REFRESH_TOKEN_REUSE_PROTECTION = True
1081+
self.client.login(username="test_user", password="123456")
1082+
authorization_code = self.get_auth()
1083+
1084+
token_request_data = {
1085+
"grant_type": "authorization_code",
1086+
"code": authorization_code,
1087+
"redirect_uri": "http://example.org",
1088+
}
1089+
auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET)
1090+
1091+
response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers)
1092+
content = json.loads(response.content.decode("utf-8"))
1093+
self.assertTrue("refresh_token" in content)
1094+
1095+
refresh_token_1 = content["refresh_token"]
1096+
token_request_data = {
1097+
"grant_type": "refresh_token",
1098+
"refresh_token": refresh_token_1,
1099+
"scope": content["scope"],
1100+
}
1101+
response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers)
1102+
self.assertEqual(response.status_code, 200)
1103+
refresh_token_2 = json.loads(response.content.decode("utf-8"))["refresh_token"]
1104+
1105+
response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers)
1106+
self.assertEqual(response.status_code, 200)
1107+
refresh_token_3 = json.loads(response.content.decode("utf-8"))["refresh_token"]
1108+
1109+
self.assertEqual(refresh_token_2, refresh_token_3)
1110+
1111+
# Let the first refresh token expire
1112+
rt = RefreshToken.objects.get(token=refresh_token_1)
1113+
rt.revoked = timezone.now() - datetime.timedelta(minutes=10)
1114+
rt.save()
1115+
1116+
# Using the expired token fails
1117+
response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers)
1118+
self.assertEqual(response.status_code, 400)
1119+
1120+
# Because we used the expired token, the recently issued token is also revoked
1121+
new_token_request_data = {
1122+
"grant_type": "refresh_token",
1123+
"refresh_token": refresh_token_2,
1124+
"scope": content["scope"],
1125+
}
1126+
response = self.client.post(
1127+
reverse("oauth2_provider:token"), data=new_token_request_data, **auth_headers
1128+
)
1129+
self.assertEqual(response.status_code, 400)
1130+
10271131
def test_refresh_repeating_requests_non_rotating_tokens(self):
10281132
"""
10291133
Try refreshing an access token with the same refresh token more than once when not rotating tokens.

0 commit comments

Comments
 (0)