35
35
Certificate ,
36
36
load_der_x509_certificate ,
37
37
)
38
- from sigstore_protobuf_specs .dev .sigstore .common .v1 import PublicKey as _PublicKey
39
- from sigstore_protobuf_specs .dev .sigstore .common .v1 import (
40
- PublicKeyDetails as _PublicKeyDetails ,
41
- )
42
- from sigstore_protobuf_specs .dev .sigstore .common .v1 import TimeRange
43
- from sigstore_protobuf_specs .dev .sigstore .trustroot .v1 import (
44
- CertificateAuthority as _CertificateAuthority ,
45
- )
46
- from sigstore_protobuf_specs .dev .sigstore .trustroot .v1 import (
47
- ClientTrustConfig as _ClientTrustConfig ,
48
- )
49
- from sigstore_protobuf_specs .dev .sigstore .trustroot .v1 import (
50
- Service ,
51
- ServiceConfiguration ,
52
- ServiceSelector ,
53
- TransparencyLogInstance ,
54
- )
55
- from sigstore_protobuf_specs .dev .sigstore .trustroot .v1 import (
56
- SigningConfig as _SigningConfig ,
57
- )
58
- from sigstore_protobuf_specs .dev .sigstore .trustroot .v1 import (
59
- TrustedRoot as _TrustedRoot ,
60
- )
38
+ from sigstore_models .common import v1 as common_v1
39
+ from sigstore_models .trustroot import v1 as trustroot_v1
61
40
62
41
from sigstore ._internal .fulcio .client import FulcioClient
63
42
from sigstore ._internal .rekor import RekorLogSubmitter
83
62
_logger = logging .getLogger (__name__ )
84
63
85
64
86
- def _is_timerange_valid (period : TimeRange | None , * , allow_expired : bool ) -> bool :
65
+ def _is_timerange_valid (
66
+ period : common_v1 .TimeRange | None , * , allow_expired : bool
67
+ ) -> bool :
87
68
"""
88
69
Given a `period`, checks that the the current time is not before `start`. If
89
70
`allow_expired` is `False`, also checks that the current time is not after
@@ -116,19 +97,19 @@ class Key:
116
97
key_id : KeyID
117
98
118
99
_RSA_SHA_256_DETAILS : ClassVar = {
119
- _PublicKeyDetails .PKCS1_RSA_PKCS1V5 ,
120
- _PublicKeyDetails .PKIX_RSA_PKCS1V15_2048_SHA256 ,
121
- _PublicKeyDetails .PKIX_RSA_PKCS1V15_3072_SHA256 ,
122
- _PublicKeyDetails .PKIX_RSA_PKCS1V15_4096_SHA256 ,
100
+ common_v1 . PublicKeyDetails .PKCS1_RSA_PKCS1V5 ,
101
+ common_v1 . PublicKeyDetails .PKIX_RSA_PKCS1V15_2048_SHA256 ,
102
+ common_v1 . PublicKeyDetails .PKIX_RSA_PKCS1V15_3072_SHA256 ,
103
+ common_v1 . PublicKeyDetails .PKIX_RSA_PKCS1V15_4096_SHA256 ,
123
104
}
124
105
125
106
_EC_DETAILS_TO_HASH : ClassVar = {
126
- _PublicKeyDetails .PKIX_ECDSA_P256_SHA_256 : hashes .SHA256 (),
127
- _PublicKeyDetails .PKIX_ECDSA_P384_SHA_384 : hashes .SHA384 (),
128
- _PublicKeyDetails .PKIX_ECDSA_P521_SHA_512 : hashes .SHA512 (),
107
+ common_v1 . PublicKeyDetails .PKIX_ECDSA_P256_SHA_256 : hashes .SHA256 (),
108
+ common_v1 . PublicKeyDetails .PKIX_ECDSA_P384_SHA_384 : hashes .SHA384 (),
109
+ common_v1 . PublicKeyDetails .PKIX_ECDSA_P521_SHA_512 : hashes .SHA512 (),
129
110
}
130
111
131
- def __init__ (self , public_key : _PublicKey ) -> None :
112
+ def __init__ (self , public_key : common_v1 . PublicKey ) -> None :
132
113
"""
133
114
Construct a key from the given Sigstore PublicKey message.
134
115
"""
@@ -147,7 +128,7 @@ def __init__(self, public_key: _PublicKey) -> None:
147
128
key = load_der_public_key (
148
129
public_key .raw_bytes , types = (ec .EllipticCurvePublicKey ,)
149
130
)
150
- elif public_key .key_details == _PublicKeyDetails .PKIX_ED25519 :
131
+ elif public_key .key_details == common_v1 . PublicKeyDetails .PKIX_ED25519 :
151
132
hash_algorithm = None
152
133
key = load_der_public_key (
153
134
public_key .raw_bytes , types = (ed25519 .Ed25519PublicKey ,)
@@ -198,7 +179,7 @@ class Keyring:
198
179
Represents a set of keys, each of which is a potentially valid verifier.
199
180
"""
200
181
201
- def __init__ (self , public_keys : list [_PublicKey ] = []):
182
+ def __init__ (self , public_keys : list [common_v1 . PublicKey ] = []):
202
183
"""
203
184
Create a new `Keyring`, with `keys` as the initial set of verifying keys.
204
185
"""
@@ -263,7 +244,7 @@ class CertificateAuthority:
263
244
Certificate Authority used in a Trusted Root configuration.
264
245
"""
265
246
266
- def __init__ (self , inner : _CertificateAuthority ):
247
+ def __init__ (self , inner : trustroot_v1 . CertificateAuthority ):
267
248
"""
268
249
Construct a new `CertificateAuthority`.
269
250
@@ -278,7 +259,7 @@ def from_json(cls, path: str) -> CertificateAuthority:
278
259
"""
279
260
Create a CertificateAuthority directly from JSON.
280
261
"""
281
- inner = _CertificateAuthority () .from_json (Path (path ).read_bytes ())
262
+ inner = trustroot_v1 . CertificateAuthority .from_json (Path (path ).read_bytes ())
282
263
return cls (inner )
283
264
284
265
def _verify (self ) -> None :
@@ -335,7 +316,7 @@ def __str__(self) -> str:
335
316
"""Returns the variant's string value."""
336
317
return self .value
337
318
338
- def __init__ (self , inner : _SigningConfig ):
319
+ def __init__ (self , inner : trustroot_v1 . SigningConfig ):
339
320
"""
340
321
Construct a new `SigningConfig`.
341
322
@@ -377,19 +358,19 @@ def from_file(
377
358
path : str ,
378
359
) -> SigningConfig :
379
360
"""Create a new signing config from file"""
380
- inner = _SigningConfig () .from_json (Path (path ).read_bytes ())
361
+ inner = trustroot_v1 . SigningConfig .from_json (Path (path ).read_bytes ())
381
362
return cls (inner )
382
363
383
364
@staticmethod
384
365
def _get_valid_services (
385
- services : list [Service ],
366
+ services : list [trustroot_v1 . Service ],
386
367
supported_versions : list [int ],
387
- config : ServiceConfiguration | None ,
388
- ) -> list [Service ]:
368
+ config : trustroot_v1 . ServiceConfiguration | None ,
369
+ ) -> list [trustroot_v1 . Service ]:
389
370
"""Return supported services, taking SigningConfig restrictions into account"""
390
371
391
372
# split services by operator, only include valid services
392
- services_by_operator : dict [str , list [Service ]] = defaultdict (list )
373
+ services_by_operator : dict [str , list [trustroot_v1 . Service ]] = defaultdict (list )
393
374
for service in services :
394
375
if service .major_api_version not in supported_versions :
395
376
continue
@@ -401,20 +382,19 @@ def _get_valid_services(
401
382
402
383
# build a list of services but make sure we only include one service per operator
403
384
# and use the highest version available for that operator
404
- result : list [Service ] = []
385
+ result : list [trustroot_v1 . Service ] = []
405
386
for op_services in services_by_operator .values ():
406
387
op_services .sort (key = lambda s : s .major_api_version )
407
388
result .append (op_services [- 1 ])
408
389
409
390
# Depending on ServiceSelector, prune the result list
410
- if not config or config .selector == ServiceSelector .ALL :
391
+ if not config or config .selector == trustroot_v1 . ServiceSelector .ALL :
411
392
return result
412
393
413
- if config .selector == ServiceSelector .UNDEFINED :
414
- raise ValueError ("Undefined is not a valid signing config ServiceSelector" )
415
-
416
394
# handle EXACT and ANY selectors
417
- count = config .count if config .selector == ServiceSelector .EXACT else 1
395
+ count = (
396
+ config .count if config .selector == trustroot_v1 .ServiceSelector .EXACT else 1
397
+ )
418
398
if len (result ) < count :
419
399
raise ValueError (
420
400
f"Expected { count } services in signing config, found { len (result )} "
@@ -474,7 +454,7 @@ def __str__(self) -> str:
474
454
"""Returns the variant's string value."""
475
455
return self .value
476
456
477
- def __init__ (self , inner : _TrustedRoot ):
457
+ def __init__ (self , inner : trustroot_v1 . TrustedRoot ):
478
458
"""
479
459
Construct a new `TrustedRoot`.
480
460
@@ -501,12 +481,12 @@ def from_file(
501
481
path : str ,
502
482
) -> TrustedRoot :
503
483
"""Create a new trust root from file"""
504
- inner = _TrustedRoot () .from_json (Path (path ).read_bytes ())
484
+ inner = trustroot_v1 . TrustedRoot .from_json (Path (path ).read_bytes ())
505
485
return cls (inner )
506
486
507
487
def _get_tlog_keys (
508
- self , tlogs : list [TransparencyLogInstance ], purpose : KeyringPurpose
509
- ) -> Iterable [_PublicKey ]:
488
+ self , tlogs : list [trustroot_v1 . TransparencyLogInstance ], purpose : KeyringPurpose
489
+ ) -> Iterable [common_v1 . PublicKey ]:
510
490
"""
511
491
Yields an iterator of public keys for transparency log instances that
512
492
are suitable for `purpose`.
@@ -523,14 +503,18 @@ def _get_tlog_keys(
523
503
def rekor_keyring (self , purpose : KeyringPurpose ) -> RekorKeyring :
524
504
"""Return keyring with keys for Rekor."""
525
505
526
- keys : list [_PublicKey ] = list (self ._get_tlog_keys (self ._inner .tlogs , purpose ))
506
+ keys : list [common_v1 .PublicKey ] = list (
507
+ self ._get_tlog_keys (self ._inner .tlogs , purpose )
508
+ )
527
509
if len (keys ) == 0 :
528
510
raise MetadataError ("Did not find any Rekor keys in trusted root" )
529
511
return RekorKeyring (Keyring (keys ))
530
512
531
513
def ct_keyring (self , purpose : KeyringPurpose ) -> CTKeyring :
532
514
"""Return keyring with key for CTFE."""
533
- ctfes : list [_PublicKey ] = list (self ._get_tlog_keys (self ._inner .ctlogs , purpose ))
515
+ ctfes : list [common_v1 .PublicKey ] = list (
516
+ self ._get_tlog_keys (self ._inner .ctlogs , purpose )
517
+ )
534
518
if not ctfes :
535
519
raise MetadataError ("CTFE keys not found in trusted root" )
536
520
return CTKeyring (Keyring (ctfes ))
@@ -585,7 +569,7 @@ def from_json(cls, raw: str) -> ClientTrustConfig:
585
569
"""
586
570
Deserialize the given client trust config.
587
571
"""
588
- inner = _ClientTrustConfig () .from_json (raw )
572
+ inner = trustroot_v1 . ClientTrustConfig .from_json (raw )
589
573
return cls (inner )
590
574
591
575
@classmethod
@@ -626,48 +610,33 @@ def from_tuf(
626
610
updater = TrustUpdater (url , offline )
627
611
628
612
tr_path = updater .get_trusted_root_path ()
629
- inner_tr = _TrustedRoot () .from_json (Path (tr_path ).read_bytes ())
613
+ inner_tr = trustroot_v1 . TrustedRoot .from_json (Path (tr_path ).read_bytes ())
630
614
631
615
try :
632
616
sc_path = updater .get_signing_config_path ()
633
- inner_sc = _SigningConfig () .from_json (Path (sc_path ).read_bytes ())
617
+ inner_sc = trustroot_v1 . SigningConfig .from_json (Path (sc_path ).read_bytes ())
634
618
except TUFError as e :
635
619
# TUF repo may not have signing config yet: hard code values for prod:
636
620
# https://github.com/sigstore/sigstore-python/issues/1388
637
621
if url == DEFAULT_TUF_URL :
638
622
embedded = read_embedded ("signing_config.v0.2.json" , url )
639
- inner_sc = _SigningConfig () .from_json (embedded )
623
+ inner_sc = trustroot_v1 . SigningConfig .from_json (embedded )
640
624
else :
641
625
raise e
642
626
643
627
return cls (
644
- _ClientTrustConfig (
645
- ClientTrustConfig .ClientTrustConfigType .CONFIG_0_1 ,
646
- inner_tr ,
647
- inner_sc ,
628
+ trustroot_v1 . ClientTrustConfig (
629
+ media_type = ClientTrustConfig .ClientTrustConfigType .CONFIG_0_1 . value ,
630
+ trusted_root = inner_tr ,
631
+ signing_config = inner_sc ,
648
632
)
649
633
)
650
634
651
- def __init__ (self , inner : _ClientTrustConfig ) -> None :
635
+ def __init__ (self , inner : trustroot_v1 . ClientTrustConfig ) -> None :
652
636
"""
653
637
@api private
654
638
"""
655
639
self ._inner = inner
656
- self ._verify ()
657
-
658
- def _verify (self ) -> None :
659
- """
660
- Performs various feats of heroism to ensure that the client trust config
661
- is well-formed.
662
- """
663
-
664
- # The client trust config must have a recognized media type.
665
- try :
666
- ClientTrustConfig .ClientTrustConfigType (self ._inner .media_type )
667
- except ValueError :
668
- raise Error (
669
- f"unsupported client trust config format: { self ._inner .media_type } "
670
- )
671
640
672
641
@property
673
642
def trusted_root (self ) -> TrustedRoot :
0 commit comments