Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ reqs-install:

reqs-install-dev:
pip install -r requirements/requirements.dev.txt --no-index --find-links ./vendor/

build-local:
cd dev-local ; make build-local ; cd ..

run-local:
cd dev-local ; make run-local ; cd ..
1 change: 0 additions & 1 deletion apps/authorization/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class Meta:
fields = ('id', 'name', 'logo_uri', 'tos_uri', 'policy_uri', 'contacts')

def get_contacts(self, obj):
print(obj)
application = Application.objects.get(id=obj.id)
return application.support_email or ""

Expand Down
3 changes: 0 additions & 3 deletions apps/core/management/commands/create_test_feature_switches.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def handle(self, *args, **options):
for switch in WAFFLE_FEATURE_SWITCHES:
try:
Switch.objects.get(name=switch[0])
self._log("Feature switch already exists: %s" % (str(switch)))
except Switch.DoesNotExist:
Switch.objects.create(name=switch[0], active=switch[1], note=switch[2])
self._log("Feature switch created: %s" % (str(switch)))
Expand All @@ -46,7 +45,6 @@ def handle(self, *args, **options):

try:
flag_obj = Flag.objects.get(name=flag[0])
self._log("Feature flag already exists: %s" % (str(flag_obj)))
except Flag.DoesNotExist:
flag_obj = Flag.objects.create(name=flag[0])
self._log("Feature flag created: %s" % (str(flag[0])))
Expand All @@ -62,7 +60,6 @@ def handle(self, *args, **options):
flag_obj.save()
self._log("User {} added to feature flag: {}".format(u, flag))
except Exception as e:
print(e)
self._log("Exception when adding user {} to feature flag: {}".format(u, flag))
except User.DoesNotExist:
# assuming test users exist before creating flags associated with them
Expand Down
49 changes: 26 additions & 23 deletions apps/dot_ext/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
SCOPES_TO_URL_BASE_PATH,
)

import os

from hhs_oauth_server.settings.base import MOCK_FHIR_ENDPOINT_HOSTNAME


Expand Down Expand Up @@ -576,16 +578,15 @@ def test_delete_token_success(self):

# This assertion is incorrectly crafted - it actually requires a local server started
# so that the fhir fetch data is called and hence generate cert file not found error.
# TODO: refactor test to not depend on a server up and running.

# Post Django 2.2: An OSError exception is expected when trying to reach the
# backend FHIR server and proves authentication worked.
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + anna_token.token}
)
# 20251120 This test is now gated on a variable; if the variable does not exist, or
# is not set, the test will run. This is the desired behavior.
if os.getenv("RUNNING_IN_LOCAL_STACK", None) != "true":
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + anna_token.token}
)

bob_tkn = self._create_test_token(bob, bob_application)
self.assertTrue(
Expand Down Expand Up @@ -638,24 +639,26 @@ def test_delete_token_success(self):

# Post Django 2.2: An OSError exception is expected when trying to reach the
# backend FHIR server and proves authentication worked.
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + bob_tkn.token}
)
if os.getenv("RUNNING_IN_LOCAL_STACK", None) != "true":
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient', headers={'authorization': 'Bearer ' + bob_tkn.token}
)

next_tkn = self._create_test_token(anna, anna_application)

# Post Django 2.2: An OSError exception is expected when trying to reach the
# backend FHIR server and proves authentication worked.
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient',
headers={'authorization': 'Bearer ' + next_tkn.token},
)
if os.getenv("RUNNING_IN_LOCAL_STACK", None) != "true":
with self.assertRaisesRegexp(
OSError, 'Could not find the TLS certificate file'
):
response = self.client.get(
'/v1/fhir/Patient',
headers={'authorization': 'Bearer ' + next_tkn.token},
)

# self.assertEqual(next_tkn.token, tkn.token)
self.assertTrue(
Expand Down
5 changes: 2 additions & 3 deletions apps/fhir/bluebutton/tests/test_wellknown_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,10 @@ def test_smart_configuration_missing_fields_in_v3(self):
# is commented above for reference.

@skipIf((not settings.RUN_ONLINE_TESTS), "Can't reach external sites.")
# This overrides the switch and sets it to true, always.
# We should only run the test if we have v3 enabled.
@override_switch('v3_endpoints', active=True)
def test_fhir_metadata_extensions_have_v3(self):
response = self.client.get(f'{BASEURL}/v3/fhir/metadata')
the_url = f'{BASEURL}/v3/fhir/metadata'
response = self.client.get(the_url)
self.assertEqual(response.status_code, 200)
json = response.json()
self.assertIn('v3', json['implementation']['url'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
from datetime import timedelta, datetime
from django.conf import settings
from apps.authorization.models import update_grants
from apps.authorization.models import ArchivedDataAccessGrant, DataAccessGrant

# Imports for quieting things during startup.
from waffle.models import Switch

from uuid import uuid4

def create_group(name="BlueButton"):

def create_group(name="BlueButton"):
g, created = Group.objects.get_or_create(name=name)
if created:
print("%s group created" % (name))
Expand All @@ -24,42 +29,29 @@ def create_group(name="BlueButton"):
return g


def create_user(group, usr):
u_name = "fred"
first_name = "Fred"
last_name = "Flinstone"
email = "[email protected]"
password = "foobarfoobarfoobar"
def create_user(the_group):
username = "rogersf"
first_name = "Fred"
last_name = "Rogers"
email = "[email protected]"
password = uuid4()
user_type = "BEN"

if usr is not None:
u_name = usr
first_name = "{}{}".format(usr, "First")
last_name = "{}{}".format(usr, "Last")
email = "{}.{}@example.com".format(first_name, last_name)
user_type = "DEV"


if User.objects.filter(username=u_name).exists():
User.objects.filter(username=u_name).delete()

u = None

if usr is not None:
u = User.objects.create_user(username=u_name,
first_name=first_name,
last_name=last_name,
email=email)
u.set_unusable_password()
else:
# create a sample user 'fred' for dev local that has a usable password
u = User.objects.create_user(username=u_name,
first_name=first_name,
last_name=last_name,
email=email,
password=password,)

UserProfile.objects.create(user=u,

# We will do this over-and-over.
# If we don't already exist, then create the user.
if User.objects.filter(username=username).exists():
print(f"👟 {username} already exists. Skipping test user creation.")
return User.objects.get(username=username)

# If the user didn't exist, it is our first time through.
# Create the user.
user_obj = User.objects.create(username=username,
first_name=first_name,
last_name=last_name,
email=email,
password=password,)
user_obj.set_unusable_password()
UserProfile.objects.create(user=user_obj,
user_type=user_type,
create_applications=True,
password_reset_question_1='1',
Expand All @@ -68,33 +60,35 @@ def create_user(group, usr):
password_reset_answer_2='Frank',
password_reset_question_3='3',
password_reset_answer_3='Bentley')
user_obj.groups.add(the_group)

u.groups.add(group)
# CROSSWALK
# Removing any existing crosswalks for this artificial user.
# Why? Just in case.
user_id_hash = "ee78989d1d9ba0b98f3cfbd52479f10c7631679c17563186f70fbef038cc9536"
Crosswalk.objects.filter(_user_id_hash=user_id_hash).delete()
Crosswalk.objects.get_or_create(user=user_obj,
fhir_id_v2=settings.DEFAULT_SAMPLE_FHIR_ID_V2,
_user_id_hash=user_id_hash)
return user_obj

if usr is None:
c, g_o_c = Crosswalk.objects.get_or_create(user=u,
fhir_id_v2=settings.DEFAULT_SAMPLE_FHIR_ID_V2,
_user_id_hash="ee78989d1d9ba0b98f3cfbd52479f10c7631679c17563186f70fbef038cc9536")
return u

def create_application(user):
app_name = "TestApp"
if Application.objects.filter(name=app_name).exists():
return Application.objects.get(name=app_name)

# If the app doesn't exist, create the test app.

def create_application(user, group, app, redirect):
app_name = "TestApp" if app is None else app
Application.objects.filter(name=app_name).delete()
redirect_uri = "{}{}".format(settings.HOSTNAME_URL, settings.TESTCLIENT_REDIRECT_URI)

if redirect:
redirect_uri = redirect

if not(redirect_uri.startswith("http://") or redirect_uri.startswith("https://")):
redirect_uri = "https://" + redirect_uri

a = Application.objects.create(name=app_name,
redirect_uris=redirect_uri,
user=user,
data_access_type="THIRTEEN_MONTH",
client_type="confidential",
authorization_grant_type="authorization-code")
the_app = Application.objects.create(name=app_name,
redirect_uris=redirect_uri,
user=user,
data_access_type="THIRTEEN_MONTH",
client_type="confidential",
authorization_grant_type="authorization-code",)

titles = ["My Medicare and supplemental coverage information.",
"My Medicare claim information.",
Expand All @@ -104,49 +98,64 @@ def create_application(user, group, app, redirect):

for t in titles:
c = ProtectedCapability.objects.get(title=t)
a.scope.add(c)
return a
the_app.scope.add(c)

return the_app


def create_test_token(user, application):
def create_test_token(the_user, the_app):

# Set expiration one day from now.
now = timezone.now()
expires = now + timedelta(days=1)

scopes = application.scope.all()
scopes = the_app.scope.all()
scope = []
for s in scopes:
scope.append(s.slug)

t = AccessToken.objects.create(user=user, application=application,
# We have to have a tokent with token="sample-token-string", because we
# rely on it existing for unit tests. Which are actually integration tests.
if AccessToken.objects.filter(token="sample-token-string").exists():
t = AccessToken.objects.get(token="sample-token-string")
t.expires = expires
t.save()
else:
AccessToken.objects.create(user=the_user,
application=the_app,
# This needs to be "sample-token-string", because
# we have tests that rely on it.
token="sample-token-string",
expires=expires,
scope=' '.join(scope))
return t
scope=' '.join(scope),)


def get_switch(name):
try:
sw = Switch.objects.get(name=name)
return sw.active
except Exception as e:
print(f"Could not get switch {name}: {e}")


def set_switch(name, b):
sw, _ = Switch.objects.get_or_create(name=name)
sw.active = b
sw.save()


class Command(BaseCommand):
help = 'Create a test user and application for the test client'

def add_arguments(self, parser):
parser.add_argument("-u", "--user", help="Name of the user to be created (unique).")
parser.add_argument("-a", "--app", help="Name of the application to be created (unique).")
parser.add_argument("-r", "--redirect", help="Redirect url of the application.")

def handle(self, *args, **options):
usr = options["user"]
app = options["app"]
redirect = options["redirect"]

g = create_group()
u = create_user(g, usr)
a = create_application(u, g, app, redirect)
t = None
if usr is None and app is None:
t = create_test_token(u, a)
update_grants()
print("Name:", a.name)
print("client_id:", a.client_id)
print("client_secret:", a.client_secret)
print("access_token:", t.token if t else "None")
print("redirect_uri:", a.redirect_uris)

set_switch('outreach_email', False)

the_group = create_group()
the_user = create_user(the_group)
the_app = create_application(the_user)
create_test_token(the_user, the_app)
update_grants()

# Restore switch to whatever it was.
set_switch('outreach_email', True)
Loading
Loading