1212from datetime import timedelta , datetime
1313from django .conf import settings
1414from apps .authorization .models import update_grants
15+ from apps .authorization .models import ArchivedDataAccessGrant , DataAccessGrant
1516
17+ # Imports for quieting things during startup.
18+ from waffle .models import Switch
19+
20+ from uuid import uuid4
1621
17- def create_group (name = "BlueButton" ):
1822
23+ def create_group (name = "BlueButton" ):
1924 g , created = Group .objects .get_or_create (name = name )
2025 if created :
2126 print ("%s group created" % (name ))
@@ -24,42 +29,29 @@ def create_group(name="BlueButton"):
2429 return g
2530
2631
27- def create_user (group , usr ):
28- u_name = "fred "
29- first_name = "Fred"
30- last_name = "Flinstone "
31- 32- password = "foobarfoobarfoobar"
32+ def create_user (the_group ):
33+ username = "rogersf "
34+ first_name = "Fred"
35+ last_name = "Rogers "
36+ 37+ password = uuid4 ()
3338 user_type = "BEN"
34-
35- if usr is not None :
36- u_name = usr
37- first_name = "{}{}" .format (usr , "First" )
38- last_name = "{}{}" .format (usr , "Last" )
39- email = "{}.{}@example.com" .format (first_name , last_name )
40- user_type = "DEV"
41-
42-
43- if User .objects .filter (username = u_name ).exists ():
44- User .objects .filter (username = u_name ).delete ()
45-
46- u = None
47-
48- if usr is not None :
49- u = User .objects .create_user (username = u_name ,
50- first_name = first_name ,
51- last_name = last_name ,
52- email = email )
53- u .set_unusable_password ()
54- else :
55- # create a sample user 'fred' for dev local that has a usable password
56- u = User .objects .create_user (username = u_name ,
57- first_name = first_name ,
58- last_name = last_name ,
59- email = email ,
60- password = password ,)
61-
62- UserProfile .objects .create (user = u ,
39+
40+ # We will do this over-and-over.
41+ # If we don't already exist, then create the user.
42+ if User .objects .filter (username = username ).exists ():
43+ print (f"👟 { username } already exists. Skipping test user creation." )
44+ return User .objects .get (username = username )
45+
46+ # If the user didn't exist, it is our first time through.
47+ # Create the user.
48+ user_obj = User .objects .create (username = username ,
49+ first_name = first_name ,
50+ last_name = last_name ,
51+ email = email ,
52+ password = password ,)
53+ user_obj .set_unusable_password ()
54+ UserProfile .objects .create (user = user_obj ,
6355 user_type = user_type ,
6456 create_applications = True ,
6557 password_reset_question_1 = '1' ,
@@ -68,33 +60,35 @@ def create_user(group, usr):
6860 password_reset_answer_2 = 'Frank' ,
6961 password_reset_question_3 = '3' ,
7062 password_reset_answer_3 = 'Bentley' )
63+ user_obj .groups .add (the_group )
7164
72- u .groups .add (group )
65+ # CROSSWALK
66+ # Removing any existing crosswalks for this artificial user.
67+ # Why? Just in case.
68+ user_id_hash = "ee78989d1d9ba0b98f3cfbd52479f10c7631679c17563186f70fbef038cc9536"
69+ Crosswalk .objects .filter (_user_id_hash = user_id_hash ).delete ()
70+ Crosswalk .objects .get_or_create (user = user_obj ,
71+ fhir_id_v2 = settings .DEFAULT_SAMPLE_FHIR_ID_V2 ,
72+ _user_id_hash = user_id_hash )
73+ return user_obj
7374
74- if usr is None :
75- c , g_o_c = Crosswalk .objects .get_or_create (user = u ,
76- fhir_id_v2 = settings .DEFAULT_SAMPLE_FHIR_ID_V2 ,
77- _user_id_hash = "ee78989d1d9ba0b98f3cfbd52479f10c7631679c17563186f70fbef038cc9536" )
78- return u
7975
76+ def create_application (user ):
77+ app_name = "TestApp"
78+ if Application .objects .filter (name = app_name ).exists ():
79+ return Application .objects .get (name = app_name )
80+
81+ # If the app doesn't exist, create the test app.
8082
81- def create_application (user , group , app , redirect ):
82- app_name = "TestApp" if app is None else app
8383 Application .objects .filter (name = app_name ).delete ()
8484 redirect_uri = "{}{}" .format (settings .HOSTNAME_URL , settings .TESTCLIENT_REDIRECT_URI )
8585
86- if redirect :
87- redirect_uri = redirect
88-
89- if not (redirect_uri .startswith ("http://" ) or redirect_uri .startswith ("https://" )):
90- redirect_uri = "https://" + redirect_uri
91-
92- a = Application .objects .create (name = app_name ,
93- redirect_uris = redirect_uri ,
94- user = user ,
95- data_access_type = "THIRTEEN_MONTH" ,
96- client_type = "confidential" ,
97- authorization_grant_type = "authorization-code" )
86+ the_app = Application .objects .create (name = app_name ,
87+ redirect_uris = redirect_uri ,
88+ user = user ,
89+ data_access_type = "THIRTEEN_MONTH" ,
90+ client_type = "confidential" ,
91+ authorization_grant_type = "authorization-code" ,)
9892
9993 titles = ["My Medicare and supplemental coverage information." ,
10094 "My Medicare claim information." ,
@@ -104,49 +98,64 @@ def create_application(user, group, app, redirect):
10498
10599 for t in titles :
106100 c = ProtectedCapability .objects .get (title = t )
107- a .scope .add (c )
108- return a
101+ the_app .scope .add (c )
102+
103+ return the_app
109104
110105
111- def create_test_token (user , application ):
106+ def create_test_token (the_user , the_app ):
112107
108+ # Set expiration one day from now.
113109 now = timezone .now ()
114110 expires = now + timedelta (days = 1 )
115111
116- scopes = application .scope .all ()
112+ scopes = the_app .scope .all ()
117113 scope = []
118114 for s in scopes :
119115 scope .append (s .slug )
120116
121- t = AccessToken .objects .create (user = user , application = application ,
117+ # We have to have a tokent with token="sample-token-string", because we
118+ # rely on it existing for unit tests. Which are actually integration tests.
119+ if AccessToken .objects .filter (token = "sample-token-string" ).exists ():
120+ t = AccessToken .objects .get (token = "sample-token-string" )
121+ t .expires = expires
122+ t .save ()
123+ else :
124+ AccessToken .objects .create (user = the_user ,
125+ application = the_app ,
126+ # This needs to be "sample-token-string", because
127+ # we have tests that rely on it.
122128 token = "sample-token-string" ,
123129 expires = expires ,
124- scope = ' ' .join (scope ))
125- return t
130+ scope = ' ' .join (scope ),)
131+
132+
133+ def get_switch (name ):
134+ try :
135+ sw = Switch .objects .get (name = name )
136+ return sw .active
137+ except Exception as e :
138+ print (f"Could not get switch { name } : { e } " )
139+
140+
141+ def set_switch (name , b ):
142+ sw , _ = Switch .objects .get_or_create (name = name )
143+ sw .active = b
144+ sw .save ()
126145
127146
128147class Command (BaseCommand ):
129148 help = 'Create a test user and application for the test client'
130149
131- def add_arguments (self , parser ):
132- parser .add_argument ("-u" , "--user" , help = "Name of the user to be created (unique)." )
133- parser .add_argument ("-a" , "--app" , help = "Name of the application to be created (unique)." )
134- parser .add_argument ("-r" , "--redirect" , help = "Redirect url of the application." )
135-
136150 def handle (self , * args , ** options ):
137- usr = options ["user" ]
138- app = options ["app" ]
139- redirect = options ["redirect" ]
140-
141- g = create_group ()
142- u = create_user (g , usr )
143- a = create_application (u , g , app , redirect )
144- t = None
145- if usr is None and app is None :
146- t = create_test_token (u , a )
147- update_grants ()
148- print ("Name:" , a .name )
149- print ("client_id:" , a .client_id )
150- print ("client_secret:" , a .client_secret )
151- print ("access_token:" , t .token if t else "None" )
152- print ("redirect_uri:" , a .redirect_uris )
151+
152+ set_switch ('outreach_email' , False )
153+
154+ the_group = create_group ()
155+ the_user = create_user (the_group )
156+ the_app = create_application (the_user )
157+ create_test_token (the_user , the_app )
158+ update_grants ()
159+
160+ # Restore switch to whatever it was.
161+ set_switch ('outreach_email' , True )
0 commit comments