@@ -174,155 +174,6 @@ async def generate_pkce_pair(self) -> Dict[str, str]:
174174 "code_challenge" : code_challenge
175175 }
176176
177-
178- # def code_exchange(self, user_id: str, auth_code: str) -> None:
179- # """Exchange authorization code for tokens and store in session."""
180- # data = {
181- # "grant_type": "authorization_code",
182- # "code": auth_code,
183- # "redirect_uri": self.redirect_uri,
184- # "client_id": self.client_id,
185- # "client_secret": self.client_secret,
186- # }
187- # response = requests.post(self.token_url, data=data)
188- # response.raise_for_status()
189- # token_data = response.json()
190-
191- # user_info = {
192- # "client_id": self.client_id,
193- # "client_secret": self.client_secret,
194- # "token_url": self.token_url,
195- # }
196- # self.session_manager.set_user_data(user_id, user_info, token_data)
197-
198- # # def get_login_url(self, state: Optional[str] = None, scope: Optional[List[str]] = None, login_type: Optional[str] = None) -> str:
199- # # """
200- # # Get the login URL for user authentication.
201-
202- # # Args:
203- # # state (Optional[str]): A state parameter for CSRF protection.
204- # # scope (Optional[List[str]]): A list of scopes to request.
205-
206- # # Returns:
207- # # str: The login URL.
208- # # """
209- # # params = {
210- # # "client_id": self.client_id,
211- # # "response_type": "code",
212- # # "redirect_uri": self.redirect_uri,
213- # # "scope": " ".join(scope) if scope else "openid profile email",
214- # # "state": state or "",
215- # # }
216- # # if login_type:
217- # # params["login_type"] = login_type
218- # # return f"{self.auth_url}?{urlencode(params)}"
219-
220- # # def get_login_url_with_pkce(self, state: Optional[str] = None, scope: Optional[List[str]] = None) -> str:
221- # # """
222- # # Get the login URL for PKCE flow.
223-
224- # # Args:
225- # # state (Optional[str]): A state parameter for CSRF protection.
226- # # scope (Optional[List[str]]): A list of scopes to request.
227-
228- # # Returns:
229- # # str: The login URL with PKCE parameters.
230- # # """
231- # # code_verifier = self.generate_pkce_code_verifier()
232- # # code_challenge = self.generate_pkce_code_challenge(code_verifier)
233-
234- # # params = {
235- # # "client_id": self.client_id,
236- # # "response_type": "code",
237- # # "redirect_uri": self.redirect_uri,
238- # # "scope": " ".join(scope) if scope else "openid profile email",
239- # # "state": state or "",
240- # # "code_challenge": code_challenge,
241- # # "code_challenge_method": "S256",
242- # # }
243- # # return f"{self.auth_url}?{urlencode(params)}"
244-
245- # # def get_user_details(self, user_id: str) -> Dict[str, Any]:
246- # # """Retrieve user information using the stored token."""
247- # # token_manager = self.session_manager.user_sessions.get(user_id, {}).get("token_manager")
248- # # if not token_manager:
249- # # raise KindeRetrieveException("User not authenticated")
250-
251- # # access_token = token_manager.get_access_token()
252- # # headers = {"Authorization": f"Bearer {access_token}"}
253- # # response = requests.get(self.userinfo_url, headers=headers)
254- # # response.raise_for_status()
255- # # return response.json()
256-
257- # # def logout(self, params: Optional[Dict[str, Any]] = None) -> str:
258- # # """
259- # # Generate the logout URL.
260-
261- # # Args:
262- # # params (Optional[Dict[str, Any]]): A dictionary of query parameters to include in the logout URL.
263- # # Supported keys: state, post_logout_redirect_uri.
264-
265- # # Returns:
266- # # str: The logout URL.
267- # # """
268- # # # Default parameters
269- # # default_params = {
270- # # "client_id": self.client_id,
271- # # "logout_uri": self.redirect_uri,
272- # # "state": self.state or "",
273- # # }
274-
275- # # # Merge default parameters with user-provided parameters
276- # # if params:
277- # # if "state" in params:
278- # # default_params["state"] = params["state"]
279- # # if "post_logout_redirect_uri" in params:
280- # # default_params["post_logout_redirect_uri"] = params["post_logout_redirect_uri"]
281-
282- # # return f"{self.logout_url}?{urlencode(default_params)}"
283-
284- # # def generate_pkce_code_verifier(self) -> str:
285- # # """Generate a PKCE code verifier."""
286- # # return secrets.token_urlsafe(32)
287-
288- # # def generate_pkce_code_challenge(self, code_verifier: str) -> str:
289- # # """Generate a PKCE code challenge from the verifier."""
290- # # code_challenge = hashlib.sha256(code_verifier.encode()).digest()
291- # # return base64.urlsafe_b64encode(code_challenge).decode().rstrip("=")
292-
293- # # def get_tokens_for_core(self, user_id: str) -> Optional[Dict[str, str]]:
294- # """
295- # Retrieve tokens for the core module.
296-
297- # Args:
298- # user_id (str): The ID of the user whose tokens are being retrieved.
299-
300- # Returns:
301- # Optional[Dict[str, str]]: A dictionary containing the access token and refresh token (if available).
302- # Returns None if the user is not authenticated or the session is invalid.
303- # """
304- # session = self.session_manager.storage.get(user_id)
305- # if not session:
306- # return None
307-
308- # token_manager = session.get("token_manager")
309- # if not token_manager:
310- # return None
311-
312- # access_token = token_manager.get_access_token()
313- # if not access_token:
314- # return None
315-
316- # tokens = {
317- # "access_token": access_token,
318- # }
319-
320- # refresh_token = token_manager.tokens.get("refresh_token")
321- # if refresh_token:
322- # tokens["refresh_token"] = refresh_token
323-
324- # return tokens
325-
326177 async def generate_auth_url (
327178 self ,
328179 route_type : IssuerRouteTypes = IssuerRouteTypes .LOGIN ,
0 commit comments