18
18
19
19
from __future__ import annotations
20
20
21
+ import logging
22
+ from collections import defaultdict
21
23
from collections .abc import Iterable
22
24
from dataclasses import dataclass
23
25
from datetime import datetime , timezone
46
48
)
47
49
from sigstore_protobuf_specs .dev .sigstore .trustroot .v1 import (
48
50
Service ,
51
+ ServiceConfiguration ,
49
52
ServiceSelector ,
50
53
TransparencyLogInstance ,
51
54
)
56
59
TrustedRoot as _TrustedRoot ,
57
60
)
58
61
62
+ from sigstore ._internal .fulcio .client import FulcioClient
63
+ from sigstore ._internal .rekor .client import RekorClient
64
+ from sigstore ._internal .timestamp import TimestampAuthorityClient
59
65
from sigstore ._internal .tuf import DEFAULT_TUF_URL , STAGING_TUF_URL , TrustUpdater
60
66
from sigstore ._utils import (
61
67
KeyID ,
66
72
)
67
73
from sigstore .errors import Error , MetadataError , TUFError , VerificationError
68
74
75
+ # Versions supported by this client
76
+ REKOR_VERSIONS = [1 ]
77
+ TSA_VERSIONS = [1 ]
78
+ FULCIO_VERSIONS = [1 ]
79
+ OIDC_VERSIONS = [1 ]
80
+
81
+ _logger = logging .getLogger (__name__ )
82
+
69
83
70
84
def _is_timerange_valid (period : TimeRange | None , * , allow_expired : bool ) -> bool :
71
85
"""
@@ -189,8 +203,11 @@ def __init__(self, public_keys: list[_PublicKey] = []):
189
203
self ._keyring : dict [KeyID , Key ] = {}
190
204
191
205
for public_key in public_keys :
192
- key = Key (public_key )
193
- self ._keyring [key .key_id ] = key
206
+ try :
207
+ key = Key (public_key )
208
+ self ._keyring [key .key_id ] = key
209
+ except VerificationError as e :
210
+ _logger .warning (f"Failed to load a trusted root key: { e } " )
194
211
195
212
def verify (self , * , key_id : KeyID , signature : bytes , data : bytes ) -> None :
196
213
"""
@@ -323,28 +340,34 @@ def __init__(self, inner: _SigningConfig):
323
340
@api private
324
341
"""
325
342
self ._inner = inner
326
- self ._verify ()
327
-
328
- def _verify (self ) -> None :
329
- """
330
- Performs various feats of heroism to ensure that the signing config
331
- is well-formed.
332
- """
333
343
334
344
# must have a recognized media type.
335
345
try :
336
346
SigningConfig .SigningConfigType (self ._inner .media_type )
337
347
except ValueError :
338
348
raise Error (f"unsupported signing config format: { self ._inner .media_type } " )
339
349
340
- # currently not supporting other select modes
341
- # TODO: Support other modes ensuring tsa_urls() and tlog_urls() work
342
- if self ._inner .rekor_tlog_config .selector != ServiceSelector .ANY :
343
- raise Error (
344
- f"unsupported tlog selector { self ._inner .rekor_tlog_config .selector } "
345
- )
346
- if self ._inner .tsa_config .selector != ServiceSelector .ANY :
347
- raise Error (f"unsupported TSA selector { self ._inner .tsa_config .selector } " )
350
+ # Create lists of service protos that are valid, selected by the service
351
+ # configuration & supported by this client
352
+ self ._tlogs = self ._get_valid_services (
353
+ self ._inner .rekor_tlog_urls , REKOR_VERSIONS , self ._inner .rekor_tlog_config
354
+ )
355
+ if not self ._tlogs :
356
+ raise Error ("No valid Rekor transparency log found in signing config" )
357
+
358
+ self ._tsas = self ._get_valid_services (
359
+ self ._inner .tsa_urls , TSA_VERSIONS , self ._inner .tsa_config
360
+ )
361
+
362
+ self ._fulcios = self ._get_valid_services (
363
+ self ._inner .ca_urls , FULCIO_VERSIONS , None
364
+ )
365
+ if not self ._fulcios :
366
+ raise Error ("No valid Fulcio CA found in signing config" )
367
+
368
+ self ._oidcs = self ._get_valid_services (
369
+ self ._inner .oidc_urls , OIDC_VERSIONS , None
370
+ )
348
371
349
372
@classmethod
350
373
def from_file (
@@ -356,54 +379,73 @@ def from_file(
356
379
return cls (inner )
357
380
358
381
@staticmethod
359
- def _get_valid_service_url (services : list [Service ]) -> str | None :
382
+ def _get_valid_services (
383
+ services : list [Service ],
384
+ supported_versions : list [int ],
385
+ config : ServiceConfiguration | None ,
386
+ ) -> list [Service ]:
387
+ """Return supported services, taking SigningConfig restrictions into account"""
388
+
389
+ # split services by operator, only include valid services
390
+ services_by_operator : dict [str , list [Service ]] = defaultdict (list )
360
391
for service in services :
361
- if service .major_api_version != 1 :
392
+ if service .major_api_version not in supported_versions :
362
393
continue
363
394
364
395
if not _is_timerange_valid (service .valid_for , allow_expired = False ):
365
396
continue
366
- return service .url
367
- return None
368
397
369
- def get_tlog_urls (self ) -> list [str ]:
398
+ services_by_operator [service .operator ].append (service )
399
+
400
+ # build a list of services but make sure we only include one service per operator
401
+ # and use the highest version available for that operator
402
+ result : list [Service ] = []
403
+ for op_services in services_by_operator .values ():
404
+ op_services .sort (key = lambda s : s .major_api_version )
405
+ result .append (op_services [- 1 ])
406
+
407
+ # Depending on ServiceSelector, prune the result list
408
+ if not config or config .selector == ServiceSelector .ALL :
409
+ return result
410
+
411
+ if config .selector == ServiceSelector .UNDEFINED :
412
+ raise ValueError ("Undefined is not a valid signing config ServiceSelector" )
413
+
414
+ # handle EXACT and ANY selectors
415
+ count = config .count if config .selector == ServiceSelector .EXACT else 1
416
+ if len (result ) < count :
417
+ raise ValueError (
418
+ f"Expected { count } services in signing config, found { len (result )} "
419
+ )
420
+
421
+ return result [:count ]
422
+
423
+ def get_tlogs (self ) -> list [RekorClient ]:
370
424
"""
371
- Returns the rekor transparency logs that client should sign with.
372
- Currently only returns a single one but could in future return several
425
+ Returns the rekor transparency log clients to sign with.
373
426
"""
427
+ return [RekorClient (tlog .url ) for tlog in self ._tlogs ]
374
428
375
- url = self ._get_valid_service_url (self ._inner .rekor_tlog_urls )
376
- if not url :
377
- raise Error ("No valid Rekor transparency log found in signing config" )
378
- return [url ]
379
-
380
- def get_fulcio_url (self ) -> str :
429
+ def get_fulcio (self ) -> FulcioClient :
381
430
"""
382
- Returns url for the fulcio instance that client should use to get a
383
- signing certificate from
431
+ Returns a Fulcio client to get a signing certificate from
384
432
"""
385
- url = self ._get_valid_service_url (self ._inner .ca_urls )
386
- if not url :
387
- raise Error ("No valid Fulcio CA found in signing config" )
388
- return url
433
+ return FulcioClient (self ._fulcios [0 ].url )
389
434
390
435
def get_oidc_url (self ) -> str :
391
436
"""
392
437
Returns url for the OIDC provider that client should use to interactively
393
438
authenticate.
394
439
"""
395
- url = self ._get_valid_service_url (self ._inner .oidc_urls )
396
- if not url :
440
+ if not self ._oidcs :
397
441
raise Error ("No valid OIDC provider found in signing config" )
398
- return url
442
+ return self . _oidcs [ 0 ]. url
399
443
400
- def get_tsa_urls (self ) -> list [str ]:
444
+ def get_tsas (self ) -> list [TimestampAuthorityClient ]:
401
445
"""
402
- Returns timestamp authority API end points. Currently returns a single one
403
- but may return more in future.
446
+ Returns timestamp authority clients for urls configured in signing config.
404
447
"""
405
- url = self ._get_valid_service_url (self ._inner .tsa_urls )
406
- return [] if url is None else [url ]
448
+ return [TimestampAuthorityClient (s .url ) for s in self ._tsas ]
407
449
408
450
409
451
class TrustedRoot :
0 commit comments