Skip to content

Commit dbceaeb

Browse files
jkuramonpetgrave64woodruffw
authored
Parallel sign with fixed tests (#1485)
Co-authored-by: Ramon Petgrave <[email protected]> Co-authored-by: William Woodruff <[email protected]>
1 parent 3adc3d4 commit dbceaeb

File tree

7 files changed

+220
-108
lines changed

7 files changed

+220
-108
lines changed

sigstore/_cli.py

Lines changed: 78 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
import logging
2121
import os
2222
import sys
23+
from concurrent import futures
2324
from dataclasses import dataclass
2425
from pathlib import Path
25-
from typing import Any, NoReturn, TextIO, Union
26+
from typing import Any, NoReturn, Union
2627

2728
from cryptography.hazmat.primitives.serialization import Encoding
2829
from cryptography.x509 import load_pem_x509_certificate
@@ -56,7 +57,7 @@
5657
Issuer,
5758
detect_credential,
5859
)
59-
from sigstore.sign import SigningContext
60+
from sigstore.sign import Signer, SigningContext
6061
from sigstore.verify import (
6162
Verifier,
6263
policy,
@@ -636,6 +637,57 @@ def _get_identity_token(args: argparse.Namespace) -> None:
636637
_invalid_arguments(args, "No identity token supplied or detected!")
637638

638639

640+
def _sign_file_threaded(
641+
signer: Signer,
642+
predicate_type: str | None,
643+
predicate: dict[str, Any] | None,
644+
file: Path,
645+
outputs: SigningOutputs,
646+
) -> None:
647+
"""sign method to be called from signing thread"""
648+
_logger.debug(f"signing for {file.name}")
649+
with file.open(mode="rb") as io:
650+
# The input can be indefinitely large, so we perform a streaming
651+
# digest and sign the prehash rather than buffering it fully.
652+
digest = sha256_digest(io)
653+
try:
654+
if predicate is None:
655+
result = signer.sign_artifact(input_=digest)
656+
else:
657+
subject = Subject(name=file.name, digest={"sha256": digest.digest.hex()})
658+
statement_builder = StatementBuilder(
659+
subjects=[subject],
660+
predicate_type=predicate_type,
661+
predicate=predicate,
662+
)
663+
result = signer.sign_dsse(statement_builder.build())
664+
except ExpiredIdentity as exp_identity:
665+
_logger.error("Signature failed: identity token has expired")
666+
raise exp_identity
667+
668+
except ExpiredCertificate as exp_certificate:
669+
_logger.error("Signature failed: Fulcio signing certificate has expired")
670+
raise exp_certificate
671+
672+
_logger.info(
673+
f"Transparency log entry created at index: {result.log_entry.log_index}"
674+
)
675+
676+
if outputs.signature is not None:
677+
signature = base64.b64encode(result.signature).decode()
678+
with outputs.signature.open(mode="w") as io:
679+
print(signature, file=io)
680+
681+
if outputs.certificate is not None:
682+
cert_pem = signer._signing_cert().public_bytes(Encoding.PEM).decode()
683+
with outputs.certificate.open(mode="w") as io:
684+
print(cert_pem, file=io)
685+
686+
if outputs.bundle is not None:
687+
with outputs.bundle.open(mode="w") as io:
688+
print(result.to_json(), file=io)
689+
690+
639691
def _sign_common(
640692
args: argparse.Namespace, output_map: OutputMap, predicate: dict[str, Any] | None
641693
) -> None:
@@ -666,63 +718,37 @@ def _sign_common(
666718
if not identity:
667719
_invalid_arguments(args, "No identity token supplied or detected!")
668720

669-
with signing_ctx.signer(identity) as signer:
670-
for file, outputs in output_map.items():
671-
_logger.debug(f"signing for {file.name}")
672-
with file.open(mode="rb") as io:
673-
# The input can be indefinitely large, so we perform a streaming
674-
# digest and sign the prehash rather than buffering it fully.
675-
digest = sha256_digest(io)
676-
try:
677-
if predicate is None:
678-
result = signer.sign_artifact(input_=digest)
679-
else:
680-
subject = Subject(
681-
name=file.name, digest={"sha256": digest.digest.hex()}
682-
)
683-
predicate_type = args.predicate_type
684-
statement_builder = StatementBuilder(
685-
subjects=[subject],
686-
predicate_type=predicate_type,
687-
predicate=predicate,
688-
)
689-
result = signer.sign_dsse(statement_builder.build())
690-
except ExpiredIdentity as exp_identity:
691-
print("Signature failed: identity token has expired")
692-
raise exp_identity
693-
694-
except ExpiredCertificate as exp_certificate:
695-
print("Signature failed: Fulcio signing certificate has expired")
696-
raise exp_certificate
697-
698-
print("Using ephemeral certificate:")
699-
cert = result.signing_certificate
700-
cert_pem = cert.public_bytes(Encoding.PEM).decode()
701-
print(cert_pem)
702-
703-
print(
704-
f"Transparency log entry created at index: {result.log_entry.log_index}"
705-
)
721+
# Not all commands provide --predicate-type
722+
predicate_type = getattr(args, "predicate_type", None)
706723

707-
sig_output: TextIO
708-
if outputs.signature is not None:
709-
sig_output = outputs.signature.open("w")
710-
else:
711-
sig_output = sys.stdout
724+
with signing_ctx.signer(identity) as signer:
725+
print("Using ephemeral certificate:")
726+
cert_pem = signer._signing_cert().public_bytes(Encoding.PEM).decode()
727+
print(cert_pem)
728+
729+
# sign in threads: this is relevant for especially Rekor v2 as otherwise we wait
730+
# for log inclusion for each signature separately
731+
with futures.ThreadPoolExecutor() as executor:
732+
jobs = [
733+
executor.submit(
734+
_sign_file_threaded,
735+
signer,
736+
predicate_type,
737+
predicate,
738+
file,
739+
outputs,
740+
)
741+
for file, outputs in output_map.items()
742+
]
743+
for job in futures.as_completed(jobs):
744+
job.result()
712745

713-
signature = base64.b64encode(result.signature).decode()
714-
print(signature, file=sig_output)
746+
for file, outputs in output_map.items():
715747
if outputs.signature is not None:
716748
print(f"Signature written to {outputs.signature}")
717-
718749
if outputs.certificate is not None:
719-
with outputs.certificate.open(mode="w") as io:
720-
print(cert_pem, file=io)
721750
print(f"Certificate written to {outputs.certificate}")
722-
723751
if outputs.bundle is not None:
724-
with outputs.bundle.open(mode="w") as io:
725-
print(result.to_json(), file=io)
726752
print(f"Sigstore bundle written to {outputs.bundle}")
727753

728754

sigstore/_internal/rekor/client.py

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,20 @@ def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo:
7373

7474

7575
class _Endpoint(ABC):
76-
def __init__(self, url: str, session: requests.Session) -> None:
76+
def __init__(self, url: str, session: requests.Session | None = None) -> None:
77+
# Note that _Endpoint may not be thread be safe if the same Session is provided
78+
# to an _Endpoint in multiple threads
7779
self.url = url
80+
if session is None:
81+
session = requests.Session()
82+
session.headers.update(
83+
{
84+
"Content-Type": "application/json",
85+
"Accept": "application/json",
86+
"User-Agent": USER_AGENT,
87+
}
88+
)
89+
7890
self.session = session
7991

8092

@@ -210,20 +222,6 @@ def __init__(self, url: str) -> None:
210222
Create a new `RekorClient` from the given URL.
211223
"""
212224
self.url = f"{url}/api/v1"
213-
self.session = requests.Session()
214-
self.session.headers.update(
215-
{
216-
"Content-Type": "application/json",
217-
"Accept": "application/json",
218-
"User-Agent": USER_AGENT,
219-
}
220-
)
221-
222-
def __del__(self) -> None:
223-
"""
224-
Terminates the underlying network session.
225-
"""
226-
self.session.close()
227225

228226
@classmethod
229227
def production(cls) -> RekorClient:
@@ -246,7 +244,8 @@ def log(self) -> RekorLog:
246244
"""
247245
Returns a `RekorLog` adapter for making requests to a Rekor log.
248246
"""
249-
return RekorLog(f"{self.url}/log", session=self.session)
247+
248+
return RekorLog(f"{self.url}/log")
250249

251250
def create_entry(self, request: EntryRequestBody) -> LogEntry:
252251
"""

sigstore/_internal/rekor/client_v2.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,6 @@ def __init__(self, base_url: str) -> None:
5454
Create a new `RekorV2Client` from the given URL.
5555
"""
5656
self.url = f"{base_url}/api/v2"
57-
self.session = requests.Session()
58-
self.session.headers.update(
59-
{
60-
"Content-Type": "application/json",
61-
"Accept": "application/json",
62-
"User-Agent": USER_AGENT,
63-
}
64-
)
65-
66-
def __del__(self) -> None:
67-
"""
68-
Terminates the underlying network session.
69-
"""
70-
self.session.close()
7157

7258
def create_entry(self, payload: EntryRequestBody) -> LogEntry:
7359
"""
@@ -78,7 +64,19 @@ def create_entry(self, payload: EntryRequestBody) -> LogEntry:
7864
https://github.com/sigstore/rekor-tiles/blob/main/CLIENTS.md#handling-longer-requests
7965
"""
8066
_logger.debug(f"proposed: {json.dumps(payload)}")
81-
resp = self.session.post(
67+
68+
# Use a short lived session to avoid potential issues with multi-threading:
69+
# Session thread-safety is ambiguous
70+
session = requests.Session()
71+
session.headers.update(
72+
{
73+
"Content-Type": "application/json",
74+
"Accept": "application/json",
75+
"User-Agent": USER_AGENT,
76+
}
77+
)
78+
79+
resp = session.post(
8280
f"{self.url}/log/entries",
8381
json=payload,
8482
)

sigstore/_internal/timestamp.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,6 @@ def __init__(self, url: str) -> None:
6868
Create a new `TimestampAuthorityClient` from the given URL.
6969
"""
7070
self.url = url
71-
self.session = requests.Session()
72-
self.session.headers.update(
73-
{
74-
"Content-Type": "application/timestamp-query",
75-
"User-Agent": USER_AGENT,
76-
}
77-
)
78-
79-
def __del__(self) -> None:
80-
"""
81-
Terminates the underlying network session.
82-
"""
83-
self.session.close()
8471

8572
def request_timestamp(self, signature: bytes) -> TimeStampResponse:
8673
"""
@@ -104,9 +91,18 @@ def request_timestamp(self, signature: bytes) -> TimeStampResponse:
10491
msg = f"invalid request: {error}"
10592
raise TimestampError(msg)
10693

94+
# Use single use session to avoid potential Session thread safety issues
95+
session = requests.Session()
96+
session.headers.update(
97+
{
98+
"Content-Type": "application/timestamp-query",
99+
"User-Agent": USER_AGENT,
100+
}
101+
)
102+
107103
# Send it to the TSA for signing
108104
try:
109-
response = self.session.post(
105+
response = session.post(
110106
self.url,
111107
data=timestamp_request.as_bytes(),
112108
timeout=CLIENT_TIMEOUT,

test/assets/integration/b.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
DO NOT MODIFY ME!
2+
3+
this is "b.txt", a sample input for sigstore-python's unit tests.
4+
5+
DO NOT MODIFY ME!

test/assets/integration/c.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
DO NOT MODIFY ME!
2+
3+
this is "c.txt", a sample input for sigstore-python's unit tests.
4+
5+
DO NOT MODIFY ME!

0 commit comments

Comments
 (0)