|
17 | 17 | # Imports for quieting things during startup. |
18 | 18 | from waffle.models import Switch |
19 | 19 |
|
| 20 | +from uuid import uuid4 |
20 | 21 |
|
21 | | -def create_group(name="BlueButton"): |
22 | 22 |
|
| 23 | +def create_group(name="BlueButton"): |
23 | 24 | g, created = Group.objects.get_or_create(name=name) |
24 | 25 | if created: |
25 | 26 | print("%s group created" % (name)) |
26 | 27 | else: |
27 | 28 | print("%s group pre-existing. Create skipped." % (name)) |
28 | 29 | return g |
29 | 30 |
|
30 | | - |
31 | | -def get_switch(name): |
32 | | - try: |
33 | | - sw = Switch.objects.get(name=name) |
34 | | - return sw.active |
35 | | - except Exception as e: |
36 | | - print(f"Could not get switch {name}: {e}") |
37 | | - |
38 | | - |
39 | | -def set_switch(name, b): |
40 | | - # DISABLE SOME WAFFLE SWITCHES |
41 | | - # We don't want email, etc. |
42 | | - sw, _ = Switch.objects.get_or_create(name=name) |
43 | | - sw.active = b |
44 | | - sw.save() |
45 | | - |
46 | 31 | # usr would be a string if it is anything |
47 | 32 |
|
48 | 33 |
|
49 | | -def create_user(group, usr): |
50 | | - u_name = "rogersf" |
| 34 | +def create_user(the_group): |
| 35 | + username = "rogersf" |
51 | 36 | first_name = "Fred" |
52 | 37 | last_name = "Rogers" |
53 | | - email = "fred@landofmakebelieve.gov" |
54 | | - password = "danielthetiger" |
| 38 | + email = "mrrogers@landofmakebelieve.gov" |
| 39 | + password = uuid4() |
55 | 40 | user_type = "BEN" |
56 | 41 |
|
57 | | - if usr is not None: |
58 | | - u_name = usr |
59 | | - first_name = "{}{}".format(usr, "First") |
60 | | - last_name = "{}{}".format(usr, "Last") |
61 | | - email = "{}.{}@{}".format(first_name, last_name, email) |
62 | | - user_type = "DEV" |
63 | | - |
64 | | - # This violates constraints on other tables. |
65 | | - usr_q = User.objects.filter(username=u_name) |
66 | | - if usr_q.exists(): |
67 | | - # Delete any ADAGs for this user, or we will run into a |
68 | | - # constraint issue at startup. |
69 | | - count = ArchivedDataAccessGrant.objects.filter(beneficiary=usr_q.first()).delete() |
70 | | - print(f"Deleted {count} ADAGs for {u_name}") |
71 | | - count = DataAccessGrant.objects.filter(beneficiary=usr_q.first()).delete() |
72 | | - print(f"Deleted {count} ADAGs for {u_name}") |
73 | | - |
74 | | - User.objects.filter(username=u_name).delete() |
75 | | - |
76 | | - u = None |
77 | | - |
78 | | - if usr is not None: |
79 | | - try: |
80 | | - u, _ = User.objects.get_or_create(username=u_name, |
81 | | - first_name=first_name, |
82 | | - last_name=last_name, |
83 | | - email=email, |
84 | | - signals_to_disable=["post_save"]) |
85 | | - u.set_unusable_password() |
86 | | - except Exception as e: |
87 | | - print(f"Did not create user: {e}") |
88 | | - else: |
89 | | - # create a sample user 'fred' for dev local that has a usable password |
90 | | - try: |
91 | | - # get_or_create returns a tuple (v, bool) |
92 | | - u, _ = User.objects.get_or_create(username=u_name, |
93 | | - first_name=first_name, |
94 | | - last_name=last_name, |
95 | | - email=email, |
96 | | - password=password,) |
97 | | - |
98 | | - UserProfile.objects.create(user=u, |
99 | | - user_type=user_type, |
100 | | - create_applications=True, |
101 | | - password_reset_question_1='1', |
102 | | - password_reset_answer_1='blue', |
103 | | - password_reset_question_2='2', |
104 | | - password_reset_answer_2='Frank', |
105 | | - password_reset_question_3='3', |
106 | | - password_reset_answer_3='Bentley') |
107 | | - except Exception as e: |
108 | | - print(f"Did not create user and profile: {e}") |
109 | | - |
110 | | - if u is None: |
111 | | - print(f"Error creating user; exiting.") |
112 | | - else: |
113 | | - u.groups.add(group) |
114 | | - |
| 42 | + # We will do this over-and-over. |
| 43 | + # If we don't already exist, then create the user. |
| 44 | + if User.objects.filter(username=username).exists(): |
| 45 | + print(f"👟 {username} already exists. Skipping test user creation.") |
| 46 | + return User.objects.get(username=username) |
| 47 | + |
| 48 | + # If the user didn't exist, it is our first time through. |
| 49 | + # Create the user. |
| 50 | + user_obj = User.objects.create(username=username, |
| 51 | + first_name=first_name, |
| 52 | + last_name=last_name, |
| 53 | + email=email, |
| 54 | + password=password,) |
| 55 | + user_obj.set_unusable_password() |
| 56 | + UserProfile.objects.create(user=user_obj, |
| 57 | + user_type=user_type, |
| 58 | + create_applications=True, |
| 59 | + password_reset_question_1='1', |
| 60 | + password_reset_answer_1='blue', |
| 61 | + password_reset_question_2='2', |
| 62 | + password_reset_answer_2='Frank', |
| 63 | + password_reset_question_3='3', |
| 64 | + password_reset_answer_3='Bentley') |
| 65 | + user_obj.groups.add(the_group) |
| 66 | + |
| 67 | + # CROSSWALK |
| 68 | + # Removing any existing crosswalks for this artificial user. |
| 69 | + # Why? Just in case. |
115 | 70 | user_id_hash = "ee78989d1d9ba0b98f3cfbd52479f10c7631679c17563186f70fbef038cc9536" |
116 | 71 | Crosswalk.objects.filter(_user_id_hash=user_id_hash).delete() |
117 | | - c, _ = Crosswalk.objects.get_or_create(user=u, |
118 | | - fhir_id_v2=settings.DEFAULT_SAMPLE_FHIR_ID_V2, |
119 | | - _user_id_hash=user_id_hash) |
120 | | - return u |
| 72 | + Crosswalk.objects.get_or_create(user=user_obj, |
| 73 | + fhir_id_v2=settings.DEFAULT_SAMPLE_FHIR_ID_V2, |
| 74 | + _user_id_hash=user_id_hash) |
| 75 | + return user_obj |
| 76 | + |
121 | 77 |
|
| 78 | +def create_application(user): |
| 79 | + app_name = "TestApp" |
| 80 | + if Application.objects.filter(name=app_name).exists(): |
| 81 | + return Application.objects.get(name=app_name) |
| 82 | + |
| 83 | + # If the app doesn't exist, create the test app. |
122 | 84 |
|
123 | | -def create_application(user, group, app, redirect): |
124 | | - app_name = "TestApp" if app is None else app |
125 | 85 | Application.objects.filter(name=app_name).delete() |
126 | 86 | redirect_uri = "{}{}".format(settings.HOSTNAME_URL, settings.TESTCLIENT_REDIRECT_URI) |
127 | 87 |
|
128 | | - if redirect: |
129 | | - redirect_uri = redirect |
| 88 | + the_app = Application.objects.create(name=app_name, |
| 89 | + redirect_uris=redirect_uri, |
| 90 | + user=user, |
| 91 | + data_access_type="THIRTEEN_MONTH", |
| 92 | + client_type="confidential", |
| 93 | + authorization_grant_type="authorization-code",) |
130 | 94 |
|
131 | | - if not (redirect_uri.startswith("http://") or redirect_uri.startswith("https://")): |
132 | | - redirect_uri = "https://" + redirect_uri |
| 95 | + titles = ["My Medicare and supplemental coverage information.", |
| 96 | + "My Medicare claim information.", |
| 97 | + "My general patient and demographic information.", |
| 98 | + "Profile information including name and email." |
| 99 | + ] |
133 | 100 |
|
134 | | - try: |
135 | | - a = Application.objects.create(name=app_name, |
136 | | - redirect_uris=redirect_uri, |
137 | | - user=user, |
138 | | - data_access_type="THIRTEEN_MONTH", |
139 | | - client_type="confidential", |
140 | | - authorization_grant_type="authorization-code",) |
141 | | - |
142 | | - titles = ["My Medicare and supplemental coverage information.", |
143 | | - "My Medicare claim information.", |
144 | | - "My general patient and demographic information.", |
145 | | - "Profile information including name and email." |
146 | | - ] |
147 | | - |
148 | | - for t in titles: |
149 | | - c = ProtectedCapability.objects.get(title=t) |
150 | | - a.scope.add(c) |
151 | | - return a |
152 | | - except Exception as e: |
153 | | - print(f"Skipped creation of {app_name}: {e}") |
| 101 | + for t in titles: |
| 102 | + c = ProtectedCapability.objects.get(title=t) |
| 103 | + the_app.scope.add(c) |
154 | 104 |
|
| 105 | + return the_app |
155 | 106 |
|
156 | | -def create_test_token(user, application): |
157 | 107 |
|
| 108 | +def create_test_token(the_user, the_app): |
| 109 | + |
| 110 | + # Set expiration one day from now. |
158 | 111 | now = timezone.now() |
159 | 112 | expires = now + timedelta(days=1) |
160 | 113 |
|
161 | | - scopes = application.scope.all() |
| 114 | + scopes = the_app.scope.all() |
162 | 115 | scope = [] |
163 | 116 | for s in scopes: |
164 | 117 | scope.append(s.slug) |
165 | 118 |
|
166 | | - t = AccessToken.objects.create(user=user, application=application, |
| 119 | + # We have to have a tokent with token="sample-token-string", because we |
| 120 | + # rely on it existing for unit tests. Which are actually integration tests. |
| 121 | + if AccessToken.objects.filter(token="sample-token-string").exists(): |
| 122 | + t = AccessToken.objects.get(token="sample-token-string") |
| 123 | + t.expires = expires |
| 124 | + t.save() |
| 125 | + else: |
| 126 | + AccessToken.objects.create(user=the_user, |
| 127 | + application=the_app, |
| 128 | + # This needs to be "sample-token-string", because |
| 129 | + # we have tests that rely on it. |
167 | 130 | token="sample-token-string", |
168 | 131 | expires=expires, |
169 | 132 | scope=' '.join(scope),) |
170 | 133 |
|
171 | | - return t |
| 134 | + |
| 135 | +def get_switch(name): |
| 136 | + try: |
| 137 | + sw = Switch.objects.get(name=name) |
| 138 | + return sw.active |
| 139 | + except Exception as e: |
| 140 | + print(f"Could not get switch {name}: {e}") |
| 141 | + |
| 142 | + |
| 143 | +def set_switch(name, b): |
| 144 | + sw, _ = Switch.objects.get_or_create(name=name) |
| 145 | + sw.active = b |
| 146 | + sw.save() |
172 | 147 |
|
173 | 148 |
|
174 | 149 | class Command(BaseCommand): |
175 | 150 | help = 'Create a test user and application for the test client' |
176 | 151 |
|
177 | | - def add_arguments(self, parser): |
178 | | - parser.add_argument("-u", "--user", help="Name of the user to be created (unique).") |
179 | | - parser.add_argument("-a", "--app", help="Name of the application to be created (unique).") |
180 | | - parser.add_argument("-r", "--redirect", help="Redirect url of the application.") |
181 | | - |
182 | 152 | def handle(self, *args, **options): |
183 | | - usr = options.get("user", None) |
184 | | - app = options.get("app", None) |
185 | | - redirect = options["redirect"] |
186 | 153 |
|
187 | 154 | set_switch('outreach_email', False) |
188 | 155 |
|
189 | | - g = create_group() |
190 | | - u = create_user(g, usr) |
191 | | - print(f"Created user {u}") |
192 | | - a = create_application(u, g, app, redirect) |
193 | | - t = None |
194 | | - if usr is None and app is None: |
195 | | - t = create_test_token(u, a) |
196 | | - update_grants() |
197 | | - print("Name:", a.name) |
198 | | - print("client_id:", a.client_id) |
199 | | - print("client_secret:", a.client_secret) |
200 | | - print("access_token:", t.token if t else "None") |
201 | | - print("redirect_uri:", a.redirect_uris) |
| 156 | + the_group = create_group() |
| 157 | + the_user = create_user(the_group) |
| 158 | + the_app = create_application(the_user) |
| 159 | + create_test_token(the_user, the_app) |
| 160 | + update_grants() |
202 | 161 |
|
203 | 162 | # Restore switch to whatever it was. |
204 | 163 | set_switch('outreach_email', True) |
0 commit comments