|
6 | 6 | from datetime import datetime |
7 | 7 | from dateutil.relativedelta import relativedelta |
8 | 8 | # from oauth2_provider.compat import parse_qs, urlparse |
9 | | -from urllib.parse import parse_qs, urlparse |
| 9 | +from oauthlib.oauth2.rfc6749.errors import AccessDeniedError as AccessDeniedTokenCustomError |
10 | 10 | from oauth2_provider.models import get_access_token_model, get_refresh_token_model |
| 11 | +from django.core.exceptions import PermissionDenied |
11 | 12 | from django.http import HttpRequest |
12 | 13 | from django.urls import reverse |
13 | 14 | from django.test import Client |
14 | | -from apps.core.models import Flag |
15 | | -from waffle.testutils import override_switch, override_flag |
| 15 | +from unittest.mock import patch, MagicMock |
| 16 | +from urllib.parse import parse_qs, urlencode, urlparse |
| 17 | +from waffle.testutils import override_switch |
16 | 18 |
|
17 | 19 | from apps.test import BaseApiTest |
18 | 20 | from ..models import Application, ArchivedToken |
| 21 | +from apps.dot_ext.views import AuthorizationView, TokenView |
19 | 22 | from apps.authorization.models import DataAccessGrant, ArchivedDataAccessGrant |
20 | 23 | from http import HTTPStatus |
21 | 24 |
|
@@ -1022,65 +1025,100 @@ def _execute_token_endpoint(self, token_path): |
1022 | 1025 | response = c.post(token_path, data=token_request_data) |
1023 | 1026 | self.assertEqual(response.status_code, 200) |
1024 | 1027 |
|
1025 | | - @override_switch('v3_endpoints', active=True) |
1026 | | - @override_flag('v3_early_adopter', active=True) |
1027 | | - def test_v3_token_endpoint_with_early_adopter_flag_enabled(self): |
1028 | | - self._execute_token_endpoint('/v3/o/token/') |
| 1028 | + @patch('apps.dot_ext.views.authorization.get_user_model') |
| 1029 | + @patch('apps.dot_ext.views.authorization.get_application_model') |
| 1030 | + @patch('apps.dot_ext.views.authorization.get_waffle_flag_model') |
| 1031 | + def test_permission_denied_raised_for_authorize_app_not_in_flag( |
| 1032 | + self, |
| 1033 | + mock_get_flag_model, |
| 1034 | + mock_get_application_model, |
| 1035 | + mock_get_user_model |
| 1036 | + ): |
| 1037 | + # Unit test to show that we will raise an AccessDeniedTokenCustomError |
| 1038 | + # when the validate_v3_authorization_request of AuthorizationView function is called |
| 1039 | + # when the v3_early_adopter flag is not active for an application_user |
| 1040 | + # set up a fake request |
| 1041 | + query_params = urlencode({'client_id': 'FAKE_CLIENT_ID'}) |
| 1042 | + request = HttpRequest() |
| 1043 | + request.META['QUERY_STRING'] = query_params |
1029 | 1044 |
|
1030 | | - @override_switch('v3_endpoints', active=True) |
1031 | | - # @override_flag('v3_early_adopter', active=False) |
1032 | | - def test_v3_token_endpoint_with_early_adopter_flag_disabled(self): |
1033 | | - self._execute_token_endpoint_for_flag_test('/v3/o/token/') |
| 1045 | + # Mock the required objects/queries around flag/application/user |
| 1046 | + fake_flag = MagicMock() |
| 1047 | + fake_flag.id = 123 |
| 1048 | + fake_flag.name = 'v3_early_adopter' |
| 1049 | + fake_flag.is_active_for_user.return_value = False |
| 1050 | + mock_get_flag_model.return_value.get.return_value = fake_flag |
1034 | 1051 |
|
1035 | | - # @override_switch('v3_endpoints', active=True) |
1036 | | - # @override_flag('v3_early_adopter', active=True) |
1037 | | - # def test_v3_token_endpoint_without_trailling_slash(self): |
1038 | | - # self._execute_token_endpoint('/v3/o/token') |
| 1052 | + fake_application = MagicMock() |
| 1053 | + fake_application.id = 42 |
| 1054 | + fake_application.user_id = 999 |
| 1055 | + fake_application.name = 'TestApp' |
| 1056 | + mock_manager_app = MagicMock() |
| 1057 | + mock_manager_app.get.return_value = fake_application |
| 1058 | + mock_get_application_model.return_value.objects = mock_manager_app |
1039 | 1059 |
|
1040 | | - def _execute_token_endpoint_for_flag_test(self, token_path): |
1041 | | - Flag.objects.create(name='v3_early_adopter', everyone=None) |
1042 | | - redirect_uri = 'http://localhost' |
1043 | | - # create a user |
1044 | | - user = self._create_user('anna', '123456') |
1045 | | - capability_a = self._create_capability('Capability A', []) |
1046 | | - capability_b = self._create_capability('Capability B', []) |
1047 | | - print("USER ID: ", user.id) |
1048 | | - # create an application and add capabilities |
1049 | | - application = self._create_application( |
1050 | | - 'an app', |
1051 | | - grant_type=Application.GRANT_AUTHORIZATION_CODE, |
1052 | | - client_type=Application.CLIENT_CONFIDENTIAL, |
1053 | | - redirect_uris=redirect_uri, |
1054 | | - user_id=user.id) |
1055 | | - application.scope.add(capability_a, capability_b) |
1056 | | - # user logs in |
| 1060 | + fake_user = MagicMock() |
| 1061 | + fake_user.id = 999 |
| 1062 | + mock_manager_user = MagicMock() |
| 1063 | + mock_manager_user.get.return_value = fake_user |
| 1064 | + mock_get_user_model.return_value.objects = mock_manager_user |
| 1065 | + |
| 1066 | + # Create an instance of the view |
| 1067 | + view_instance = AuthorizationView() |
| 1068 | + view_instance.request = request |
| 1069 | + |
| 1070 | + with self.assertRaises(AccessDeniedTokenCustomError): |
| 1071 | + view_instance.validate_v3_authorization_request() |
| 1072 | + |
| 1073 | + @patch('apps.dot_ext.views.authorization.get_user_model') |
| 1074 | + @patch('apps.dot_ext.views.authorization.get_application_model') |
| 1075 | + @patch('apps.dot_ext.views.authorization.get_refresh_token_model') |
| 1076 | + @patch('apps.dot_ext.views.authorization.get_waffle_flag_model') |
| 1077 | + def test_permission_denied_raised_for_refresh_token_app_not_in_flag( |
| 1078 | + self, |
| 1079 | + mock_get_flag_model, |
| 1080 | + mock_get_refresh_token_model, |
| 1081 | + mock_get_application_model, |
| 1082 | + mock_get_user_model |
| 1083 | + ): |
| 1084 | + # Unit test to show that we will raise an PermissionDenied |
| 1085 | + # when the validate_v3_token_call of TokenView function is called |
| 1086 | + # when the v3_early_adopter flag is not active for an application_user |
| 1087 | + token_value = 'FAKE_REFRESH_TOKEN' |
| 1088 | + body = urlencode({'refresh_token': token_value}).encode('utf-8') |
1057 | 1089 | request = HttpRequest() |
1058 | | - self.client.login(request=request, username='anna', password='123456') |
1059 | | - # post the authorization form with only one scope selected |
1060 | | - payload = { |
1061 | | - 'client_id': application.client_id, |
1062 | | - 'response_type': 'code', |
1063 | | - 'redirect_uri': redirect_uri, |
1064 | | - 'scope': ['capability-a'], |
1065 | | - 'expires_in': 86400, |
1066 | | - 'allow': True, |
1067 | | - "state": "0123456789abcdef", |
1068 | | - 'refresh_token': 'asdfj23h4q98wuafidj' |
1069 | | - } |
1070 | | - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) |
1071 | | - self.client.logout() |
1072 | | - self.assertEqual(response.status_code, 302) |
1073 | | - # now extract the authorization code and use it to request an access_token |
1074 | | - query_dict = parse_qs(urlparse(response['Location']).query) |
1075 | | - authorization_code = query_dict.pop('code') |
1076 | | - token_request_data = { |
1077 | | - 'grant_type': 'authorization_code', |
1078 | | - 'code': authorization_code, |
1079 | | - 'redirect_uri': redirect_uri, |
1080 | | - 'client_id': application.client_id, |
1081 | | - 'client_secret': application.client_secret_plain, |
1082 | | - } |
1083 | | - c = Client() |
1084 | | - print("token path: ", token_path) |
1085 | | - response = c.post(token_path, data=token_request_data) |
1086 | | - self.assertEqual(response.status_code, 403) |
| 1090 | + request._body = body |
| 1091 | + |
| 1092 | + # Mock the required objects/queries around flag/refresh_token/application/user |
| 1093 | + fake_flag = MagicMock() |
| 1094 | + fake_flag.id = 123 |
| 1095 | + fake_flag.name = 'v3_early_adopter' |
| 1096 | + fake_flag.is_active_for_user.return_value = False |
| 1097 | + mock_get_flag_model.return_value.get.return_value = fake_flag |
| 1098 | + |
| 1099 | + fake_refresh_token = MagicMock() |
| 1100 | + fake_refresh_token.token = token_value |
| 1101 | + fake_refresh_token.application_id = 42 |
| 1102 | + mock_manager_refresh = MagicMock() |
| 1103 | + mock_manager_refresh.get.return_value = fake_refresh_token |
| 1104 | + mock_get_refresh_token_model.return_value.objects = mock_manager_refresh |
| 1105 | + |
| 1106 | + fake_application = MagicMock() |
| 1107 | + fake_application.id = 42 |
| 1108 | + fake_application.user_id = 999 |
| 1109 | + fake_application.name = 'TestApp' |
| 1110 | + mock_manager_app = MagicMock() |
| 1111 | + mock_manager_app.get.return_value = fake_application |
| 1112 | + mock_get_application_model.return_value.objects = mock_manager_app |
| 1113 | + |
| 1114 | + fake_user = MagicMock() |
| 1115 | + fake_user.id = 999 |
| 1116 | + mock_manager_user = MagicMock() |
| 1117 | + mock_manager_user.get.return_value = fake_user |
| 1118 | + mock_get_user_model.return_value.objects = mock_manager_user |
| 1119 | + |
| 1120 | + # Create an instance of the view |
| 1121 | + view_instance = TokenView() |
| 1122 | + |
| 1123 | + with self.assertRaises(PermissionDenied): |
| 1124 | + view_instance.validate_v3_token_call(request) |
0 commit comments