-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathcredentials.py
More file actions
741 lines (649 loc) · 27.6 KB
/
credentials.py
File metadata and controls
741 lines (649 loc) · 27.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
"""
Authentication for HTTP Getters
-------------------------------
In order for Logprep to choose the correct authentication method the
:code:`LOGPREP_CREDENTIALS_FILE` environment variable has to be set.
This file should provide the credentials that are needed and can either be
in yaml or in json format.
To use the authentication, the given credentials file has to be
filled with the correct values that correspond to the method you want to use.
.. code-block:: yaml
:caption: Example for credentials file
getter:
"http://target.url":
# example for token given directly via file
token_file: <path/to/token/file> # won't be refreshed if expired
"http://target.url":
# example for token given directly inline
token: <token> # won't be refreshed if expired
"http://target.url":
# example for OAuth2 Client Credentials Grant
endpoint: <endpoint>
client_id: <id>
client_secret_file: <path/to/secret/file>
"http://target.url":
# example for OAuth2 Client Credentials Grant with inline secret
endpoint: <endpoint>
client_id: <id>
client_secret: <secret>
"http://target.url":
# example for OAuth2 Resource Owner Password Credentials Grant with
# authentication for a confidential client
endpoint: <endpoint>
username: <username>
password_file: <path/to/password/file>
client_id: <client_id> # optional if required
client_secret_file: <path/to/secret/file> # optional if required
"http://target.url":
# example for OAuth2 Resource Owner Password Credentials Grant for a
# public not confidential client
endpoint: <endpoint>
username: <username>
password_file: <path/to/password/file>
"http://target.url":
# example for OAuth2 Resource Owner Password Credentials Grant for a
# public not confidential client with inline password
endpoint: <endpoint>
username: <username>
password: <password>
"http://target.url":
# example for Basic Authentication
username: <username>
password_file: <path/to/password/file>
"http://target.url":
# example for Basic Authentication with inline password
username: <username>
password: <plaintext password> # will be overwritten if 'password_file' is given
"http://target.url":
# example for mTLS authentication
client_key: <path/to/client/key/file>
cert: <path/to/certificate/file>
"http://target.url":
# example for mTLS authentication with ca cert given
client_key: <path/to/client/key/file>
cert: <path/to/certificate/file>
ca_cert: <path/to/ca/cert>
input:
endpoints:
/firstendpoint:
username: <username>
password_file: <path/to/password/file>
/second*:
username: <username>
password: <password>
Options for the credentials file are:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: logprep.util.credentials.BasicAuthCredentials
:members: username, password
:no-index:
.. autoclass:: logprep.util.credentials.OAuth2ClientFlowCredentials
:members: endpoint, client_id, client_secret
:no-index:
.. autoclass:: logprep.util.credentials.OAuth2PasswordFlowCredentials
:members: endpoint, client_id, client_secret, username, password
:no-index:
.. autoclass:: logprep.util.credentials.MTLSCredentials
:members: client_key, cert, ca_cert
:no-index:
Authentication Process:
^^^^^^^^^^^^^^^^^^^^^^^
.. figure:: ../_images/Credentials.svg
:align: left
"""
import json
import logging
import os
from base64 import b64encode
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import urlparse
import requests
from attrs import define, field, validators
from requests import HTTPError, Session
from requests.adapters import HTTPAdapter
from ruamel.yaml import YAML
from ruamel.yaml.error import YAMLError
from urllib3 import Retry
from logprep.factory_error import InvalidConfigurationError
from logprep.util.defaults import ENV_NAME_LOGPREP_CREDENTIALS_FILE
# Module-wide YAML parser used by CredentialsFactory.get_content as the fallback
# when the credentials file is not valid JSON. It is configured with the "safe"
# loader type and the pure-Python implementation.
yaml = YAML(typ="safe", pure=True)
class CredentialsBadRequestError(Exception):
    """Raised when a token request is answered with HTTP status 400 Bad Request."""
class CredentialsEnvNotFoundError(Exception):
    """Raised when required credentials could not be found.

    NOTE(review): this exception is not raised anywhere in this module. The
    previous docstring said "401 Not Found", which mixes HTTP 401 (Unauthorized)
    with 404 (Not Found) -- confirm the intended meaning with the callers.
    """
class CredentialsFactory:
"""Factory class to create credentials for a given target URL."""
_logger = logging.getLogger("Credentials")
@classmethod
def from_target(cls, target_url: str) -> "Credentials | None":
"""Factory method to create a credentials object based on the credentials stored in the
environment variable :code:`LOGPREP_CREDENTIALS_FILE`.
Based on these credentials the expected authentication method is chosen and represented
by the corresponding credentials object.
Parameters
----------
target_url : str
target against which to authenticate with the given credentials
Returns
-------
credentials: Credentials
Credentials object representing the correct authorization method
"""
credentials_file_path = os.environ.get(ENV_NAME_LOGPREP_CREDENTIALS_FILE)
if credentials_file_path is None:
return None
credentials_file: CredentialsFileSchema = cls.get_content(Path(credentials_file_path))
domain = urlparse(target_url).netloc
scheme = urlparse(target_url).scheme
credential_mapping = credentials_file.getter.get(f"{scheme}://{domain}")
credentials = cls.from_dict(credential_mapping)
return credentials
@classmethod
def from_endpoint(cls, target_endpoint: str) -> "list[Credentials] | Credentials | None":
"""Factory method to create a credentials object based on the credentials stored in the
environment variable :code:`LOGPREP_CREDENTIALS_FILE`.
Based on these credentials the expected authentication method is chosen and represented
by the corresponding credentials object.
Parameters
----------
target_endpoint : str
get authentication parameters for given target_endpoint
Returns
-------
credentials: Credentials
Credentials object representing the correct authorization method
"""
credentials_file_path = os.environ.get(ENV_NAME_LOGPREP_CREDENTIALS_FILE)
if credentials_file_path is None:
return None
credentials_file: CredentialsFileSchema = cls.get_content(Path(credentials_file_path))
endpoint_credentials = credentials_file.input.get("endpoints")
if endpoint_credentials is None:
return None
credential_mapping: list | dict | None = endpoint_credentials.get(target_endpoint)
credentials: list[Credentials] | Credentials | None = None
if isinstance(credential_mapping, dict):
credentials = cls.from_dict(credential_mapping)
elif isinstance(credential_mapping, list):
credentials = cls.from_list(credential_mapping)
return credentials
@staticmethod
def get_content(file_path: Path) -> " CredentialsFileSchema":
"""gets content from credentials file
file can be either json or yaml
Parameters
----------
file_path : Path
path to credentials file given in :code:`LOGPREP_CREDENTIALS_FILE`
Returns
-------
file_content: dict
content from file
Raises
------
InvalidConfigurationError
raises when credentials have wrong type or when credentials file
is invalid
"""
try:
file_content = file_path.read_text(encoding="utf-8")
try:
return CredentialsFileSchema(**json.loads(file_content))
except (json.JSONDecodeError, ValueError):
return CredentialsFileSchema(**yaml.load(file_content))
except (TypeError, YAMLError) as error:
raise InvalidConfigurationError(
f"Invalid credentials file: {file_path} {error.args[0]}"
) from error
except FileNotFoundError as error:
raise InvalidConfigurationError(
f"Environment variable has wrong credentials file path: {file_path}"
) from error
@staticmethod
def _resolve_secret_content(credential_mapping: dict):
"""gets content from given secret_file in credentials file and updates
credentials_mapping with this content.
This file should only contain the content of the given secret e.g. the client secret.
Parameters
----------
credentials_mapping : dict
content from given credentials mapping
"""
secret_content = {
credential_type.removesuffix("_file"): Path(credential_content).read_text(
encoding="utf-8"
)
for credential_type, credential_content in credential_mapping.items()
if "_file" in credential_type
}
for credential_type in secret_content:
credential_mapping.pop(f"{credential_type}_file")
credential_mapping.update(secret_content)
@classmethod
def from_list(cls, credential_mapping: list[dict | None]) -> "list[Credentials] | None":
creds: list[Credentials] = []
for credential in credential_mapping:
cred = cls.from_dict(credential)
if isinstance(cred, Credentials):
creds.append(cred)
return creds if len(creds) > 0 else None
@classmethod
def from_dict(cls, credential_mapping: dict | None) -> "Credentials | None":
"""matches the given credentials of the credentials mapping
with the expected credential object"""
if credential_mapping:
cls._resolve_secret_content(credential_mapping)
try:
return cls._match_credentials(credential_mapping)
except TypeError as error:
raise InvalidConfigurationError(
f"Wrong type in given credentials file on argument: {error.args[0]}"
) from error
@classmethod
def _match_credentials(cls, credential_mapping: dict | None) -> "Credentials | None":
"""matches the given credentials of a given mapping to the expected credential object
Parameters
----------
credential_mapping : dict
mapping of given credentials used for authentication against target
Returns
-------
Credentials
expected credentials object representing the correct authentication method
"""
credentials: "Credentials | None" = None
match credential_mapping:
case {"token": token, **extra_params}:
if extra_params:
cls._logger.warning(
"Other parameters were given: %s but OAuth token authorization was chosen",
extra_params.keys(),
)
credentials = OAuth2TokenCredentials(**{"token": token})
case {
"client_key": client_key,
"cert": cert,
"ca_cert": ca_cert,
**extra_params,
}:
if extra_params:
cls._logger.warning(
"Other parameters were given: %s but mTLS authorization was chosen",
extra_params.keys(),
)
kwargs = {"client_key": client_key, "cert": cert, "ca_cert": ca_cert}
credentials = MTLSCredentials(**kwargs)
case {
"client_key": client_key,
"cert": cert,
**extra_params,
}:
if extra_params:
cls._logger.warning(
"Other parameters were given: %s but mTLS authorization was chosen",
extra_params.keys(),
)
kwargs = {"client_key": client_key, "cert": cert}
credentials = MTLSCredentials(**kwargs)
case {
"endpoint": endpoint,
"client_id": client_id,
"client_secret": client_secret,
"username": username,
"password": password,
**extra_params,
}:
if extra_params:
cls._logger.warning(
"Other parameters were given: %s but"
"OAuth password authorization for confidential clients was chosen",
extra_params.keys(),
)
kwargs = {
"endpoint": endpoint,
"client_id": client_id,
"client_secret": client_secret,
"username": username,
"password": password,
}
credentials = OAuth2PasswordFlowCredentials(**kwargs)
case {
"endpoint": endpoint,
"client_id": client_id,
"client_secret": client_secret,
**extra_params,
}:
if extra_params:
cls._logger.warning(
"Other parameters were given: %s but OAuth client authorization was chosen",
extra_params.keys(),
)
kwargs = {
"endpoint": endpoint,
"client_id": client_id,
"client_secret": client_secret,
}
credentials = OAuth2ClientFlowCredentials(**kwargs)
case {
"endpoint": endpoint,
"username": username,
"password": password,
**extra_params,
}:
if extra_params:
cls._logger.warning(
"Other parameters were given: %s but"
"OAuth password authorization was chosen",
extra_params.keys(),
)
kwargs = {
"endpoint": endpoint,
"username": username,
"password": password,
}
credentials = OAuth2PasswordFlowCredentials(**kwargs)
case {"username": username, "password": password, **extra_params}:
if extra_params:
cls._logger.warning(
"Other parameters were given but Basic authentication was chosen: %s",
extra_params.keys(),
)
kwargs = {
"username": username,
"password": password,
}
credentials = BasicAuthCredentials(**kwargs)
case _:
cls._logger.warning("No matching credentials authentication could be found.")
credentials = None
return credentials
@define(kw_only=True)
class AccessToken:
    """A simple dataclass to hold the token and its expiry time."""

    token: str = field(validator=validators.instance_of(str), repr=False)
    """token used for authentication against the target"""
    # A factory is used instead of `default=datetime.now()`, which would freeze the
    # timestamp of the moment the class was defined. The value is recomputed in
    # __attrs_post_init__ anyway.
    expiry_time: datetime = field(
        validator=validators.instance_of(datetime), init=False, factory=datetime.now
    )
    """time when token is expired"""
    refresh_token: str | None = field(
        validator=validators.instance_of((str, type(None))), default=None, repr=False
    )
    """is used in case the token is expired"""
    expires_in: int = field(
        validator=validators.instance_of(int),
        default=0,
        converter=lambda x: 0 if x is None else int(x),
    )
    """time in seconds the token stays valid, :code:`0` or :code:`None` means it never expires"""

    def __attrs_post_init__(self):
        # derive the absolute expiry time from the relative lifetime
        self.expiry_time = datetime.now() + timedelta(seconds=self.expires_in)

    def __str__(self) -> str:
        return self.token

    @property
    def is_expired(self) -> bool:
        """Checks if the token is already expired.

        A token with :code:`expires_in == 0` is treated as never expiring.
        """
        if self.expires_in == 0:
            return False
        return datetime.now() > self.expiry_time
@define(kw_only=True)
class Credentials:
    """Abstract Base Class for Credentials"""

    _logger = logging.getLogger("Credentials")

    _session: Session | None = field(
        validator=validators.instance_of((Session, type(None))), default=None
    )

    def get_session(self):
        """returns session with retry configuration

        A session is created on first use and reused afterwards. The retry adapters are only
        mounted on a session created here; a session given at construction is returned as is.
        """
        if self._session is None:
            self._session = Session()
            max_retries = 3
            retries = Retry(total=max_retries, status_forcelist=[500, 502, 503, 504])
            self._session.mount("https://", HTTPAdapter(max_retries=retries))
            self._session.mount("http://", HTTPAdapter(max_retries=retries))
        return self._session

    def _no_authorization_header(self, session):
        """checks if no authorization header exists yet in the given request session"""
        return session.headers.get("Authorization") is None

    @staticmethod
    def _bad_request_detail(response):
        """extracts the error detail of a response without failing on unexpected bodies"""
        try:
            body = response.json()
        except ValueError:
            # body is not JSON: fall back to the raw text so the original error is not masked
            return response.text
        if isinstance(body, dict):
            return body.get("error")
        return body

    def _handle_bad_requests_errors(self, response):
        """handles requests with status code 400 and raises Error

        Other HTTP error status codes are re-raised unchanged.

        Parameters
        ----------
        response : Response
            signifies the response from the post request sent while retrieving the token

        Raises
        ------
        CredentialsBadRequestError
            raises error with status code 400
        HTTPError
            raises for every other error status code
        """
        try:
            response.raise_for_status()
        except HTTPError as error:
            if response.status_code == 400:
                raise CredentialsBadRequestError(
                    "Authentication failed with status"
                    f" code 400 Bad Request: {self._bad_request_detail(response)}"
                ) from error
            raise
@define(kw_only=True)
class CredentialsFileSchema:
    """class for credentials file

    Describes the two top level sections of the credentials file. The defaults are created
    by factories so that instances never share one mutable dictionary.
    """

    # only the key "endpoints" is accepted and its value has to be a mapping
    input: dict = field(
        validator=[
            validators.instance_of(dict),
            validators.deep_mapping(
                key_validator=validators.in_(["endpoints"]),
                value_validator=validators.instance_of(dict),
            ),
        ],
        factory=lambda: {"endpoints": {}},
    )
    # credential mappings for getters
    getter: dict = field(validator=validators.instance_of(dict), factory=dict)
@define(kw_only=True)
class BasicAuthCredentials(Credentials):
    """Basic Authentication Credentials
    This is used for authenticating with Basic Authentication"""

    username: str = field(validator=validators.instance_of(str))
    """The username for the basic authentication."""
    password: str = field(validator=validators.instance_of(str), repr=False)
    """The password for the basic authentication."""

    def get_session(self) -> Session:
        """Return the request session with username and password set as its
        authentication parameters.

        :meta private:

        Returns
        -------
        session: Session
            session with username and password used for the authentication
        """
        basic_auth_session = super().get_session()
        basic_auth_session.auth = self.username, self.password
        return basic_auth_session
@define(kw_only=True)
class OAuth2TokenCredentials(Credentials):
    """OAuth2 Bearer Token Credentials
    This is used for authenticating with an API that uses OAuth2 Bearer Tokens.
    The Token is not refreshed automatically. If it expires, the requests will
    fail with http status code `401`.
    """

    token: AccessToken = field(
        validator=validators.instance_of(AccessToken),
        converter=lambda raw_token: AccessToken(token=raw_token),
        repr=False,
    )
    """The OAuth2 Bearer Token. This is used to authenticate."""

    def get_session(self) -> Session:
        """Return the request session carrying the Bearer Token in its authorization header."""
        bearer_session = super().get_session()
        bearer_session.headers["Authorization"] = f"Bearer {self.token}"
        return bearer_session
@define(kw_only=True)
class OAuth2PasswordFlowCredentials(Credentials):
    """OAuth2 Resource Owner Password Credentials Grant as described in
    https://datatracker.ietf.org/doc/html/rfc6749#section-4.3
    Token refresh is implemented as described in
    https://datatracker.ietf.org/doc/html/rfc6749#section-6
    """

    endpoint: str = field(validator=validators.instance_of(str))
    """The token endpoint for the OAuth2 server. This is used to request the token."""
    password: str = field(validator=validators.instance_of(str), repr=False)
    """the password for the token request"""
    username: str = field(validator=validators.instance_of(str))
    """the username for the token request"""
    timeout: int = field(validator=validators.instance_of(int), default=1)
    """The timeout for the token request. Defaults to 1 second."""
    client_id: str | None = field(validator=validators.instance_of((str, type(None))), default=None)
    """The client id for the token request. This is used to identify the client. (Optional)"""
    client_secret: str | None = field(
        validator=validators.instance_of((str, type(None))), default=None, repr=False
    )
    """The client secret for the token request.
    This is used to authenticate the client. (Optional)"""
    # default=None keeps the attribute readable before the first token was retrieved,
    # e.g. when a session that already carries an Authorization header was handed in
    _token: AccessToken | None = field(
        validator=validators.instance_of((AccessToken, type(None))),
        init=False,
        default=None,
        repr=False,
    )

    def get_session(self) -> Session:
        """Retrieves or creates session with token in authorization header.

        If no authorization header is set yet, the token is requested with the password grant.
        If the current token is expired and a refresh token is available, the session is
        replaced by a new one whose token was requested with the refresh token grant.

        Returns
        -------
        Session
            a request session with the retrieved token set in the authorization header
        """
        session = super().get_session()
        if self._no_authorization_header(session):
            payload = {
                "grant_type": "password",
                "username": self.username,
                "password": self.password,
            }
            session.headers["Authorization"] = f"Bearer {self._get_token(payload)}"
        if self._token and self._token.is_expired and self._token.refresh_token is not None:
            # release the connections of the session that is about to be replaced
            session.close()
            session = Session()
            payload = {
                "grant_type": "refresh_token",
                "refresh_token": self._token.refresh_token,
            }
            session.headers["Authorization"] = f"Bearer {self._get_token(payload)}"
            self._session = session
        return session

    def _get_token(self, payload: dict[str, str]) -> AccessToken:
        """sends a post request containing the payload to the token endpoint to retrieve
        the token.
        If status code 400 is received a Bad Request Error is raised.

        If both client id and client secret are configured they are sent as Basic
        authorization header of the token request.

        Parameters
        ----------
        payload : dict[str, str]
            contains credentials and the OAuth2 grant type for the given token endpoint to retrieve
            the token

        Returns
        -------
        _token: AccessToken
            returns access token to be used, refresh token to be used when
            token is expired and the expiry time of the given access token
        """
        headers: dict[str, str] = {}
        if self.client_id and self.client_secret:
            client_secrets = b64encode(
                f"{self.client_id}:{self.client_secret}".encode("utf-8")
            ).decode("utf-8")
            headers |= {"Authorization": f"Basic {client_secrets}"}
        response = requests.post(
            url=self.endpoint,
            data=payload,
            timeout=self.timeout,
            headers=headers,
        )
        self._handle_bad_requests_errors(response)
        token_response = response.json()
        self._token = AccessToken(
            token=token_response.get("access_token"),
            refresh_token=token_response.get("refresh_token"),
            expires_in=token_response.get("expires_in"),
        )
        return self._token
@define(kw_only=True)
class OAuth2ClientFlowCredentials(Credentials):
    """OAuth2 Client Credentials Flow Implementation as described in
    https://datatracker.ietf.org/doc/html/rfc6749#section-1.3.4
    """

    endpoint: str = field(validator=validators.instance_of(str))
    """The token endpoint for the OAuth2 server. This is used to request the token."""
    client_id: str = field(validator=validators.instance_of(str))
    """The client id for the token request. This is used to identify the client."""
    client_secret: str = field(validator=validators.instance_of(str), repr=False)
    """The client secret for the token request. This is used to authenticate the client."""
    timeout: int = field(validator=validators.instance_of(int), default=1)
    """The timeout for the token request. Defaults to 1 second."""
    # default=None keeps the attribute readable before the first token was retrieved,
    # e.g. when a session that already carries an Authorization header was handed in
    _token: AccessToken | None = field(
        validator=validators.instance_of((AccessToken, type(None))),
        init=False,
        default=None,
        repr=False,
    )

    def get_session(self) -> Session:
        """Retrieves or creates session with token in authorization header.
        If no authorization header is set yet, a post request containing only
        the grant type as payload is sent to the token endpoint given in the
        credentials file to retrieve the token.
        The client secret and a client id given in the credentials file are used to
        authenticate against the token endpoint.

        If an authorization header is set but no token is known or the token is expired,
        the session is closed and replaced by a new one with a newly requested token.

        Returns
        -------
        Session
            a request session with the retrieved token set in the authorization header
        """
        session = super().get_session()
        if "Authorization" in session.headers and (not self._token or self._token.is_expired):
            session.close()
            session = Session()
        if self._no_authorization_header(session):
            session.headers["Authorization"] = f"Bearer {self._get_token()}"
            self._session = session
        return session

    def _get_token(self) -> AccessToken:
        """send post request to token endpoint
        to retrieve access token using the client credentials grant.
        If received status code is 400 a Bad Request Error is raised.

        Returns
        -------
        _token: AccessToken
            AccessToken object containing the token and its expiry time
        """
        payload = {
            "grant_type": "client_credentials",
        }
        client_secrets = b64encode(f"{self.client_id}:{self.client_secret}".encode("utf-8")).decode(
            "utf-8"
        )
        headers = {"Authorization": f"Basic {client_secrets}"}
        response = requests.post(
            url=self.endpoint,
            data=payload,
            timeout=self.timeout,
            headers=headers,
        )
        self._handle_bad_requests_errors(response)
        token_response = response.json()
        self._token = AccessToken(
            token=token_response.get("access_token"),
            expires_in=token_response.get("expires_in"),
        )
        return self._token
@define(kw_only=True)
class MTLSCredentials(Credentials):
    """class for mTLS authentication"""

    client_key: str = field(validator=validators.instance_of(str))
    """path to the client key"""
    cert: str = field(validator=validators.instance_of(str))
    """path to the client certificate"""
    ca_cert: str | None = field(validator=validators.instance_of((str, type(None))), default=None)
    """path to a certification authority certificate"""

    def get_session(self):
        """Return the request session configured with the client certificate.

        The certificate and key pair is only set if the session has none yet. If a ca
        certificate is configured it is used to verify the server.
        """
        mtls_session = super().get_session()
        if mtls_session.cert is None:
            mtls_session.cert = (self.cert, self.client_key)
        if self.ca_cert:
            mtls_session.verify = self.ca_cert
        return mtls_session