1- from typing import Union , Any , List , Optional , Dict
2- from requests import Response
1+ from typing import Union , Any , List , Dict
32import os
43import requests
54import logging
5+ from datetime import datetime , timedelta
66
77logging .basicConfig (level = logging .INFO )
88logger : logging .Logger = logging .getLogger (__name__ )
99logger .setLevel (logging .DEBUG )
1010
1111KRATOS_ADMIN_URL = os .getenv ("KRATOS_ADMIN_URL" )
1212
13+ # user_id -> {"identity" -> full identity, "simple" -> {"id": str, "mail": str, "firstName": str, "lastName": str}}
14+ # "collected" -> timestamp
15+ KRATOS_IDENTITY_CACHE : Dict [str , Any ] = {}
16+ KRATOS_IDENTITY_CACHE_TIMEOUT = timedelta (minutes = 30 )
17+
18+
19+ def __get_cached_values () -> Dict [str , Dict [str , Any ]]:
20+ global KRATOS_IDENTITY_CACHE
21+ if not KRATOS_IDENTITY_CACHE or len (KRATOS_IDENTITY_CACHE ) == 0 :
22+ __refresh_identity_cache ()
23+ elif (
24+ KRATOS_IDENTITY_CACHE ["collected" ] + KRATOS_IDENTITY_CACHE_TIMEOUT
25+ < datetime .now ()
26+ ):
27+ __refresh_identity_cache ()
28+ return KRATOS_IDENTITY_CACHE
29+
30+
31+ def __refresh_identity_cache ():
32+ global KRATOS_IDENTITY_CACHE
33+ request = requests .get (f"{ KRATOS_ADMIN_URL } /identities" )
34+ if request .ok :
35+ collected = datetime .now ()
36+ KRATOS_IDENTITY_CACHE = {
37+ identity ["id" ]: {
38+ "identity" : identity ,
39+ "simple" : __parse_identity_to_simple (identity ),
40+ }
41+ for identity in request .json ()
42+ }
43+ KRATOS_IDENTITY_CACHE ["collected" ] = collected
44+ else :
45+ KRATOS_IDENTITY_CACHE = {}
46+
47+
48+ def __get_identity (user_id : str , only_simple : bool = True ) -> Dict [str , Any ]:
49+ if not isinstance (user_id , str ):
50+ user_id = str (user_id )
51+ cache = __get_cached_values ()
52+ if user_id in cache :
53+ if only_simple :
54+ return cache [user_id ]["simple" ]
55+ return cache [user_id ]
56+
57+ if len (user_id ) == 36 :
58+ # check not new entry outside cache
59+ request = requests .get (f"{ KRATOS_ADMIN_URL } /identities/{ user_id } " )
60+ if request .ok :
61+ identity = request .json ()
62+ if identity ["id" ] == user_id :
63+ KRATOS_IDENTITY_CACHE [user_id ] = {
64+ "identity" : identity ,
65+ "simple" : __parse_identity_to_simple (identity ),
66+ }
67+ if only_simple :
68+ return KRATOS_IDENTITY_CACHE [user_id ]["simple" ]
69+ return KRATOS_IDENTITY_CACHE [user_id ]
70+ # e.g. if id "GOLD_STAR" is requested => wont be in cache but expects a dummy dict
71+ if only_simple :
72+ return __parse_identity_to_simple ({"id" : user_id })
73+ return {
74+ "identity" : {
75+ "id" : user_id ,
76+ "traits" : {"email" : None , "name" : {"first" : None , "last" : None }},
77+ }
78+ }
79+
80+
81+ def __parse_identity_to_simple (identity : Dict [str , Any ]) -> Dict [str , str ]:
82+ r = {
83+ "id" : identity ["id" ],
84+ "mail" : None ,
85+ "firstName" : None ,
86+ "lastName" : None ,
87+ }
88+ if "traits" in identity :
89+ r ["mail" ] = identity ["traits" ]["email" ]
90+ if "name" in identity ["traits" ]:
91+ r ["firstName" ] = identity ["traits" ]["name" ]["first" ]
92+ r ["lastName" ] = identity ["traits" ]["name" ]["last" ]
93+ return r
94+
1395
1496def get_userid_from_mail (user_mail : str ) -> str :
15- for identity in requests .get (f"{ KRATOS_ADMIN_URL } /identities" ).json ():
16- if identity ["traits" ]["email" ] == user_mail :
17- return identity ["id" ]
97+ values = __get_cached_values ()
98+ for key in values :
99+ if key == "collected" :
100+ continue
101+ if values [key ]["simple" ]["mail" ] == user_mail :
102+ return key
18103 return None
19104
20105
21106def resolve_user_mail_by_id (user_id : str ) -> str :
22- res : Response = requests .get ("{}/identities/{}" .format (KRATOS_ADMIN_URL , user_id ))
23- data : Any = res .json ()
24- if res .status_code == 200 and data ["traits" ]:
25- return data ["traits" ]["email" ]
107+ i = __get_identity (user_id )
108+ if i :
109+ return i ["mail" ]
26110 return None
27111
28112
29113def resolve_user_name_by_id (user_id : str ) -> str :
30- res : Response = requests . get ( "{}/identities/{}" . format ( KRATOS_ADMIN_URL , user_id ) )
31- data : Any = res . json ()
32- if res . status_code == 200 and data [ "traits" ]:
33- return data ["traits" ]["name" ]
114+ i = __get_identity ( user_id , False )
115+ if i :
116+ i = i [ "identity" ]
117+ return i ["traits" ]["name" ]
34118 return None
35119
36120
37121def resolve_all_user_ids (
38122 relevant_ids : List [str ], as_list : bool = True
39- ) -> List [Dict [str , str ]]:
123+ ) -> Union [ Dict [ str , Dict [ str , str ]], List [Dict [str , str ] ]]:
40124 final = [] if as_list else {}
41125 for id in relevant_ids :
42- r = requests .get (f"{ KRATOS_ADMIN_URL } /identities/{ id } " ).json ()
43- d = {
44- "id" : id ,
45- "mail" : None ,
46- "firstName" : None ,
47- "lastName" : None ,
48- }
49- if "traits" in r :
50- traits = r ["traits" ]
51- d ["mail" ] = traits ["email" ]
52- d ["firstName" ] = traits ["name" ]["first" ]
53- d ["lastName" ] = traits ["name" ]["last" ]
126+ i = __get_identity (id )
54127 if as_list :
55- final .append (d )
128+ final .append (i )
56129 else :
57- final [id ] = d
130+ final [id ] = i
58131 return final
59132
60133
@@ -63,25 +136,16 @@ def expand_user_mail_name(
63136) -> List [Dict [str , str ]]:
64137 final = []
65138 for user in users :
66- r = requests .get (f"{ KRATOS_ADMIN_URL } /identities/{ user [user_id_key ]} " ).json ()
67- d = {
68- "mail" : None ,
69- "firstName" : None ,
70- "lastName" : None ,
71- }
72- if "traits" in r :
73- traits = r ["traits" ]
74- d ["mail" ] = traits ["email" ]
75- d ["firstName" ] = traits ["name" ]["first" ]
76- d ["lastName" ] = traits ["name" ]["last" ]
77- user = {** user , ** d }
139+ i = __get_identity (user [user_id_key ])
140+ user = {** user , ** i }
78141 final .append (user )
79142 return final
80143
81144
82145def resolve_user_name_and_email_by_id (user_id : str ) -> dict :
83- res : Response = requests .get ("{}/identities/{}" .format (KRATOS_ADMIN_URL , user_id ))
84- data : Any = res .json ()
85- if res .status_code == 200 and data ["traits" ]:
86- return data ["traits" ]["name" ], data ["traits" ]["email" ]
146+ i = __get_identity (user_id , False )
147+ if i :
148+ i = i ["identity" ]
149+ if i and "traits" in i and i ["traits" ]:
150+ return i ["traits" ]["name" ], i ["traits" ]["email" ]
87151 return None
0 commit comments