diff --git a/apps/accounts/views/oauth2_profile.py b/apps/accounts/views/oauth2_profile.py index f79c5eb3c..cd1050e58 100644 --- a/apps/accounts/views/oauth2_profile.py +++ b/apps/accounts/views/oauth2_profile.py @@ -10,6 +10,7 @@ from apps.fhir.bluebutton.permissions import ApplicationActivePermission from apps.versions import Versions +from apps.wellknown.permissions import V3EarlyAdopterWellKnownPermission def _get_userinfo(user, version=Versions.NOT_AN_API_VERSION): @@ -41,7 +42,8 @@ def _get_userinfo(user, version=Versions.NOT_AN_API_VERSION): @authentication_classes([OAuth2Authentication]) @permission_classes([ApplicationActivePermission, TokenHasProtectedCapability, - DataAccessGrantPermission]) + DataAccessGrantPermission, + V3EarlyAdopterWellKnownPermission]) @protected_resource() # Django OAuth Toolkit -> resource_owner = AccessToken def _openidconnect_userinfo(request, version=Versions.NOT_AN_API_VERSION): # NOTE: The **kwargs are not used anywhere down the callchain, and are being ignored. diff --git a/apps/authorization/tests/test_data_access_grant_permissions.py b/apps/authorization/tests/test_data_access_grant_permissions.py index 7d8d38505..f26b3d5d9 100644 --- a/apps/authorization/tests/test_data_access_grant_permissions.py +++ b/apps/authorization/tests/test_data_access_grant_permissions.py @@ -13,7 +13,7 @@ from apps.authorization.models import ( DataAccessGrant, ) -from waffle.testutils import override_switch +from waffle.testutils import override_switch, override_flag from apps.fhir.bluebutton.tests.test_fhir_resources_read_search_w_validation import ( get_response_json, ) @@ -145,6 +145,7 @@ def setUp(self): self.client = Client() @override_switch('v3_endpoints', active=True) + @override_flag('v3_early_adopter', active=True) def _assert_call_all_fhir_endpoints( self, access_token=None, diff --git a/apps/dot_ext/tests/test_authorization.py b/apps/dot_ext/tests/test_authorization.py index 932571ff6..7626c06c7 100644 --- a/apps/dot_ext/tests/test_authorization.py +++ b/apps/dot_ext/tests/test_authorization.py @@ -6,15 +6,18 @@ from datetime import datetime from dateutil.relativedelta import relativedelta # from oauth2_provider.compat import parse_qs, urlparse -from urllib.parse import parse_qs, urlparse +from oauthlib.oauth2.rfc6749.errors import AccessDeniedError as AccessDeniedTokenCustomError from oauth2_provider.models import get_access_token_model, get_refresh_token_model from django.http import HttpRequest from django.urls import reverse from django.test import Client +from unittest.mock import patch, MagicMock +from urllib.parse import parse_qs, urlencode, urlparse from waffle.testutils import override_switch from apps.test import BaseApiTest from ..models import Application, ArchivedToken +from apps.dot_ext.views import AuthorizationView, TokenView from apps.authorization.models import DataAccessGrant, ArchivedDataAccessGrant from http import HTTPStatus @@ -1020,3 +1023,103 @@ def _execute_token_endpoint(self, token_path): c = Client() response = c.post(token_path, data=token_request_data) self.assertEqual(response.status_code, 200) + + @patch('apps.dot_ext.views.authorization.get_user_model') + @patch('apps.dot_ext.views.authorization.get_application_model') + @patch('apps.dot_ext.views.authorization.get_waffle_flag_model') + def test_permission_denied_raised_for_authorize_app_not_in_flag( + self, + mock_get_flag_model, + mock_get_application_model, + mock_get_user_model + ): + # Unit test to show that we will raise an AccessDeniedTokenCustomError + # when the validate_v3_authorization_request of AuthorizationView function is called + # when the v3_early_adopter flag is not active for an application_user + # set up a fake request + # TODO: When we enable v3 endpoints for all applications, remove this test + query_params = urlencode({'client_id': 'FAKE_CLIENT_ID'}) + request = HttpRequest() + request.META['QUERY_STRING'] = query_params + + # Mock the required objects/queries around flag/application/user + fake_flag = MagicMock() + fake_flag.id = 123 + fake_flag.name = 'v3_early_adopter' + fake_flag.is_active_for_user.return_value = False + mock_get_flag_model.return_value.get.return_value = fake_flag + + fake_application = MagicMock() + fake_application.id = 42 + fake_application.user_id = 999 + fake_application.name = 'TestApp' + mock_manager_app = MagicMock() + mock_manager_app.get.return_value = fake_application + mock_get_application_model.return_value.objects = mock_manager_app + + fake_user = MagicMock() + fake_user.id = 999 + mock_manager_user = MagicMock() + mock_manager_user.get.return_value = fake_user + mock_get_user_model.return_value.objects = mock_manager_user + + # Create an instance of the view + view_instance = AuthorizationView() + view_instance.request = request + + with self.assertRaises(AccessDeniedTokenCustomError): + view_instance.validate_v3_authorization_request() + + @patch('apps.dot_ext.views.authorization.get_user_model') + @patch('apps.dot_ext.views.authorization.get_application_model') + @patch('apps.dot_ext.views.authorization.get_refresh_token_model') + @patch('apps.dot_ext.views.authorization.get_waffle_flag_model') + def test_permission_denied_raised_for_refresh_token_app_not_in_flag( + self, + mock_get_flag_model, + mock_get_refresh_token_model, + mock_get_application_model, + mock_get_user_model + ): + # BB2-4250Unit test to show that we will raise an PermissionDenied + # when the validate_v3_token_call of TokenView function is called + # when the v3_early_adopter flag is not active for an application_user + # TODO: When we enable v3 endpoints for all applications, remove this test + token_value = 'FAKE_REFRESH_TOKEN' + body = urlencode({'refresh_token': token_value}).encode('utf-8') + request = HttpRequest() + request._body = body + + # Mock the required objects/queries around flag/refresh_token/application/user + fake_flag = MagicMock() + fake_flag.id = 123 + fake_flag.name = 'v3_early_adopter' + fake_flag.is_active_for_user.return_value = False + mock_get_flag_model.return_value.get.return_value = fake_flag + + fake_refresh_token = MagicMock() + fake_refresh_token.token = token_value + fake_refresh_token.application_id = 42 + mock_manager_refresh = MagicMock() + mock_manager_refresh.get.return_value = fake_refresh_token + mock_get_refresh_token_model.return_value.objects = mock_manager_refresh + + fake_application = MagicMock() + fake_application.id = 42 + fake_application.user_id = 999 + fake_application.name = 'TestApp' + mock_manager_app = MagicMock() + mock_manager_app.get.return_value = fake_application + mock_get_application_model.return_value.objects = mock_manager_app + + fake_user = MagicMock() + fake_user.id = 999 + mock_manager_user = MagicMock() + mock_manager_user.get.return_value = fake_user + mock_get_user_model.return_value.objects = mock_manager_user + + # Create an instance of the view + view_instance = TokenView() + + with self.assertRaises(AccessDeniedTokenCustomError): + view_instance.validate_v3_token_call(request) diff --git a/apps/dot_ext/views/authorization.py b/apps/dot_ext/views/authorization.py index 9a372fe2d..64a1edb68 100644 --- a/apps/dot_ext/views/authorization.py +++ b/apps/dot_ext/views/authorization.py @@ -4,23 +4,28 @@ from functools import wraps from time import strftime +from django.conf import settings +from django.contrib.auth import get_user_model from django.contrib.auth.views import redirect_to_login from django.http import JsonResponse from django.http.response import HttpResponse, HttpResponseBadRequest from django.template.response import TemplateResponse +from django.core.exceptions import ObjectDoesNotExist from django.utils.decorators import method_decorator from django.views.decorators.csrf import csrf_exempt from django.views.decorators.debug import sensitive_post_parameters from apps.dot_ext.constants import TOKEN_ENDPOINT_V3_KEY +from oauthlib.oauth2.rfc6749.errors import AccessDeniedError as AccessDeniedTokenCustomError from oauth2_provider.exceptions import OAuthToolkitError -from oauth2_provider.views.base import app_authorized, get_access_token_model +from oauth2_provider.views.base import app_authorized +from oauth2_provider.models import get_refresh_token_model, get_access_token_model from oauth2_provider.views.base import AuthorizationView as DotAuthorizationView from oauth2_provider.views.base import TokenView as DotTokenView from oauth2_provider.views.base import RevokeTokenView as DotRevokeTokenView from oauth2_provider.views.introspect import ( IntrospectTokenView as DotIntrospectTokenView, ) -from waffle import switch_is_active +from waffle import switch_is_active, get_waffle_flag_model from oauth2_provider.models import get_application_model from oauthlib.oauth2 import AccessDeniedError from oauthlib.oauth2.rfc6749.errors import InvalidClientError, InvalidGrantError, InvalidRequestError @@ -28,6 +33,7 @@ import html from apps.dot_ext.scopes import CapabilitiesScopes import apps.logging.request_logger as bb2logging +from apps.versions import Versions from ..signals import beneficiary_authorized_application from ..forms import SimpleAllowForm @@ -41,6 +47,7 @@ ) from ..models import Approval from ..utils import ( + get_api_version_number_from_url, remove_application_user_pair_tokens_data_access, validate_app_is_active, json_response_from_oauth2_error, @@ -84,6 +91,7 @@ class AuthorizationView(DotAuthorizationView): # TODO: rename this so that it isn't the same as self.version (works but confusing) # this needs to be here for urls.py as_view(version) calls, but don't use it version = 0 + # Variable to help reduce the amount of times validate_v3_authorization_request is called form_class = SimpleAllowForm login_url = "/mymedicare/login" @@ -116,6 +124,8 @@ def _check_for_required_params(self, request): error_message = "State parameter should have a minimum of 16 characters" return JsonResponse({"status_code": 400, "message": error_message}, status=400) + # BB2-4250: This code will not execute if the application is not in the v3_early_adopter flag + # so it will not be modified as part of BB2-4250 if switch_is_active('v3_endpoints') and v3: if 'scope' not in request.GET: missing_params.append("scope") @@ -140,6 +150,18 @@ def dispatch(self, request, *args, **kwargs): initially create an AuthFlowUuid object for authorization flow tracing in logs. """ + path_info = self.request.__dict__.get('path_info') + version = get_api_version_number_from_url(path_info) + # If it is not version 3, we don't need to check anything, just return + if version == Versions.V3: + try: + self.validate_v3_authorization_request() + except AccessDeniedTokenCustomError as e: + return JsonResponse( + {'status_code': 403, 'message': str(e)}, + status=403, + ) + # TODO: Should the client_id match a valid application here before continuing, instead of after matching to FHIR_ID? if not kwargs.get('is_subclass_approvalview', False): # Create new authorization flow trace UUID in session and AuthFlowUuid instance, if subclass is not ApprovalView @@ -224,6 +246,28 @@ def get(self, request, *args, **kwargs): kwargs['code_challenge_method'] = request.GET.get('code_challenge_method', None) return super().get(request, *args, **kwargs) + def validate_v3_authorization_request(self): + flag = get_waffle_flag_model().get('v3_early_adopter') + req_meta = self.request.META + url_query = parse_qs(req_meta.get('QUERY_STRING')) + client_id = url_query.get('client_id', [None]) + try: + application = get_application_model().objects.get(client_id=client_id[0]) + application_user = get_user_model().objects.get(id=application.user_id) + + if flag.id is None or flag.is_active_for_user(application_user): + # Update the class variable to ensure subsequent calls to dispatch don't call this function + # more times than is needed + return + else: + raise AccessDeniedTokenCustomError( + description=settings.APPLICATION_DOES_NOT_HAVE_V3_ENABLED_YET.format(application.name) + ) + except ObjectDoesNotExist: + raise AccessDeniedTokenCustomError( + description='Unable to verify permission.' + ) + def form_valid(self, form): client_id = form.cleaned_data["client_id"] application = get_application_model().objects.get(client_id=client_id) @@ -262,6 +306,7 @@ def form_valid(self, form): refresh_token_delete_cnt = 0 try: + if not scopes: # Since the create_authorization_response will re-inject scopes even when none are # valid, we want to pre-emptively treat this as an error case @@ -410,13 +455,48 @@ def validate_token_endpoint_request_body(self, request): description=f"Invalid parameters in request: {invalid_parameters}" ) + def validate_v3_token_call(self, request) -> None: + flag = get_waffle_flag_model().get('v3_early_adopter') + + try: + url_query = parse_qs(request._body.decode('utf-8')) + refresh_token_from_request = url_query.get('refresh_token', [None]) + refresh_token = get_refresh_token_model().objects.get(token=refresh_token_from_request[0]) + application = get_application_model().objects.get(id=refresh_token.application_id) + application_user = get_user_model().objects.get(id=application.user_id) + + if flag.id is None or flag.is_active_for_user(application_user): + return + else: + raise AccessDeniedTokenCustomError( + description=settings.APPLICATION_DOES_NOT_HAVE_V3_ENABLED_YET.format(application.name) + ) + except ObjectDoesNotExist: + raise AccessDeniedTokenCustomError( + description='Unable to verify permission.' + ) + @method_decorator(sensitive_post_parameters("password")) def post(self, request, *args, **kwargs): + path_info = self.request.__dict__.get('path_info') + version = get_api_version_number_from_url(path_info) + url_query = parse_qs(request._body.decode('utf-8')) + grant_type = url_query.get('grant_type', [None]) try: + # If it is not version 3, we don't need to check that the application is in the v3_early_adopter flag, + # just continue with standard validation. + # Also, we only want to execute this on refresh_token grant types, not authorization_code + if version == Versions.V3 and grant_type[0] and grant_type[0] == 'refresh_token': + self.validate_v3_token_call(request) self.validate_token_endpoint_request_body(request) app = validate_app_is_active(request) except (InvalidClientError, InvalidGrantError, InvalidRequestError) as error: return json_response_from_oauth2_error(error) + except AccessDeniedTokenCustomError as e: + return JsonResponse( + {'status_code': 403, 'message': str(e)}, + status=403, + ) url, headers, body, status = self.create_token_response(request) diff --git a/apps/fhir/bluebutton/permissions.py b/apps/fhir/bluebutton/permissions.py index cf3a62022..957697c7d 100644 --- a/apps/fhir/bluebutton/permissions.py +++ b/apps/fhir/bluebutton/permissions.py @@ -2,8 +2,11 @@ from django.conf import settings from django.contrib.auth import get_user_model +from oauth2_provider.views.base import get_access_token_model +from oauth2_provider.models import get_application_model from rest_framework import permissions, exceptions -from rest_framework.exceptions import AuthenticationFailed +from rest_framework.exceptions import AuthenticationFailed, PermissionDenied +from waffle import get_waffle_flag_model from .constants import ALLOWED_RESOURCE_TYPES from apps.versions import Versions, VersionNotMatched @@ -106,3 +109,22 @@ def has_permission(self, request, view): ) return True + + +class V3EarlyAdopterPermission(permissions.BasePermission): + def has_permission(self, request, view): + # if it is not version 3, we do not need to check the waffle switch or flag + if view.version < Versions.V3: + return True + + token = get_access_token_model().objects.get(token=request._auth) + application = get_application_model().objects.get(id=token.application_id) + application_user = get_user_model().objects.get(id=application.user_id) + flag = get_waffle_flag_model().get('v3_early_adopter') + + if flag.id is None or flag.is_active_for_user(application_user): + return True + else: + raise PermissionDenied( + settings.APPLICATION_DOES_NOT_HAVE_V3_ENABLED_YET.format(application.name) + ) diff --git a/apps/fhir/bluebutton/tests/test_wellknown_endpoints.py b/apps/fhir/bluebutton/tests/test_wellknown_endpoints.py index 719f4ef56..505d73277 100644 --- a/apps/fhir/bluebutton/tests/test_wellknown_endpoints.py +++ b/apps/fhir/bluebutton/tests/test_wellknown_endpoints.py @@ -3,10 +3,9 @@ from collections import namedtuple as NT from django.conf import settings from django.test.client import Client -from httmock import all_requests, HTTMock from oauth2_provider.models import get_access_token_model from unittest import skipIf -from waffle.testutils import override_switch +from waffle.testutils import override_switch, override_flag # Introduced in bb2-4184 # Rudimentary tests to make sure endpoints exist and are returning @@ -52,29 +51,30 @@ def setUp(self): @skipIf((not settings.RUN_ONLINE_TESTS), 'Can\'t reach external sites.') @override_switch('v3_endpoints', active=True) + @override_flag('v3_early_adopter', active=False) + def test_userinfo_returns_403(self): + first_access_token = self.create_token('John', 'Smith', fhir_id_v2=FHIR_ID_V2) + ac = AccessToken.objects.get(token=first_access_token) + ac.save() + + response = self.client.get( + f'{BASEURL}/v3/connect/userinfo', + Authorization='Bearer %s' % (first_access_token)) + self.assertEqual(response.status_code, 403) + self.assertEqual(response.json()['detail'], settings.APPLICATION_DOES_NOT_HAVE_V3_ENABLED_YET.format('John_Smith_test')) + + @skipIf((not settings.RUN_ONLINE_TESTS), 'Can\'t reach external sites.') + @override_switch('v3_endpoints', active=True) + @override_flag('v3_early_adopter', active=True) def test_userinfo_returns_200(self): first_access_token = self.create_token('John', 'Smith', fhir_id_v2=FHIR_ID_V2) ac = AccessToken.objects.get(token=first_access_token) ac.save() - @all_requests - def catchall(url, req): - return {'status_code': 200, - 'content': { - 'sub': FHIR_ID_V2, - 'name': ' ', - 'given_name': '', - 'family_name': '', - 'email': '', - 'iat': '2025-10-14T18:01:01.660Z', - 'patient': FHIR_ID_V2 - }} - - with HTTMock(catchall): - response = self.client.get( - f'{BASEURL}/v3/connect/userinfo', - Authorization='Bearer %s' % (first_access_token)) - self.assertEqual(response.status_code, 200) + response = self.client.get( + f'{BASEURL}/v3/connect/userinfo', + Authorization='Bearer %s' % (first_access_token)) + self.assertEqual(response.status_code, 200) # This makes sure URLs return 200s. @skipIf((not settings.RUN_ONLINE_TESTS), "Can't reach external sites.") diff --git a/apps/fhir/bluebutton/views/read.py b/apps/fhir/bluebutton/views/read.py index 2513d30b9..2b1f9d689 100644 --- a/apps/fhir/bluebutton/views/read.py +++ b/apps/fhir/bluebutton/views/read.py @@ -2,7 +2,12 @@ from apps.authorization.permissions import DataAccessGrantPermission from apps.capabilities.permissions import TokenHasProtectedCapability -from ..permissions import (ReadCrosswalkPermission, ResourcePermission, ApplicationActivePermission) +from ..permissions import ( + ReadCrosswalkPermission, + ResourcePermission, + ApplicationActivePermission, + V3EarlyAdopterPermission +) from apps.fhir.bluebutton.views.generic import FhirDataView @@ -21,6 +26,7 @@ class ReadView(FhirDataView): ReadCrosswalkPermission, DataAccessGrantPermission, TokenHasProtectedCapability, + V3EarlyAdopterPermission, ] def __init__(self, version=1): diff --git a/apps/fhir/bluebutton/views/search.py b/apps/fhir/bluebutton/views/search.py index b1cd87ec4..42601164a 100644 --- a/apps/fhir/bluebutton/views/search.py +++ b/apps/fhir/bluebutton/views/search.py @@ -16,7 +16,12 @@ from apps.fhir.bluebutton.views.generic import FhirDataView from apps.authorization.permissions import DataAccessGrantPermission from apps.capabilities.permissions import TokenHasProtectedCapability -from ..permissions import (SearchCrosswalkPermission, ResourcePermission, ApplicationActivePermission) +from ..permissions import ( + SearchCrosswalkPermission, + ResourcePermission, + ApplicationActivePermission, + V3EarlyAdopterPermission +) class HasSearchScope(permissions.BasePermission): @@ -42,7 +47,8 @@ class SearchView(FhirDataView): SearchCrosswalkPermission, DataAccessGrantPermission, TokenHasProtectedCapability, - HasSearchScope + HasSearchScope, + V3EarlyAdopterPermission, ] # Regex to match a valid _lastUpdated value that can begin with lt, le, gt and ge operators @@ -180,6 +186,8 @@ def filter_parameters(self, request): query_schema = getattr(self, "QUERY_SCHEMA", {}) + # BB2-4250: Does not seem that this code will execute given the new permission class + # so leaving it as is if waffle.switch_is_active('v3_endpoints'): query_schema['_tag'] = self.validate_tag() # _tag if presents, is a string value diff --git a/apps/integration_tests/integration_test_fhir_resources.py b/apps/integration_tests/integration_test_fhir_resources.py index dc68c5833..38a5f1e53 100644 --- a/apps/integration_tests/integration_test_fhir_resources.py +++ b/apps/integration_tests/integration_test_fhir_resources.py @@ -2,10 +2,12 @@ from django.conf import settings from django.contrib.staticfiles.testing import StaticLiveServerTestCase +from http import HTTPStatus from oauth2_provider.models import AccessToken from rest_framework.test import APIClient from waffle.testutils import override_switch +from apps.core.models import Flag from apps.test import BaseApiTest from apps.testclient.utils import extract_last_page_index from .common_utils import validate_json_schema @@ -36,6 +38,9 @@ FHIR_RES_TYPE_EOB = "ExplanationOfBenefit" FHIR_RES_TYPE_PATIENT = "Patient" FHIR_RES_TYPE_COVERAGE = "Coverage" +V3_403_DETAIL = 'This application, John_Doe_test, does not yet have access to v3 endpoints. ' \ + 'If you are the app maintainer, please contact the Blue Button API team. If you are a Medicare Beneficiary ' \ + 'and need assistance, please contact the support team for the application you are trying to access.' def dump_content(json_str, file_name): @@ -759,3 +764,65 @@ def _err_response_caused_by_illegalarguments(self, v2=False): # This now returns 400 after BB2-2063 work. # for both v1 and v2 self.assertEqual(response.status_code, 400) + + @override_switch('v3_endpoints', active=True) + def test_patient_read_endpoint_v3_403(self): + ''' + test patient read v3 throwing a 403 when an app is not in the flag + TODO - Should be removed when v3_early_adopter flag is deleted and v3 is available for all apps + ''' + self._call_v3_endpoint_to_assert_403(FHIR_RES_TYPE_PATIENT, settings.DEFAULT_SAMPLE_FHIR_ID_V3, False, None) + + @override_switch('v3_endpoints', active=True) + def test_coverage_read_endpoint_v3_403(self): + ''' + test coverage read v3 throwing a 403 when an app is not in the flag + TODO - Should be removed when v3_early_adopter flag is deleted and v3 is available for all apps + ''' + self._call_v3_endpoint_to_assert_403(FHIR_RES_TYPE_COVERAGE, 'part-a-99999999999999', False, None) + + @override_switch('v3_endpoints', active=True) + def test_eob_read_endpoint_v3_403(self): + ''' + test eob read v3 throwing a 403 when an app is not in the flag + TODO - Should be removed when v3_early_adopter flag is deleted and v3 is available for all apps + ''' + self._call_v3_endpoint_to_assert_403(FHIR_RES_TYPE_EOB, 'outpatient--9999999999999', False, None) + + @override_switch('v3_endpoints', active=True) + def test_patient_search_endpoint_v3_403(self): + ''' + test patient search v3 throwing a 403 when an app is not in the flag + TODO - Should be removed when v3_early_adopter flag is deleted and v3 is available for all apps + ''' + self._call_v3_endpoint_to_assert_403(FHIR_RES_TYPE_PATIENT, settings.DEFAULT_SAMPLE_FHIR_ID_V3, True, '_id=') + + @override_switch('v3_endpoints', active=True) + def test_coverage_search_endpoint_v3_403(self): + ''' + test coverage search v3 throwing a 403 when an app is not in the flag + TODO - Should be removed when v3_early_adopter flag is deleted and v3 is available for all apps + ''' + self._call_v3_endpoint_to_assert_403(FHIR_RES_TYPE_COVERAGE, settings.DEFAULT_SAMPLE_FHIR_ID_V3, True, 'beneficiary=') + + @override_switch('v3_endpoints', active=True) + def test_eob_search_endpoint_v3_403(self): + ''' + test eob search v3 throwing a 403 when an app is not in the flag + TODO - Should be removed when v3_early_adopter flag is deleted and v3 is available for all apps + ''' + self._call_v3_endpoint_to_assert_403(FHIR_RES_TYPE_EOB, settings.DEFAULT_SAMPLE_FHIR_ID_V3, True, 'patient=') + + def _call_v3_endpoint_to_assert_403(self, resource_type: str, resource_value: str, search: bool, search_param: str): + client = APIClient() + + # Authenticate + self._setup_apiclient(client) + Flag.objects.create(name='v3_early_adopter', everyone=None) + if search: + endpoint_url = "{}/v3/fhir/{}/?{}{}".format(self.live_server_url, resource_type, search_param, resource_value) + else: + endpoint_url = "{}/v3/fhir/{}/{}".format(self.live_server_url, resource_type, resource_value) + response = client.get(endpoint_url) + self.assertEqual(response.status_code, HTTPStatus.FORBIDDEN) + self.assertEqual(response.json()['detail'], V3_403_DETAIL) diff --git a/apps/wellknown/permissions.py b/apps/wellknown/permissions.py new file mode 100644 index 000000000..ccd92aa5c --- /dev/null +++ b/apps/wellknown/permissions.py @@ -0,0 +1,36 @@ +import logging + +from django.conf import settings +from django.contrib.auth import get_user_model +from oauth2_provider.views.base import get_access_token_model +from oauth2_provider.models import get_application_model +from rest_framework import permissions +from rest_framework.exceptions import PermissionDenied +from waffle import get_waffle_flag_model +from apps.versions import Versions + +import apps.logging.request_logger as bb2logging + +logger = logging.getLogger(bb2logging.HHS_SERVER_LOGNAME_FMT.format(__name__)) + + +class V3EarlyAdopterWellKnownPermission(permissions.BasePermission): + # BB2-4250: Handling to ensure this only returns successfully if the flag is enabled for the application + # associated with the user making the call + def has_permission(self, request, view): + version = view.kwargs.get('version') + # if it is not version 3, we do not need to check the waffle switch or flag + if version < Versions.V3: + return True + + token = get_access_token_model().objects.get(token=request._auth) + application = get_application_model().objects.get(id=token.application_id) + application_user = get_user_model().objects.get(id=application.user_id) + flag = get_waffle_flag_model().get('v3_early_adopter') + + if flag.id is None or flag.is_active_for_user(application_user): + return True + else: + raise PermissionDenied( + settings.APPLICATION_DOES_NOT_HAVE_V3_ENABLED_YET.format(application.name) + ) diff --git a/hhs_oauth_server/settings/base.py b/hhs_oauth_server/settings/base.py index df12667d8..37b829549 100644 --- a/hhs_oauth_server/settings/base.py +++ b/hhs_oauth_server/settings/base.py @@ -643,6 +643,13 @@ 'and consent to share their data.' ) +APPLICATION_DOES_NOT_HAVE_V3_ENABLED_YET = ( + 'This application, {}, does not yet have access to v3 endpoints.' + ' If you are the app maintainer, please contact the Blue Button API team.' + ' If you are a Medicare Beneficiary and need assistance, please contact' + ' the support team for the application you are trying to access.' +) + FHIR_CLIENT_CERTSTORE = env( "DJANGO_FHIR_CERTSTORE", os.path.join(BASE_DIR, os.environ.get("DJANGO_FHIR_CERTSTORE_REL", "../certstore")),