Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions apps/authorization/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,13 @@ def update_grants(*args, **kwargs):
tokens = AccessToken.objects.all()
for token in tokens:
if token.is_valid():
DataAccessGrant.objects.get_or_create(
beneficiary=token.user,
application=token.application,
)
try:
DataAccessGrant.objects.get_or_create(
beneficiary=token.user,
application=token.application,
)
except Exception as e:
print(f"update_grants: {e}")


def check_grants():
Expand Down
4 changes: 2 additions & 2 deletions apps/core/management/commands/create_test_feature_switches.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def handle(self, *args, **options):
for switch in WAFFLE_FEATURE_SWITCHES:
try:
Switch.objects.get(name=switch[0])
self._log("Feature switch already exists: %s" % (str(switch)))
# self._log("Feature switch already exists: %s" % (str(switch)))
except Switch.DoesNotExist:
Switch.objects.create(name=switch[0], active=switch[1], note=switch[2])
self._log("Feature switch created: %s" % (str(switch)))
Expand All @@ -46,7 +46,7 @@ def handle(self, *args, **options):

try:
flag_obj = Flag.objects.get(name=flag[0])
self._log("Feature flag already exists: %s" % (str(flag_obj)))
# self._log("Feature flag already exists: %s" % (str(flag_obj)))
except Flag.DoesNotExist:
flag_obj = Flag.objects.create(name=flag[0])
self._log("Feature flag created: %s" % (str(flag[0])))
Expand Down
49 changes: 26 additions & 23 deletions apps/dot_ext/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
SCOPES_TO_URL_BASE_PATH,
)

import os

from hhs_oauth_server.settings.base import MOCK_FHIR_ENDPOINT_HOSTNAME


Expand Down Expand Up @@ -576,16 +578,15 @@ def test_delete_token_success(self):

# This assertion is incorrectly crafted - it actually requires a local server started
# so that the fhir fetch data is called and hence generate cert file not found error.
# TODO: refactor test to not depend on a server up and running.

# Post Django 2.2: An OSError exception is expected when trying to reach the
# backend FHIR server and proves authentication worked.
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + anna_token.token}
)
# 20251120 This test is now gated on a variable; if the variable does not exist, or
# is not set, the test will run. This is the desired behavior.
if os.getenv("RUNNING_IN_LOCAL_STACK", None) != "true":
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + anna_token.token}
)

bob_tkn = self._create_test_token(bob, bob_application)
self.assertTrue(
Expand Down Expand Up @@ -638,24 +639,26 @@ def test_delete_token_success(self):

# Post Django 2.2: An OSError exception is expected when trying to reach the
# backend FHIR server and proves authentication worked.
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + bob_tkn.token}
)
if os.getenv("RUNNING_IN_LOCAL_STACK", None) != "true":
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + bob_tkn.token}
)

next_tkn = self._create_test_token(anna, anna_application)

# Post Django 2.2: An OSError exception is expected when trying to reach the
# backend FHIR server and proves authentication worked.
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient',
headers={'authorization': 'Bearer ' + next_tkn.token},
)
if os.getenv("RUNNING_IN_LOCAL_STACK", None) != "true":
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient',
headers={'authorization': 'Bearer ' + next_tkn.token},
)

# self.assertEqual(next_tkn.token, tkn.token)
self.assertTrue(
Expand Down
162 changes: 107 additions & 55 deletions apps/testclient/management/commands/create_test_user_and_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from datetime import timedelta, datetime
from django.conf import settings
from apps.authorization.models import update_grants
from apps.authorization.models import ArchivedDataAccessGrant, DataAccessGrant

# Imports for quieting things during startup.
from waffle.models import Switch


def create_group(name="BlueButton"):
Expand All @@ -24,57 +28,95 @@ def create_group(name="BlueButton"):
return g


def get_switch(name):
try:
sw = Switch.objects.get(name=name)
return sw.active
except Exception as e:
print(f"Could not get switch {name}: {e}")


def set_switch(name, b):
# DISABLE SOME WAFFLE SWITCHES
# We don't want email, etc.
sw, _ = Switch.objects.get_or_create(name=name)
sw.active = b
sw.save()

# usr would be a string if it is anything


def create_user(group, usr):
u_name = "fred"
first_name = "Fred"
last_name = "Flinstone"
email = "fred@example.com"
password = "foobarfoobarfoobar"
u_name = "rogersf"
first_name = "Fred"
last_name = "Rogers"
email = "fred@landofmakebelieve.gov"
password = "danielthetiger"
user_type = "BEN"

if usr is not None:
u_name = usr
first_name = "{}{}".format(usr, "First")
first_name = "{}{}".format(usr, "First")
last_name = "{}{}".format(usr, "Last")
email = "{}.{}@example.com".format(first_name, last_name)
email = "{}.{}@{}".format(first_name, last_name, email)
user_type = "DEV"

# This violates constraints on other tables.
usr_q = User.objects.filter(username=u_name)
if usr_q.exists():
# Delete any ADAGs for this user, or we will run into a
# constraint issue at startup.
count = ArchivedDataAccessGrant.objects.filter(beneficiary=usr_q.first()).delete()
print(f"Deleted {count} ADAGs for {u_name}")
count = DataAccessGrant.objects.filter(beneficiary=usr_q.first()).delete()
print(f"Deleted {count} ADAGs for {u_name}")

if User.objects.filter(username=u_name).exists():
User.objects.filter(username=u_name).delete()

u = None

if usr is not None:
u = User.objects.create_user(username=u_name,
first_name=first_name,
last_name=last_name,
email=email)
u.set_unusable_password()
try:
u, _ = User.objects.get_or_create(username=u_name,
first_name=first_name,
last_name=last_name,
email=email,
signals_to_disable=["post_save"])
u.set_unusable_password()
except Exception as e:
print(f"Did not create user: {e}")
else:
# create a sample user 'fred' for dev local that has a usable password
u = User.objects.create_user(username=u_name,
first_name=first_name,
last_name=last_name,
email=email,
password=password,)

UserProfile.objects.create(user=u,
user_type=user_type,
create_applications=True,
password_reset_question_1='1',
password_reset_answer_1='blue',
password_reset_question_2='2',
password_reset_answer_2='Frank',
password_reset_question_3='3',
password_reset_answer_3='Bentley')

u.groups.add(group)

if usr is None:
c, g_o_c = Crosswalk.objects.get_or_create(user=u,
fhir_id_v2=settings.DEFAULT_SAMPLE_FHIR_ID_V2,
_user_id_hash="ee78989d1d9ba0b98f3cfbd52479f10c7631679c17563186f70fbef038cc9536")
try:
# get_or_create returns a tuple (v, bool)
u, _ = User.objects.get_or_create(username=u_name,
first_name=first_name,
last_name=last_name,
email=email,
password=password,)

UserProfile.objects.create(user=u,
user_type=user_type,
create_applications=True,
password_reset_question_1='1',
password_reset_answer_1='blue',
password_reset_question_2='2',
password_reset_answer_2='Frank',
password_reset_question_3='3',
password_reset_answer_3='Bentley')
except Exception as e:
print(f"Did not create user and profile: {e}")

if u is None:
print(f"Error creating user; exiting.")
else:
u.groups.add(group)

user_id_hash = "ee78989d1d9ba0b98f3cfbd52479f10c7631679c17563186f70fbef038cc9536"
Crosswalk.objects.filter(_user_id_hash=user_id_hash).delete()
c, _ = Crosswalk.objects.get_or_create(user=u,
fhir_id_v2=settings.DEFAULT_SAMPLE_FHIR_ID_V2,
_user_id_hash=user_id_hash)
return u


Expand All @@ -86,26 +128,29 @@ def create_application(user, group, app, redirect):
if redirect:
redirect_uri = redirect

if not(redirect_uri.startswith("http://") or redirect_uri.startswith("https://")):
if not (redirect_uri.startswith("http://") or redirect_uri.startswith("https://")):
redirect_uri = "https://" + redirect_uri

a = Application.objects.create(name=app_name,
redirect_uris=redirect_uri,
user=user,
data_access_type="THIRTEEN_MONTH",
client_type="confidential",
authorization_grant_type="authorization-code")
try:
a = Application.objects.create(name=app_name,
redirect_uris=redirect_uri,
user=user,
data_access_type="THIRTEEN_MONTH",
client_type="confidential",
authorization_grant_type="authorization-code",)

titles = ["My Medicare and supplemental coverage information.",
"My Medicare claim information.",
"My general patient and demographic information.",
"Profile information including name and email."
]
titles = ["My Medicare and supplemental coverage information.",
"My Medicare claim information.",
"My general patient and demographic information.",
"Profile information including name and email."
]

for t in titles:
c = ProtectedCapability.objects.get(title=t)
a.scope.add(c)
return a
for t in titles:
c = ProtectedCapability.objects.get(title=t)
a.scope.add(c)
return a
except Exception as e:
print(f"Skipped creation of {app_name}: {e}")


def create_test_token(user, application):
Expand All @@ -121,7 +166,8 @@ def create_test_token(user, application):
t = AccessToken.objects.create(user=user, application=application,
token="sample-token-string",
expires=expires,
scope=' '.join(scope))
scope=' '.join(scope),)

return t


Expand All @@ -134,12 +180,15 @@ def add_arguments(self, parser):
parser.add_argument("-r", "--redirect", help="Redirect url of the application.")

def handle(self, *args, **options):
usr = options["user"]
app = options["app"]
usr = options.get("user", None)
app = options.get("app", None)
redirect = options["redirect"]

set_switch('outreach_email', False)

g = create_group()
u = create_user(g, usr)
print(f"Created user {u}")
a = create_application(u, g, app, redirect)
t = None
if usr is None and app is None:
Expand All @@ -150,3 +199,6 @@ def handle(self, *args, **options):
print("client_secret:", a.client_secret)
print("access_token:", t.token if t else "None")
print("redirect_uri:", a.redirect_uris)

# Restore switch to whatever it was.
set_switch('outreach_email', True)
Loading
Loading