1
1
import hashlib
2
+ import os
2
3
import secrets
3
4
4
5
import pytest
6
+ from id import (
7
+ detect_credential ,
8
+ )
5
9
6
10
from sigstore import dsse
11
+ from sigstore ._internal .rekor .client import STAGING_REKOR_URL
7
12
from sigstore ._internal .rekor .client_v2 import (
8
13
DEFAULT_KEY_DETAILS ,
9
- DEFAULT_REKOR_URL ,
10
- STAGING_REKOR_URL ,
11
14
Certificate ,
12
15
Hashed ,
13
16
LogEntry ,
17
20
v2 ,
18
21
v2_intoto ,
19
22
)
20
-
21
- # from sigstore.models import rekor_v1
23
+ from sigstore ._internal .trust import ClientTrustConfig
22
24
from sigstore ._utils import sha256_digest
23
- from sigstore .sign import ec
25
+ from sigstore .models import rekor_v1
26
+ from sigstore .oidc import _DEFAULT_AUDIENCE , IdentityToken
27
+ from sigstore .sign import SigningContext , ec
24
28
25
29
ALPHA_REKOR_V2_URL = "https://log2025-alpha1.rekor.sigstage.dev"
26
30
LOCAL_REKOR_V2_URL = "http://localhost:3000"
29
33
# TODO: add staging and production URLs when available,
30
34
# and local after using scaffolding/setup-sigstore-env action
31
35
@pytest .fixture (
36
+ scope = "session" ,
32
37
params = [
33
38
ALPHA_REKOR_V2_URL ,
34
39
pytest .param (STAGING_REKOR_URL , marks = pytest .mark .xfail ),
35
- pytest .param (DEFAULT_REKOR_URL , marks = pytest .mark .xfail ),
36
- pytest .param (LOCAL_REKOR_V2_URL , marks = pytest .mark .xfail ),
37
- ]
40
+ # pytest.param(DEFAULT_REKOR_URL, marks=pytest.mark.xfail),
41
+ # pytest.param(LOCAL_REKOR_V2_URL, marks=pytest.mark.xfail),
42
+ ],
38
43
)
39
44
def client (request ) -> RekorV2Client :
40
45
"""
@@ -44,37 +49,46 @@ def client(request) -> RekorV2Client:
44
49
return RekorV2Client (base_url = request .param )
45
50
46
51
47
- @pytest .fixture ()
48
- def sample_signer ( staging ) :
52
+ @pytest .fixture (scope = "session" )
53
+ def sample_cert_and_private_key () -> tuple [ Certificate , ec . EllipticCurvePrivateKey ] :
49
54
"""
50
- Returns a `Signer` .
55
+ Returns a sample Certificate and ec.EllipticCurvePrivateKey .
51
56
"""
52
- sign_ctx_cls , _ , id_token = staging
53
- with sign_ctx_cls ().signer (id_token ) as signer :
54
- return signer
57
+ # Detect env variable for local interactive tests.
58
+ token = os .getenv ("SIGSTORE_IDENTITY_TOKEN_staging" )
59
+ if not token :
60
+ # If the variable is not defined, try getting an ambient token.
61
+ token = detect_credential (_DEFAULT_AUDIENCE )
55
62
63
+ with SigningContext .from_trust_config (ClientTrustConfig .staging ()).signer (
64
+ IdentityToken (token )
65
+ ) as signer :
66
+ return signer ._signing_cert (), signer ._private_key
56
67
57
- @pytest .fixture ()
68
+
69
+ @pytest .fixture (scope = "session" )
58
70
def sample_hashed_rekord_request_materials (
59
- sample_signer ,
71
+ sample_cert_and_private_key ,
60
72
) -> tuple [Hashed , bytes , Certificate ]:
61
73
"""
62
74
Creates materials needed for `RekorV2Client._build_hashed_rekord_create_entry_request`.
63
75
"""
64
- cert = sample_signer . _signing_cert ()
76
+ cert , private_key = sample_cert_and_private_key
65
77
hashed_input = sha256_digest (secrets .token_bytes (32 ))
66
- signature = sample_signer . _private_key .sign (
78
+ signature = private_key .sign (
67
79
hashed_input .digest , ec .ECDSA (hashed_input ._as_prehashed ())
68
80
)
69
81
return hashed_input , signature , cert
70
82
71
83
72
- @pytest .fixture ()
73
- def sample_dsse_request_materials (sample_signer ) -> tuple [dsse .Envelope , Certificate ]:
84
+ @pytest .fixture (scope = "session" )
85
+ def sample_dsse_request_materials (
86
+ sample_cert_and_private_key ,
87
+ ) -> tuple [dsse .Envelope , Certificate ]:
74
88
"""
75
89
Creates materials needed for `RekorV2Client._build_dsse_create_entry_request`.
76
90
"""
77
- cert = sample_signer . _signing_cert ()
91
+ cert , private_key = sample_cert_and_private_key
78
92
stmt = (
79
93
dsse .StatementBuilder ()
80
94
.subjects (
@@ -92,49 +106,34 @@ def sample_dsse_request_materials(sample_signer) -> tuple[dsse.Envelope, Certifi
92
106
}
93
107
)
94
108
).build ()
95
- envelope = dsse ._sign (key = sample_signer . _private_key , stmt = stmt )
109
+ envelope = dsse ._sign (key = private_key , stmt = stmt )
96
110
return envelope , cert
97
111
98
112
99
- @pytest .fixture ()
113
+ @pytest .fixture (scope = "session" )
100
114
def sample_hashed_rekord_create_entry_request (
101
115
sample_hashed_rekord_request_materials ,
102
116
) -> v2 .CreateEntryRequest :
103
117
"""
104
118
Returns a sample `CreateEntryRequest` for for hashedrekor.
105
119
"""
106
120
hashed_input , signature , cert = sample_hashed_rekord_request_materials
107
- return RekorV2Client ._build_hashed_rekord_create_entry_request (
108
- artifact_hashed_input = hashed_input ,
109
- artifact_signature = signature ,
110
- signing_certificate = cert ,
121
+ return RekorV2Client ._build_hashed_rekord_request (
122
+ hashed_input = hashed_input ,
123
+ signature = signature ,
124
+ certificate = cert ,
111
125
)
112
126
113
127
114
- @pytest .fixture ()
128
+ @pytest .fixture (scope = "session" )
115
129
def sample_dsse_create_entry_request (
116
130
sample_dsse_request_materials ,
117
131
) -> v2 .CreateEntryRequest :
118
132
"""
119
133
Returns a sample `CreateEntryRequest` for for dsse.
120
134
"""
121
135
envelope , cert = sample_dsse_request_materials
122
- return RekorV2Client ._build_dsse_create_entry_request (
123
- envelope = envelope , signing_certificate = cert
124
- )
125
-
126
-
127
- @pytest .fixture (
128
- params = [
129
- sample_hashed_rekord_create_entry_request ,
130
- sample_dsse_create_entry_request ,
131
- ]
132
- )
133
- def sample_create_entry_request (request ) -> v2 .CreateEntryRequest :
134
- """
135
- Returns a sample `CreateEntryRequest`, for each of the the params in the supplied fixture.
136
- """
137
- return request .getfixturevalue (request .param .__name__ )
136
+ return RekorV2Client ._build_dsse_request (envelope = envelope , certificate = cert )
138
137
139
138
140
139
@pytest .mark .ambient_oidc
@@ -159,10 +158,10 @@ def test_build_hashed_rekord_create_entry_request(
159
158
),
160
159
)
161
160
)
162
- actual_request = RekorV2Client ._build_hashed_rekord_create_entry_request (
163
- artifact_hashed_input = hashed_input ,
164
- artifact_signature = signature ,
165
- signing_certificate = cert ,
161
+ actual_request = RekorV2Client ._build_hashed_rekord_request (
162
+ hashed_input = hashed_input ,
163
+ signature = signature ,
164
+ certificate = cert ,
166
165
)
167
166
assert expected_request == actual_request
168
167
@@ -196,18 +195,30 @@ def test_build_dsse_create_entry_request(sample_dsse_request_materials):
196
195
],
197
196
)
198
197
)
199
- actual_request = RekorV2Client ._build_dsse_create_entry_request (
200
- envelope = envelope , signing_certificate = cert
198
+ actual_request = RekorV2Client ._build_dsse_request (
199
+ envelope = envelope , certificate = cert
201
200
)
202
201
assert expected_request == actual_request
203
202
204
203
204
+ @pytest .mark .parametrize (
205
+ "sample_create_entry_request" ,
206
+ [
207
+ sample_hashed_rekord_create_entry_request .__name__ ,
208
+ sample_dsse_create_entry_request .__name__ ,
209
+ ],
210
+ )
205
211
@pytest .mark .ambient_oidc
206
- def test_create_entry (sample_create_entry_request , client ):
212
+ def test_create_entry (
213
+ request : pytest .FixtureRequest ,
214
+ sample_create_entry_request : str ,
215
+ client : RekorV2Client ,
216
+ ):
207
217
"""
208
218
Sends a request to RekorV2 and ensure's the response is parseable to a `LogEntry` and a `TransparencyLogEntry`.
209
219
"""
210
- log_entry = client .create_entry (sample_create_entry_request )
220
+ log_entry = client .create_entry (
221
+ request .getfixturevalue (sample_create_entry_request )
222
+ )
211
223
assert isinstance (log_entry , LogEntry )
212
- # TODO: Pending https://github.com/sigstore/sigstore-python/pull/1370
213
- # assert isinstance(log_entry._to_rekor(), rekor_v1.TransparencyLogEntry)
224
+ assert isinstance (log_entry ._to_rekor (), rekor_v1 .TransparencyLogEntry )
0 commit comments