Skip to content

Commit 60b88f2

Browse files
committed
Add jwt mgmt capabilities
1 parent d15089a commit 60b88f2

File tree

4 files changed

+347
-2
lines changed

4 files changed

+347
-2
lines changed

descope/management/common.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ class MgmtV1:
7979
# jwt
8080
update_jwt_path = "/v1/mgmt/jwt/update"
8181
impersonate_path = "/v1/mgmt/impersonate"
82+
mgmt_sign_in = "/v1/mgmt/auth/signin"
83+
mgmt_sign_up = "/v1/mgmt/auth/signup"
84+
mgmt_sign_up_or_in = "/v1/mgmt/auth/signup-in"
8285

8386
# permission
8487
permission_create_path = "/v1/mgmt/permission/create"
@@ -146,6 +149,71 @@ class MgmtV1:
146149
project_list_projects = "/v1/mgmt/projects/list"
147150

148151

152+
class MgmtSignUpOptions:
153+
def __init__(
154+
self,
155+
custom_claims: Optional[dict] = None,
156+
):
157+
self.custom_claims = custom_claims
158+
159+
160+
class MgmtLoginOptions:
161+
def __init__(
162+
self,
163+
stepup: bool = False,
164+
mfa: bool = False,
165+
revoke_other_sessions: Optional[bool] = None,
166+
custom_claims: Optional[dict] = None,
167+
jwt: Optional[str] = None,
168+
):
169+
self.stepup = stepup
170+
self.custom_claims = custom_claims
171+
self.mfa = mfa
172+
self.revoke_other_sessions = revoke_other_sessions
173+
self.jwt = jwt
174+
175+
176+
def is_jwt_required(lgo: MgmtLoginOptions) -> bool:
177+
return lgo is not None and (lgo.stepup or lgo.mfa)
178+
179+
180+
class MgmtUserRequest:
181+
def __init__(
182+
self,
183+
name: Optional[str] = None,
184+
given_name: Optional[str] = None,
185+
middle_name: Optional[str] = None,
186+
family_name: Optional[str] = None,
187+
phone: Optional[str] = None,
188+
email: Optional[str] = None,
189+
email_verified: Optional[bool] = None,
190+
phone_verified: Optional[bool] = None,
191+
sso_app_id: Optional[str] = None,
192+
):
193+
self.name = name
194+
self.given_name = given_name
195+
self.middle_name = middle_name
196+
self.family_name = family_name
197+
self.phone = phone
198+
self.email = email
199+
self.email_verified = email_verified
200+
self.phone_verified = phone_verified
201+
self.sso_app_id = sso_app_id
202+
203+
def to_dict(self) -> dict:
204+
return {
205+
"name": self.name,
206+
"givenName": self.given_name,
207+
"middleName": self.middle_name,
208+
"familyName": self.family_name,
209+
"phone": self.phone,
210+
"email": self.email,
211+
"emailVerified": self.email_verified,
212+
"phoneVerified": self.phone_verified,
213+
"ssoAppId": self.sso_app_id,
214+
}
215+
216+
149217
class AssociatedTenant:
150218
"""
151219
Represents a tenant association for a User or Access Key. The tenant_id is required to denote

descope/management/jwt.py

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@
22

33
from descope._auth_base import AuthBase
44
from descope.exceptions import ERROR_TYPE_INVALID_ARGUMENT, AuthException
5-
from descope.management.common import MgmtV1
5+
from descope.management.common import (
6+
MgmtLoginOptions,
7+
MgmtSignUpOptions,
8+
MgmtUserRequest,
9+
MgmtV1,
10+
is_jwt_required,
11+
)
612

713

814
class JWT(AuthBase):
@@ -78,3 +84,91 @@ def impersonate(
7884
pswd=self._auth.management_key,
7985
)
8086
return response.json().get("jwt", "")
87+
88+
def sign_in(
89+
self, login_id: str, login_options: Optional[MgmtLoginOptions] = None
90+
) -> dict:
91+
""" """
92+
if not login_id:
93+
raise AuthException(
94+
400, ERROR_TYPE_INVALID_ARGUMENT, "login_id cannot be empty"
95+
)
96+
97+
if login_options is None:
98+
login_options = MgmtLoginOptions()
99+
100+
if is_jwt_required(login_options) and not login_options.jwt:
101+
raise AuthException(400, ERROR_TYPE_INVALID_ARGUMENT, "JWT is required")
102+
103+
response = self._auth.do_post(
104+
MgmtV1.mgmt_sign_in,
105+
{
106+
"loginId": login_id,
107+
"stepup": login_options.stepup,
108+
"mfa": login_options.mfa,
109+
"revokeOtherSessions": login_options.revoke_other_sessions,
110+
"customClaims": login_options.custom_claims,
111+
"jwt": login_options.jwt,
112+
},
113+
pswd=self._auth.management_key,
114+
)
115+
resp = response.json()
116+
jwt_response = self._auth.generate_jwt_response(resp, None, None)
117+
return jwt_response
118+
119+
def sign_up(
120+
self,
121+
login_id: str,
122+
user: Optional[MgmtUserRequest] = None,
123+
signup_options: Optional[MgmtSignUpOptions] = None,
124+
) -> dict:
125+
""" """
126+
return self._sign_up_internal(
127+
login_id, MgmtV1.mgmt_sign_up, user, signup_options
128+
)
129+
130+
def sign_up_or_in(
131+
self,
132+
login_id: str,
133+
user: Optional[MgmtUserRequest] = None,
134+
signup_options: Optional[MgmtSignUpOptions] = None,
135+
) -> dict:
136+
""" """
137+
return self._sign_up_internal(
138+
login_id, MgmtV1.mgmt_sign_up_or_in, user, signup_options
139+
)
140+
141+
def _sign_up_internal(
142+
self,
143+
login_id: str,
144+
endpoint: str,
145+
user: Optional[MgmtUserRequest] = None,
146+
signup_options: Optional[MgmtSignUpOptions] = None,
147+
) -> dict:
148+
""" """
149+
if user is None:
150+
user = MgmtUserRequest()
151+
152+
if not login_id:
153+
raise AuthException(
154+
400, ERROR_TYPE_INVALID_ARGUMENT, "login_id cannot be empty"
155+
)
156+
157+
if signup_options is None:
158+
signup_options = MgmtSignUpOptions()
159+
160+
response = self._auth.do_post(
161+
endpoint,
162+
{
163+
"loginId": login_id,
164+
"user": user.to_dict(),
165+
"emailVerified": user.email_verified,
166+
"phoneVerified": user.phone_verified,
167+
"ssoAppId": user.sso_app_id,
168+
"customClaims": signup_options.custom_claims,
169+
},
170+
pswd=self._auth.management_key,
171+
)
172+
resp = response.json()
173+
jwt_response = self._auth.generate_jwt_response(resp, None, None)
174+
return jwt_response

samples/management/jwt_mgmt.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import logging
2+
3+
from descope import AuthException, DescopeClient
4+
5+
logging.basicConfig(level=logging.INFO)
6+
7+
8+
def main():
9+
project_id = ""
10+
management_key = ""
11+
12+
try:
13+
descope_client = DescopeClient(
14+
project_id=project_id, management_key=management_key
15+
)
16+
user_login_id = "[email protected]"
17+
18+
try:
19+
logging.info("Going to create a new user by sign up or in method")
20+
resp = descope_client.mgmt.jwt.sign_up_or_in(user_login_id)
21+
logging.info(f"Response: {resp}")
22+
23+
except AuthException as e:
24+
logging.info(f"User SignUpOrIn failed {e}")
25+
26+
try:
27+
logging.info("Searching for created user")
28+
resp = descope_client.mgmt.jwt.sign_in(user_login_id)
29+
logging.info(f"Response: {resp}")
30+
31+
except AuthException as e:
32+
logging.info(f"User SignIn failed {e}")
33+
34+
except AuthException:
35+
raise
36+
37+
38+
if __name__ == "__main__":
39+
main()

tests/management/test_jwt.py

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from descope import AuthException, DescopeClient
66
from descope.common import DEFAULT_TIMEOUT_SECONDS
7-
from descope.management.common import MgmtV1
7+
from descope.management.common import MgmtLoginOptions, MgmtV1
88

99
from .. import common
1010

@@ -119,3 +119,147 @@ def test_impersonate(self):
119119
params=None,
120120
timeout=DEFAULT_TIMEOUT_SECONDS,
121121
)
122+
123+
def test_sign_in(self):
124+
client = DescopeClient(
125+
self.dummy_project_id,
126+
self.public_key_dict,
127+
False,
128+
self.dummy_management_key,
129+
)
130+
131+
# Test failed flows
132+
self.assertRaises(AuthException, client.mgmt.jwt.sign_in, "")
133+
134+
self.assertRaises(
135+
AuthException,
136+
client.mgmt.jwt.sign_in,
137+
"loginId",
138+
MgmtLoginOptions(mfa=True),
139+
)
140+
141+
# Test success flow
142+
with patch("requests.post") as mock_post:
143+
network_resp = mock.Mock()
144+
network_resp.ok = True
145+
network_resp.json.return_value = json.loads("""{"jwt": "response"}""")
146+
mock_post.return_value = network_resp
147+
client.mgmt.jwt.sign_in("loginId")
148+
expected_uri = f"{common.DEFAULT_BASE_URL}{MgmtV1.mgmt_sign_in}"
149+
mock_post.assert_called_with(
150+
expected_uri,
151+
headers={
152+
**common.default_headers,
153+
"Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}",
154+
},
155+
json={
156+
"loginId": "loginId",
157+
"stepup": False,
158+
"mfa": False,
159+
"revokeOtherSessions": None,
160+
"customClaims": None,
161+
"jwt": None,
162+
},
163+
allow_redirects=False,
164+
verify=True,
165+
params=None,
166+
timeout=DEFAULT_TIMEOUT_SECONDS,
167+
)
168+
169+
def test_sign_up(self):
170+
client = DescopeClient(
171+
self.dummy_project_id,
172+
self.public_key_dict,
173+
False,
174+
self.dummy_management_key,
175+
)
176+
177+
# Test failed flows
178+
self.assertRaises(AuthException, client.mgmt.jwt.sign_up, "")
179+
180+
# Test success flow
181+
with patch("requests.post") as mock_post:
182+
network_resp = mock.Mock()
183+
network_resp.ok = True
184+
network_resp.json.return_value = json.loads("""{"jwt": "response"}""")
185+
mock_post.return_value = network_resp
186+
client.mgmt.jwt.sign_up("loginId")
187+
expected_uri = f"{common.DEFAULT_BASE_URL}{MgmtV1.mgmt_sign_up}"
188+
mock_post.assert_called_with(
189+
expected_uri,
190+
headers={
191+
**common.default_headers,
192+
"Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}",
193+
},
194+
json={
195+
"loginId": "loginId",
196+
"user": {
197+
"name": None,
198+
"givenName": None,
199+
"middleName": None,
200+
"familyName": None,
201+
"phone": None,
202+
"email": None,
203+
"emailVerified": None,
204+
"phoneVerified": None,
205+
"ssoAppId": None,
206+
},
207+
"emailVerified": None,
208+
"phoneVerified": None,
209+
"ssoAppId": None,
210+
"customClaims": None,
211+
},
212+
allow_redirects=False,
213+
verify=True,
214+
params=None,
215+
timeout=DEFAULT_TIMEOUT_SECONDS,
216+
)
217+
218+
def test_sign_up_or_in(self):
219+
client = DescopeClient(
220+
self.dummy_project_id,
221+
self.public_key_dict,
222+
False,
223+
self.dummy_management_key,
224+
)
225+
226+
# Test failed flows
227+
self.assertRaises(AuthException, client.mgmt.jwt.sign_up_or_in, "")
228+
229+
# Test success flow
230+
with patch("requests.post") as mock_post:
231+
network_resp = mock.Mock()
232+
network_resp.ok = True
233+
network_resp.json.return_value = json.loads("""{"jwt": "response"}""")
234+
mock_post.return_value = network_resp
235+
client.mgmt.jwt.sign_up_or_in("loginId")
236+
expected_uri = f"{common.DEFAULT_BASE_URL}{MgmtV1.mgmt_sign_up_or_in}"
237+
mock_post.assert_called_with(
238+
expected_uri,
239+
headers={
240+
**common.default_headers,
241+
"Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}",
242+
},
243+
json={
244+
"loginId": "loginId",
245+
"user": {
246+
"name": None,
247+
"givenName": None,
248+
"middleName": None,
249+
"familyName": None,
250+
"phone": None,
251+
"email": None,
252+
"emailVerified": None,
253+
"phoneVerified": None,
254+
"ssoAppId": None,
255+
},
256+
"emailVerified": None,
257+
"phoneVerified": None,
258+
"ssoAppId": None,
259+
"customClaims": None,
260+
},
261+
allow_redirects=False,
262+
verify=True,
263+
params=None,
264+
timeout=DEFAULT_TIMEOUT_SECONDS,
265+
)

0 commit comments

Comments
 (0)