|
| 1 | +# Copyright (c) 2023, VRAI Labs and/or its affiliates. All rights reserved. |
| 2 | +# |
| 3 | +# This software is licensed under the Apache License, Version 2.0 (the |
| 4 | +# "License") as published by the Apache Software Foundation. |
| 5 | +# |
| 6 | +# You may not use this file except in compliance with the License. You may |
| 7 | +# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 11 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 12 | +# License for the specific language governing permissions and limitations |
| 13 | +# under the License. |
| 14 | + |
| 15 | +from __future__ import annotations |
| 16 | + |
| 17 | +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Union |
| 18 | + |
| 19 | +from supertokens_python.normalised_url_domain import NormalisedURLDomain |
| 20 | + |
| 21 | +from httpx import AsyncClient |
| 22 | +from supertokens_python.recipe.thirdparty.provider import Provider |
| 23 | +from supertokens_python.recipe.thirdparty.types import ( |
| 24 | + AccessTokenAPI, |
| 25 | + AuthorisationRedirectAPI, |
| 26 | + UserInfo, |
| 27 | + UserInfoEmail, |
| 28 | +) |
| 29 | + |
| 30 | +if TYPE_CHECKING: |
| 31 | + from supertokens_python.framework.request import BaseRequest |
| 32 | + |
| 33 | + |
| 34 | +class GitLab(Provider): |
| 35 | + def __init__( |
| 36 | + self, |
| 37 | + client_id: str, |
| 38 | + client_secret: str, |
| 39 | + scope: Union[None, List[str]] = None, |
| 40 | + authorisation_redirect: Union[ |
| 41 | + None, Dict[str, Union[str, Callable[[BaseRequest], str]]] |
| 42 | + ] = None, |
| 43 | + gitlab_base_url: str = "https://gitlab.com", |
| 44 | + is_default: bool = False, |
| 45 | + ): |
| 46 | + super().__init__("gitlab", is_default) |
| 47 | + default_scopes = ["read_user"] |
| 48 | + if scope is None: |
| 49 | + scope = default_scopes |
| 50 | + self.client_id = client_id |
| 51 | + self.client_secret = client_secret |
| 52 | + self.scopes = list(set(scope)) |
| 53 | + gitlab_base_url = NormalisedURLDomain(gitlab_base_url).get_as_string_dangerous() |
| 54 | + self.gitlab_base_url = gitlab_base_url |
| 55 | + self.access_token_api_url = f"{gitlab_base_url}/oauth/token" |
| 56 | + self.authorisation_redirect_url = f"{gitlab_base_url}/oauth/authorize" |
| 57 | + self.authorisation_redirect_params = {} |
| 58 | + if authorisation_redirect is not None: |
| 59 | + self.authorisation_redirect_params = authorisation_redirect |
| 60 | + |
| 61 | + async def get_profile_info( |
| 62 | + self, auth_code_response: Dict[str, Any], user_context: Dict[str, Any] |
| 63 | + ) -> UserInfo: |
| 64 | + access_token: str = auth_code_response["access_token"] |
| 65 | + headers = {"Authorization": f"Bearer {access_token}"} |
| 66 | + async with AsyncClient() as client: |
| 67 | + response = await client.get(f"{self.gitlab_base_url}/api/v4/user", headers=headers) # type: ignore |
| 68 | + user_info = response.json() |
| 69 | + user_id = str(user_info["id"]) |
| 70 | + email = user_info.get("email") |
| 71 | + if email is None: |
| 72 | + return UserInfo(user_id) |
| 73 | + is_email_verified = user_info.get("confirmed_at") is not None |
| 74 | + return UserInfo(user_id, UserInfoEmail(email, is_email_verified)) |
| 75 | + |
| 76 | + def get_authorisation_redirect_api_info( |
| 77 | + self, user_context: Dict[str, Any] |
| 78 | + ) -> AuthorisationRedirectAPI: |
| 79 | + params = { |
| 80 | + "scope": " ".join(self.scopes), |
| 81 | + "response_type": "code", |
| 82 | + "client_id": self.client_id, |
| 83 | + **self.authorisation_redirect_params, |
| 84 | + } |
| 85 | + return AuthorisationRedirectAPI(self.authorisation_redirect_url, params) |
| 86 | + |
| 87 | + def get_access_token_api_info( |
| 88 | + self, |
| 89 | + redirect_uri: str, |
| 90 | + auth_code_from_request: str, |
| 91 | + user_context: Dict[str, Any], |
| 92 | + ) -> AccessTokenAPI: |
| 93 | + params = { |
| 94 | + "client_id": self.client_id, |
| 95 | + "client_secret": self.client_secret, |
| 96 | + "grant_type": "authorization_code", |
| 97 | + "code": auth_code_from_request, |
| 98 | + "redirect_uri": redirect_uri, |
| 99 | + } |
| 100 | + return AccessTokenAPI(self.access_token_api_url, params) |
| 101 | + |
| 102 | + def get_redirect_uri(self, user_context: Dict[str, Any]) -> Union[None, str]: |
| 103 | + return None |
| 104 | + |
| 105 | + def get_client_id(self, user_context: Dict[str, Any]) -> str: |
| 106 | + return self.client_id |
0 commit comments