Skip to content

Commit 71f5eec

Browse files
committed
CM-55782: Fixed comments
1 parent ef69a11 commit 71f5eec

File tree

4 files changed

+122
-166
lines changed

4 files changed

+122
-166
lines changed

cycode/cli/utils/get_api_client.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,17 @@ def _get_cycode_client(
2020
hide_response_log: bool,
2121
id_token: Optional[str] = None,
2222
) -> Union['ScanClient', 'ReportClient']:
23-
if id_token:
24-
if not client_id:
25-
raise click.ClickException('Cycode client id needed for OIDC authentication.')
23+
if client_id and id_token:
2624
return create_client_func(client_id, None, hide_response_log, id_token)
2725

28-
if not client_id or not client_secret:
26+
if not client_id or not id_token:
2927
oidc_client_id, oidc_id_token = _get_configured_oidc_credentials()
3028
if oidc_client_id and oidc_id_token:
3129
return create_client_func(oidc_client_id, None, hide_response_log, oidc_id_token)
30+
if oidc_id_token and not oidc_client_id:
31+
raise click.ClickException('Cycode client id needed for OIDC authentication.')
3232

33+
if not client_id or not client_secret:
3334
client_id, client_secret = _get_configured_credentials()
3435
if not client_id:
3536
raise click.ClickException('Cycode client id needed.')
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from abc import ABC, abstractmethod
2+
from threading import Lock
3+
from typing import Any, Optional
4+
5+
import arrow
6+
from requests import Response
7+
8+
from cycode.cli.user_settings.credentials_manager import CredentialsManager
9+
from cycode.cli.user_settings.jwt_creator import JwtCreator
10+
from cycode.cyclient.cycode_client import CycodeClient
11+
12+
_NGINX_PLAIN_ERRORS = [
13+
b'Invalid JWT Token',
14+
b'JWT Token Needed',
15+
b'JWT Token validation failed',
16+
]
17+
18+
19+
class BaseTokenAuthClient(CycodeClient, ABC):
20+
"""Base client for token-based authentication flows with cached JWTs."""
21+
22+
def __init__(self, client_id: str) -> None:
23+
super().__init__()
24+
self.client_id = client_id
25+
26+
self._credentials_manager = CredentialsManager()
27+
# load cached access token
28+
access_token, expires_in, creator = self._credentials_manager.get_access_token()
29+
30+
self._access_token = self._expires_in = None
31+
expected_creator = self._create_jwt_creator()
32+
if creator == expected_creator:
33+
# we must be sure that cached access token is created using the same client id and client secret.
34+
# because client id and client secret could be passed via command, via env vars or via config file.
35+
# we must not use cached access token if client id or client secret was changed.
36+
self._access_token = access_token
37+
self._expires_in = arrow.get(expires_in) if expires_in else None
38+
39+
self._lock = Lock()
40+
41+
def get_access_token(self) -> str:
42+
with self._lock:
43+
self.refresh_access_token_if_needed()
44+
return self._access_token
45+
46+
def invalidate_access_token(self, in_storage: bool = False) -> None:
47+
self._access_token = None
48+
self._expires_in = None
49+
50+
if in_storage:
51+
self._credentials_manager.update_access_token(None, None, None)
52+
53+
def refresh_access_token_if_needed(self) -> None:
54+
if self._access_token is None or self._expires_in is None or arrow.utcnow() >= self._expires_in:
55+
self.refresh_access_token()
56+
57+
def refresh_access_token(self) -> None:
58+
auth_response = self._request_new_access_token()
59+
self._access_token = auth_response['token']
60+
61+
self._expires_in = arrow.utcnow().shift(seconds=auth_response['expires_in'] * 0.8)
62+
63+
jwt_creator = self._create_jwt_creator()
64+
self._credentials_manager.update_access_token(self._access_token, self._expires_in.timestamp(), jwt_creator)
65+
66+
def get_request_headers(self, additional_headers: Optional[dict] = None, without_auth: bool = False) -> dict:
67+
headers = super().get_request_headers(additional_headers=additional_headers)
68+
69+
if not without_auth:
70+
headers = self._add_auth_header(headers)
71+
72+
return headers
73+
74+
def _add_auth_header(self, headers: dict) -> dict:
75+
headers['Authorization'] = f'Bearer {self.get_access_token()}'
76+
return headers
77+
78+
def _execute(
79+
self,
80+
*args,
81+
**kwargs,
82+
) -> Response:
83+
response = super()._execute(*args, **kwargs)
84+
85+
# backend returns 200 and plain text. no way to catch it with .raise_for_status()
86+
nginx_error_response = any(response.content.startswith(plain_error) for plain_error in _NGINX_PLAIN_ERRORS)
87+
if response.status_code == 200 and nginx_error_response:
88+
# if cached token is invalid, try to refresh it and retry the request
89+
self.refresh_access_token()
90+
response = super()._execute(*args, **kwargs)
91+
92+
return response
93+
94+
@abstractmethod
95+
def _create_jwt_creator(self) -> JwtCreator:
96+
"""Create a JwtCreator instance for the current credential type."""
97+
98+
@abstractmethod
99+
def _request_new_access_token(self) -> dict[str, Any]:
100+
"""Return the authentication payload with token and expires_in."""
101+
Lines changed: 8 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,97 +1,24 @@
1-
from threading import Lock
2-
from typing import Optional
1+
from typing import Any
32

4-
import arrow
5-
from requests import Response
6-
7-
from cycode.cli.user_settings.credentials_manager import CredentialsManager
83
from cycode.cli.user_settings.jwt_creator import JwtCreator
9-
from cycode.cyclient.cycode_client import CycodeClient
10-
11-
_NGINX_PLAIN_ERRORS = [
12-
b'Invalid JWT Token',
13-
b'JWT Token Needed',
14-
b'JWT Token validation failed',
15-
]
4+
from cycode.cyclient.base_token_auth_client import BaseTokenAuthClient
165

176

18-
class CycodeOidcBasedClient(CycodeClient):
7+
class CycodeOidcBasedClient(BaseTokenAuthClient):
198
"""Send requests with JWT obtained via OIDC ID token."""
209

2110
def __init__(self, client_id: str, id_token: str) -> None:
22-
super().__init__()
23-
self.client_id = client_id
2411
self.id_token = id_token
12+
super().__init__(client_id)
2513

26-
self._credentials_manager = CredentialsManager()
27-
# load cached access token
28-
access_token, expires_in, creator = self._credentials_manager.get_access_token()
29-
30-
self._access_token = self._expires_in = None
31-
if creator == JwtCreator.create(client_id, id_token):
32-
# we must be sure that cached access token is created using the same client id and client secret.
33-
# because client id and client secret could be passed via command, via env vars or via config file.
34-
# we must not use cached access token if client id or client secret was changed.
35-
self._access_token = access_token
36-
self._expires_in = arrow.get(expires_in) if expires_in else None
37-
38-
self._lock = Lock()
39-
40-
def get_access_token(self) -> str:
41-
with self._lock:
42-
self.refresh_access_token_if_needed()
43-
return self._access_token
44-
45-
def invalidate_access_token(self, in_storage: bool = False) -> None:
46-
self._access_token = None
47-
self._expires_in = None
48-
49-
if in_storage:
50-
self._credentials_manager.update_access_token(None, None, None)
51-
52-
def refresh_access_token_if_needed(self) -> None:
53-
if self._access_token is None or self._expires_in is None or arrow.utcnow() >= self._expires_in:
54-
self.refresh_access_token()
55-
56-
def refresh_access_token(self) -> None:
14+
def _request_new_access_token(self) -> dict[str, Any]:
5715
auth_response = self.post(
5816
url_path='api/v1/auth/oidc/api-token',
5917
body={'client_id': self.client_id, 'id_token': self.id_token},
6018
without_auth=True,
6119
hide_response_content_log=True,
6220
)
63-
auth_response_data = auth_response.json()
64-
65-
self._access_token = auth_response_data['token']
66-
self._expires_in = arrow.utcnow().shift(seconds=auth_response_data['expires_in'] * 0.8)
67-
68-
jwt_creator = JwtCreator.create(self.client_id, self.id_token)
69-
self._credentials_manager.update_access_token(self._access_token, self._expires_in.timestamp(), jwt_creator)
70-
71-
def get_request_headers(self, additional_headers: Optional[dict] = None, without_auth: bool = False) -> dict:
72-
headers = super().get_request_headers(additional_headers=additional_headers)
73-
74-
if not without_auth:
75-
headers = self._add_auth_header(headers)
76-
77-
return headers
78-
79-
def _add_auth_header(self, headers: dict) -> dict:
80-
headers['Authorization'] = f'Bearer {self.get_access_token()}'
81-
return headers
82-
83-
def _execute(
84-
self,
85-
*args,
86-
**kwargs,
87-
) -> Response:
88-
response = super()._execute(*args, **kwargs)
89-
90-
# backend returns 200 and plain text. no way to catch it with .raise_for_status()
91-
nginx_error_response = any(response.content.startswith(plain_error) for plain_error in _NGINX_PLAIN_ERRORS)
92-
if response.status_code == 200 and nginx_error_response:
93-
# if cached token is invalid, try to refresh it and retry the request
94-
self.refresh_access_token()
95-
response = super()._execute(*args, **kwargs)
21+
return auth_response.json()
9622

97-
return response
23+
def _create_jwt_creator(self) -> JwtCreator:
24+
return JwtCreator.create(self.client_id, self.id_token)
Lines changed: 8 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,97 +1,24 @@
1-
from threading import Lock
2-
from typing import Optional
1+
from typing import Any
32

4-
import arrow
5-
from requests import Response
6-
7-
from cycode.cli.user_settings.credentials_manager import CredentialsManager
83
from cycode.cli.user_settings.jwt_creator import JwtCreator
9-
from cycode.cyclient.cycode_client import CycodeClient
10-
11-
_NGINX_PLAIN_ERRORS = [
12-
b'Invalid JWT Token',
13-
b'JWT Token Needed',
14-
b'JWT Token validation failed',
15-
]
4+
from cycode.cyclient.base_token_auth_client import BaseTokenAuthClient
165

176

18-
class CycodeTokenBasedClient(CycodeClient):
7+
class CycodeTokenBasedClient(BaseTokenAuthClient):
198
"""Send requests with JWT."""
209

2110
def __init__(self, client_id: str, client_secret: str) -> None:
22-
super().__init__()
2311
self.client_secret = client_secret
24-
self.client_id = client_id
25-
26-
self._credentials_manager = CredentialsManager()
27-
# load cached access token
28-
access_token, expires_in, creator = self._credentials_manager.get_access_token()
29-
30-
self._access_token = self._expires_in = None
31-
if creator == JwtCreator.create(client_id, client_secret):
32-
# we must be sure that cached access token is created using the same client id and client secret.
33-
# because client id and client secret could be passed via command, via env vars or via config file.
34-
# we must not use cached access token if client id or client secret was changed.
35-
self._access_token = access_token
36-
self._expires_in = arrow.get(expires_in) if expires_in else None
37-
38-
self._lock = Lock()
39-
40-
def get_access_token(self) -> str:
41-
with self._lock:
42-
self.refresh_access_token_if_needed()
43-
return self._access_token
44-
45-
def invalidate_access_token(self, in_storage: bool = False) -> None:
46-
self._access_token = None
47-
self._expires_in = None
48-
49-
if in_storage:
50-
self._credentials_manager.update_access_token(None, None, None)
51-
52-
def refresh_access_token_if_needed(self) -> None:
53-
if self._access_token is None or self._expires_in is None or arrow.utcnow() >= self._expires_in:
54-
self.refresh_access_token()
12+
super().__init__(client_id)
5513

56-
def refresh_access_token(self) -> None:
14+
def _request_new_access_token(self) -> dict[str, Any]:
5715
auth_response = self.post(
5816
url_path='api/v1/auth/api-token',
5917
body={'clientId': self.client_id, 'secret': self.client_secret},
6018
without_auth=True,
6119
hide_response_content_log=True,
6220
)
63-
auth_response_data = auth_response.json()
64-
65-
self._access_token = auth_response_data['token']
66-
self._expires_in = arrow.utcnow().shift(seconds=auth_response_data['expires_in'] * 0.8)
67-
68-
jwt_creator = JwtCreator.create(self.client_id, self.client_secret)
69-
self._credentials_manager.update_access_token(self._access_token, self._expires_in.timestamp(), jwt_creator)
70-
71-
def get_request_headers(self, additional_headers: Optional[dict] = None, without_auth: bool = False) -> dict:
72-
headers = super().get_request_headers(additional_headers=additional_headers)
73-
74-
if not without_auth:
75-
headers = self._add_auth_header(headers)
76-
77-
return headers
78-
79-
def _add_auth_header(self, headers: dict) -> dict:
80-
headers['Authorization'] = f'Bearer {self.get_access_token()}'
81-
return headers
82-
83-
def _execute(
84-
self,
85-
*args,
86-
**kwargs,
87-
) -> Response:
88-
response = super()._execute(*args, **kwargs)
89-
90-
# backend returns 200 and plain text. no way to catch it with .raise_for_status()
91-
nginx_error_response = any(response.content.startswith(plain_error) for plain_error in _NGINX_PLAIN_ERRORS)
92-
if response.status_code == 200 and nginx_error_response:
93-
# if cached token is invalid, try to refresh it and retry the request
94-
self.refresh_access_token()
95-
response = super()._execute(*args, **kwargs)
21+
return auth_response.json()
9622

97-
return response
23+
def _create_jwt_creator(self) -> JwtCreator:
24+
return JwtCreator.create(self.client_id, self.client_secret)

0 commit comments

Comments
 (0)