Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ reqs-install:

reqs-install-dev:
pip install -r requirements/requirements.dev.txt --no-index --find-links ./vendor/

build-local:
cd dev-local ; make build-local ; cd ..

run-local:
cd dev-local ; make run-local ; cd ..
11 changes: 7 additions & 4 deletions apps/authorization/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,13 @@ def update_grants(*args, **kwargs):
tokens = AccessToken.objects.all()
for token in tokens:
if token.is_valid():
DataAccessGrant.objects.get_or_create(
beneficiary=token.user,
application=token.application,
)
try:
DataAccessGrant.objects.get_or_create(
beneficiary=token.user,
application=token.application,
)
except Exception as e:
print(f"update_grants: {e}")


def check_grants():
Expand Down
4 changes: 2 additions & 2 deletions apps/core/management/commands/create_test_feature_switches.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def handle(self, *args, **options):
for switch in WAFFLE_FEATURE_SWITCHES:
try:
Switch.objects.get(name=switch[0])
self._log("Feature switch already exists: %s" % (str(switch)))
# self._log("Feature switch already exists: %s" % (str(switch)))
except Switch.DoesNotExist:
Switch.objects.create(name=switch[0], active=switch[1], note=switch[2])
self._log("Feature switch created: %s" % (str(switch)))
Expand All @@ -46,7 +46,7 @@ def handle(self, *args, **options):

try:
flag_obj = Flag.objects.get(name=flag[0])
self._log("Feature flag already exists: %s" % (str(flag_obj)))
# self._log("Feature flag already exists: %s" % (str(flag_obj)))
except Flag.DoesNotExist:
flag_obj = Flag.objects.create(name=flag[0])
self._log("Feature flag created: %s" % (str(flag[0])))
Expand Down
49 changes: 26 additions & 23 deletions apps/dot_ext/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
SCOPES_TO_URL_BASE_PATH,
)

import os

from hhs_oauth_server.settings.base import MOCK_FHIR_ENDPOINT_HOSTNAME


Expand Down Expand Up @@ -576,16 +578,15 @@ def test_delete_token_success(self):

# This assertion is incorrectly crafted - it actually requires a local server started
# so that the fhir fetch data is called and hence generate cert file not found error.
# TODO: refactor test to not depend on a server up and running.

# Post Django 2.2: An OSError exception is expected when trying to reach the
# backend FHIR server and proves authentication worked.
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + anna_token.token}
)
# 20251120 This test is now gated on a variable; if the variable does not exist, or
# is not set, the test will run. This is the desired behavior.
if os.getenv("RUNNING_IN_LOCAL_STACK", None) != "true":
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + anna_token.token}
)

bob_tkn = self._create_test_token(bob, bob_application)
self.assertTrue(
Expand Down Expand Up @@ -638,24 +639,26 @@ def test_delete_token_success(self):

# Post Django 2.2: An OSError exception is expected when trying to reach the
# backend FHIR server and proves authentication worked.
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + bob_tkn.token}
)
if os.getenv("RUNNING_IN_LOCAL_STACK", None) != "true":
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + bob_tkn.token}
)

next_tkn = self._create_test_token(anna, anna_application)

# Post Django 2.2: An OSError exception is expected when trying to reach the
# backend FHIR server and proves authentication worked.
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient',
headers={'authorization': 'Bearer ' + next_tkn.token},
)
if os.getenv("RUNNING_IN_LOCAL_STACK", None) != "true":
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient',
headers={'authorization': 'Bearer ' + next_tkn.token},
)

# self.assertEqual(next_tkn.token, tkn.token)
self.assertTrue(
Expand Down
162 changes: 107 additions & 55 deletions apps/testclient/management/commands/create_test_user_and_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from datetime import timedelta, datetime
from django.conf import settings
from apps.authorization.models import update_grants
from apps.authorization.models import ArchivedDataAccessGrant, DataAccessGrant

# Imports for quieting things during startup.
from waffle.models import Switch


def create_group(name="BlueButton"):
Expand All @@ -24,57 +28,95 @@ def create_group(name="BlueButton"):
return g


def get_switch(name):
try:
sw = Switch.objects.get(name=name)
return sw.active
except Exception as e:
print(f"Could not get switch {name}: {e}")


def set_switch(name, b):
# DISABLE SOME WAFFLE SWITCHES
# We don't want email, etc.
sw, _ = Switch.objects.get_or_create(name=name)
sw.active = b
sw.save()

# usr would be a string if it is anything


def create_user(group, usr):
u_name = "fred"
first_name = "Fred"
last_name = "Flinstone"
email = "fred@example.com"
password = "foobarfoobarfoobar"
u_name = "rogersf"
first_name = "Fred"
last_name = "Rogers"
email = "fred@landofmakebelieve.gov"
password = "danielthetiger"
user_type = "BEN"

if usr is not None:
u_name = usr
first_name = "{}{}".format(usr, "First")
first_name = "{}{}".format(usr, "First")
last_name = "{}{}".format(usr, "Last")
email = "{}.{}@example.com".format(first_name, last_name)
email = "{}.{}@{}".format(first_name, last_name, email)
user_type = "DEV"

# This violates constraints on other tables.
usr_q = User.objects.filter(username=u_name)
if usr_q.exists():
# Delete any ADAGs for this user, or we will run into a
# constraint issue at startup.
count = ArchivedDataAccessGrant.objects.filter(beneficiary=usr_q.first()).delete()
print(f"Deleted {count} ADAGs for {u_name}")
count = DataAccessGrant.objects.filter(beneficiary=usr_q.first()).delete()
print(f"Deleted {count} ADAGs for {u_name}")

if User.objects.filter(username=u_name).exists():
User.objects.filter(username=u_name).delete()

u = None

if usr is not None:
u = User.objects.create_user(username=u_name,
first_name=first_name,
last_name=last_name,
email=email)
u.set_unusable_password()
try:
u, _ = User.objects.get_or_create(username=u_name,
first_name=first_name,
last_name=last_name,
email=email,
signals_to_disable=["post_save"])
u.set_unusable_password()
except Exception as e:
print(f"Did not create user: {e}")
else:
# create a sample user 'fred' for dev local that has a usable password
u = User.objects.create_user(username=u_name,
first_name=first_name,
last_name=last_name,
email=email,
password=password,)

UserProfile.objects.create(user=u,
user_type=user_type,
create_applications=True,
password_reset_question_1='1',
password_reset_answer_1='blue',
password_reset_question_2='2',
password_reset_answer_2='Frank',
password_reset_question_3='3',
password_reset_answer_3='Bentley')

u.groups.add(group)

if usr is None:
c, g_o_c = Crosswalk.objects.get_or_create(user=u,
fhir_id_v2=settings.DEFAULT_SAMPLE_FHIR_ID_V2,
_user_id_hash="ee78989d1d9ba0b98f3cfbd52479f10c7631679c17563186f70fbef038cc9536")
try:
# get_or_create returns a tuple (v, bool)
u, _ = User.objects.get_or_create(username=u_name,
first_name=first_name,
last_name=last_name,
email=email,
password=password,)

UserProfile.objects.create(user=u,
user_type=user_type,
create_applications=True,
password_reset_question_1='1',
password_reset_answer_1='blue',
password_reset_question_2='2',
password_reset_answer_2='Frank',
password_reset_question_3='3',
password_reset_answer_3='Bentley')
except Exception as e:
print(f"Did not create user and profile: {e}")

if u is None:
print(f"Error creating user; exiting.")
else:
u.groups.add(group)

user_id_hash = "ee78989d1d9ba0b98f3cfbd52479f10c7631679c17563186f70fbef038cc9536"
Crosswalk.objects.filter(_user_id_hash=user_id_hash).delete()
c, _ = Crosswalk.objects.get_or_create(user=u,
fhir_id_v2=settings.DEFAULT_SAMPLE_FHIR_ID_V2,
_user_id_hash=user_id_hash)
return u


Expand All @@ -86,26 +128,29 @@ def create_application(user, group, app, redirect):
if redirect:
redirect_uri = redirect

if not(redirect_uri.startswith("http://") or redirect_uri.startswith("https://")):
if not (redirect_uri.startswith("http://") or redirect_uri.startswith("https://")):
redirect_uri = "https://" + redirect_uri

a = Application.objects.create(name=app_name,
redirect_uris=redirect_uri,
user=user,
data_access_type="THIRTEEN_MONTH",
client_type="confidential",
authorization_grant_type="authorization-code")
try:
a = Application.objects.create(name=app_name,
redirect_uris=redirect_uri,
user=user,
data_access_type="THIRTEEN_MONTH",
client_type="confidential",
authorization_grant_type="authorization-code",)

titles = ["My Medicare and supplemental coverage information.",
"My Medicare claim information.",
"My general patient and demographic information.",
"Profile information including name and email."
]
titles = ["My Medicare and supplemental coverage information.",
"My Medicare claim information.",
"My general patient and demographic information.",
"Profile information including name and email."
]

for t in titles:
c = ProtectedCapability.objects.get(title=t)
a.scope.add(c)
return a
for t in titles:
c = ProtectedCapability.objects.get(title=t)
a.scope.add(c)
return a
except Exception as e:
print(f"Skipped creation of {app_name}: {e}")


def create_test_token(user, application):
Expand All @@ -121,7 +166,8 @@ def create_test_token(user, application):
t = AccessToken.objects.create(user=user, application=application,
token="sample-token-string",
expires=expires,
scope=' '.join(scope))
scope=' '.join(scope),)

return t


Expand All @@ -134,12 +180,15 @@ def add_arguments(self, parser):
parser.add_argument("-r", "--redirect", help="Redirect url of the application.")

def handle(self, *args, **options):
usr = options["user"]
app = options["app"]
usr = options.get("user", None)
app = options.get("app", None)
redirect = options["redirect"]

set_switch('outreach_email', False)

g = create_group()
u = create_user(g, usr)
print(f"Created user {u}")
a = create_application(u, g, app, redirect)
t = None
if usr is None and app is None:
Expand All @@ -150,3 +199,6 @@ def handle(self, *args, **options):
print("client_secret:", a.client_secret)
print("access_token:", t.token if t else "None")
print("redirect_uri:", a.redirect_uris)

# Restore switch to whatever it was.
set_switch('outreach_email', True)
Loading
Loading