diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d1d8618a..8839c4889 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## [Unreleased] + +### Changed +- **BREAKING:** In `serializers.py`, when a user linked to a token is missing or deleted, the code now raises `AuthenticationFailed("no_active_account")` instead of allowing `DoesNotExist` to propagate. + - Response changed from **404 Not Found** → **401 Unauthorized**. + - Improves security by not leaking whether a user/token exists. + - Follows RFC 7235, where authentication failures should return 401. + - Clearer for clients: signals an auth issue instead of suggesting the endpoint is missing. + + ## 5.5.1 Missing Migration for rest_framework_simplejwt.token_blacklist app. A previously missing migration (0013_blacklist) has now been added. This issue arose because the migration file was mistakenly not generated earlier. This migration was never part of an official release, but users following the latest master branch may have encountered it. diff --git a/rest_framework_simplejwt/authentication.py b/rest_framework_simplejwt/authentication.py index 060ddcc33..706196600 100644 --- a/rest_framework_simplejwt/authentication.py +++ b/rest_framework_simplejwt/authentication.py @@ -33,6 +33,10 @@ class JWTAuthentication(authentication.BaseAuthentication): www_authenticate_realm = "api" media_type = "application/json" + default_error_messages = { + "password_changed": _("The user's password has been changed."), + } + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.user_model = get_user_model() @@ -143,7 +147,8 @@ def get_user(self, validated_token: Token) -> AuthUser: api_settings.REVOKE_TOKEN_CLAIM ) != get_md5_hash_password(user.password): raise AuthenticationFailed( - _("The user's password has been changed."), code="password_changed" + self.default_error_messages["password_changed"], + code="password_changed", ) return user diff --git a/rest_framework_simplejwt/serializers.py b/rest_framework_simplejwt/serializers.py index 7fa717c67..04c8be43b 100644 --- a/rest_framework_simplejwt/serializers.py +++ b/rest_framework_simplejwt/serializers.py @@ -10,6 +10,7 @@ from .models import TokenUser from .settings import api_settings from .tokens import RefreshToken, SlidingToken, Token, UntypedToken +from .utils import get_md5_hash_password AuthUser = TypeVar("AuthUser", AbstractBaseUser, TokenUser) @@ -105,7 +106,8 @@ class TokenRefreshSerializer(serializers.Serializer): token_class = RefreshToken default_error_messages = { - "no_active_account": _("No active account found for the given token.") + "no_active_account": _("No active account found for the given token."), + "password_changed": _("The user's password has been changed."), } def validate(self, attrs: dict[str, Any]) -> dict[str, str]: @@ -113,19 +115,42 @@ def validate(self, attrs: dict[str, Any]) -> dict[str, str]: user_id = refresh.payload.get(api_settings.USER_ID_CLAIM, None) if user_id: - user = ( - get_user_model() - .objects.filter(**{api_settings.USER_ID_FIELD: user_id}) - .first() - ) - if not user or not api_settings.USER_AUTHENTICATION_RULE(user): + try: + user = get_user_model().objects.get( + **{api_settings.USER_ID_FIELD: user_id} + ) + except get_user_model().DoesNotExist: + # This handles the case where the user has been deleted. raise AuthenticationFailed( - self.error_messages["no_active_account"], - "no_active_account", + self.error_messages["no_active_account"], "no_active_account" ) - data = {"access": str(refresh.access_token)} + if not api_settings.USER_AUTHENTICATION_RULE(user): + raise AuthenticationFailed( + self.error_messages["no_active_account"], "no_active_account" + ) + if api_settings.CHECK_REVOKE_TOKEN: + if refresh.payload.get( + api_settings.REVOKE_TOKEN_CLAIM + ) != get_md5_hash_password(user.password): + # If the password has changed, we blacklist the token + # to prevent any further use. + if ( + "rest_framework_simplejwt.token_blacklist" + in settings.INSTALLED_APPS + ): + try: + refresh.blacklist() + except AttributeError: + pass + + raise AuthenticationFailed( + self.error_messages["password_changed"], + code="password_changed", + ) + + data = {"access": str(refresh.access_token)} if api_settings.ROTATE_REFRESH_TOKENS: if api_settings.BLACKLIST_AFTER_ROTATION: try: @@ -150,8 +175,49 @@ class TokenRefreshSlidingSerializer(serializers.Serializer): token = serializers.CharField() token_class = SlidingToken + default_error_messages = { + "no_active_account": _("No active account found for the given token."), + "password_changed": _("The user's password has been changed."), + } + def validate(self, attrs: dict[str, Any]) -> dict[str, str]: token = self.token_class(attrs["token"]) + user_id = token.payload.get(api_settings.USER_ID_CLAIM, None) + if user_id: + try: + user = get_user_model().objects.get( + **{api_settings.USER_ID_FIELD: user_id} + ) + except get_user_model().DoesNotExist: + # This handles the case where the user has been deleted. + raise AuthenticationFailed( + self.error_messages["no_active_account"], "no_active_account" + ) + + if not api_settings.USER_AUTHENTICATION_RULE(user): + raise AuthenticationFailed( + self.error_messages["no_active_account"], "no_active_account" + ) + + if api_settings.CHECK_REVOKE_TOKEN: + if token.payload.get( + api_settings.REVOKE_TOKEN_CLAIM + ) != get_md5_hash_password(user.password): + # If the password has changed, we blacklist the token + # to prevent any further use. + if ( + "rest_framework_simplejwt.token_blacklist" + in settings.INSTALLED_APPS + ): + try: + token.blacklist() + except AttributeError: + pass + + raise AuthenticationFailed( + self.error_messages["password_changed"], + code="password_changed", + ) # Check that the timestamp in the "refresh_exp" claim has not # passed diff --git a/tests/test_serializers.py b/tests/test_serializers.py index b3cdf4c72..ed8645e54 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -187,6 +187,15 @@ def test_it_should_produce_a_json_web_token_when_valid(self): class TestTokenRefreshSlidingSerializer(TestCase): + def setUp(self): + self.username = "test_user" + self.password = "test_password" + + self.user = User.objects.create_user( + username=self.username, + password=self.password, + ) + def test_it_should_not_validate_if_token_invalid(self): token = SlidingToken() del token["exp"] @@ -268,6 +277,74 @@ def test_it_should_update_token_exp_claim_if_everything_ok(self): self.assertTrue(old_exp < new_exp) + def test_it_should_raise_error_for_deleted_users(self): + token = SlidingToken.for_user(self.user) + self.user.delete() + + s = TokenRefreshSlidingSerializer(data={"token": str(token)}) + + # It should raise AuthenticationFailed instead of ObjectDoesNotExist + with self.assertRaises(drf_exceptions.AuthenticationFailed) as e: + s.is_valid() + + self.assertEqual(e.exception.get_codes(), "no_active_account") + + def test_it_should_raise_error_for_inactive_users(self): + token = SlidingToken.for_user(self.user) + self.user.is_active = False + self.user.save() + + s = TokenRefreshSlidingSerializer(data={"token": str(token)}) + + with self.assertRaises(drf_exceptions.AuthenticationFailed) as e: + s.is_valid() + + self.assertEqual(e.exception.get_codes(), "no_active_account") + + @override_api_settings( + CHECK_REVOKE_TOKEN=True, + REVOKE_TOKEN_CLAIM="hash_password", + BLACKLIST_AFTER_ROTATION=False, + ) + def test_sliding_token_should_fail_after_password_change(self): + """ + Tests that sliding token refresh fails if CHECK_REVOKE_TOKEN is True and the + user's password has changed. + """ + token = SlidingToken.for_user(self.user) + self.user.set_password("new_password") + self.user.save() + + s = TokenRefreshSlidingSerializer(data={"token": str(token)}) + + with self.assertRaises(drf_exceptions.AuthenticationFailed) as e: + s.is_valid(raise_exception=True) + + self.assertEqual(e.exception.get_codes(), "password_changed") + + @override_api_settings( + CHECK_REVOKE_TOKEN=True, + REVOKE_TOKEN_CLAIM="hash_password", + BLACKLIST_AFTER_ROTATION=True, + ) + def test_sliding_token_should_blacklist_after_password_change(self): + """ + Tests that if sliding token refresh fails due to a password change, the + offending token is blacklisted. + """ + token = SlidingToken.for_user(self.user) + self.user.set_password("new_password") + self.user.save() + + s = TokenRefreshSlidingSerializer(data={"token": str(token)}) + with self.assertRaises(drf_exceptions.AuthenticationFailed): + s.is_valid(raise_exception=True) + + # Check that the token is now in the blacklist + jti = token[api_settings.JTI_CLAIM] + self.assertTrue(OutstandingToken.objects.filter(jti=jti).exists()) + self.assertTrue(BlacklistedToken.objects.filter(token__jti=jti).exists()) + class TestTokenRefreshSerializer(TestCase): def setUp(self): @@ -285,10 +362,11 @@ def test_it_should_raise_error_for_deleted_users(self): s = TokenRefreshSerializer(data={"refresh": str(refresh)}) + # It should raise AuthenticationFailed instead of ObjectDoesNotExist with self.assertRaises(drf_exceptions.AuthenticationFailed) as e: s.is_valid() - self.assertIn("No active account", str(e.exception)) + self.assertEqual(e.exception.get_codes(), "no_active_account") def test_it_should_raise_error_for_inactive_users(self): refresh = RefreshToken.for_user(self.user) @@ -482,6 +560,50 @@ def test_blacklist_app_not_installed_should_pass(self): reload(tokens) reload(serializers) + @override_api_settings( + CHECK_REVOKE_TOKEN=True, + REVOKE_TOKEN_CLAIM="hash_password", + BLACKLIST_AFTER_ROTATION=False, + ) + def test_refresh_token_should_fail_after_password_change(self): + """ + Tests that token refresh fails if CHECK_REVOKE_TOKEN is True and the + user's password has changed. + """ + refresh = RefreshToken.for_user(self.user) + self.user.set_password("new_password") + self.user.save() + + s = TokenRefreshSerializer(data={"refresh": str(refresh)}) + + with self.assertRaises(drf_exceptions.AuthenticationFailed) as e: + s.is_valid(raise_exception=True) + + self.assertEqual(e.exception.get_codes(), "password_changed") + + @override_api_settings( + CHECK_REVOKE_TOKEN=True, + REVOKE_TOKEN_CLAIM="hash_password", + BLACKLIST_AFTER_ROTATION=True, + ) + def test_refresh_token_should_blacklist_after_password_change(self): + """ + Tests that if token refresh fails due to a password change, the + offending refresh token is blacklisted. + """ + refresh = RefreshToken.for_user(self.user) + self.user.set_password("new_password") + self.user.save() + + s = TokenRefreshSerializer(data={"refresh": str(refresh)}) + with self.assertRaises(drf_exceptions.AuthenticationFailed): + s.is_valid(raise_exception=True) + + # Check that the token is now in the blacklist + jti = refresh[api_settings.JTI_CLAIM] + self.assertTrue(OutstandingToken.objects.filter(jti=jti).exists()) + self.assertTrue(BlacklistedToken.objects.filter(token__jti=jti).exists()) + class TestTokenVerifySerializer(TestCase): def test_it_should_raise_token_error_if_token_invalid(self):