Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies = [
"pydantic >= 2,< 3",
"pyjwt >= 2.1",
"pyOpenSSL >= 23.0.0",
"requests",
"urllib3 >= 2.6.3",
"rich >= 13,< 15",
"rfc8785 ~= 0.1.2",
"rfc3161-client >= 1.0.3,< 1.1.0",
Expand Down
6 changes: 2 additions & 4 deletions sigstore/_internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
subject to any stability guarantees.
"""

from requests import __version__ as requests_version
from sigstore._internal.http import USER_AGENT

from sigstore import __version__ as sigstore_version

USER_AGENT = f"sigstore-python/{sigstore_version} (python-requests/{requests_version})"
__all__ = ["USER_AGENT"]
47 changes: 16 additions & 31 deletions sigstore/_internal/fulcio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@
from dataclasses import dataclass
from urllib.parse import urljoin

import requests
from cryptography.hazmat.primitives import serialization
from cryptography.x509 import (
Certificate,
CertificateSigningRequest,
load_pem_x509_certificate,
)

from sigstore._internal import USER_AGENT
from sigstore._internal import http
from sigstore._utils import B64Str
from sigstore.oidc import IdentityToken

Expand Down Expand Up @@ -71,9 +70,8 @@ class FulcioClientError(Exception):


class _Endpoint(ABC):
def __init__(self, url: str, session: requests.Session) -> None:
def __init__(self, url: str) -> None:
self.url = url
self.session = session


def _serialize_cert_request(req: CertificateSigningRequest) -> str:
Expand Down Expand Up @@ -102,17 +100,20 @@ def post(
"Content-Type": "application/json",
"Accept": "application/pem-certificate-chain",
}
resp: requests.Response = self.session.post(
url=self.url, data=_serialize_cert_request(req), headers=headers
resp = http.post(
url=self.url, data=_serialize_cert_request(req).encode(), headers=headers
)
try:
resp.raise_for_status()
except requests.HTTPError as http_error:
except http.HTTPError as http_error:
# See if we can optionally add a message
if http_error.response:
text = json.loads(http_error.response.text)
if "message" in http_error.response.text:
raise FulcioClientError(text["message"]) from http_error
if http_error.body:
try:
text = json.loads(http_error.body)
if "message" in text:
raise FulcioClientError(text["message"]) from http_error
except (json.JSONDecodeError, KeyError):
pass
raise FulcioClientError from http_error

try:
Expand Down Expand Up @@ -141,10 +142,10 @@ class FulcioTrustBundle(_Endpoint):

def get(self) -> FulcioTrustBundleResponse:
"""Get the certificate chains from Fulcio"""
resp: requests.Response = self.session.get(self.url)
resp = http.get(self.url)
try:
resp.raise_for_status()
except requests.HTTPError as http_error:
except http.HTTPError as http_error:
raise FulcioClientError from http_error

trust_bundle_json = resp.json()
Expand All @@ -165,33 +166,17 @@ def __init__(self, url: str) -> None:
"""Initialize the client"""
_logger.debug(f"Fulcio client using URL: {url}")
self.url = url
self.session = requests.Session()
self.session.headers.update(
{
"User-Agent": USER_AGENT,
}
)

def __del__(self) -> None:
"""
Destroys the underlying network session.
"""
self.session.close()

@property
def signing_cert(self) -> FulcioSigningCert:
"""
Returns a model capable of interacting with Fulcio's signing certificate endpoints.
"""
return FulcioSigningCert(
urljoin(self.url, SIGNING_CERT_ENDPOINT), session=self.session
)
return FulcioSigningCert(urljoin(self.url, SIGNING_CERT_ENDPOINT))

@property
def trust_bundle(self) -> FulcioTrustBundle:
"""
Returns a model capable of interacting with Fulcio's trust bundle endpoints.
"""
return FulcioTrustBundle(
urljoin(self.url, TRUST_BUNDLE_ENDPOINT), session=self.session
)
return FulcioTrustBundle(urljoin(self.url, TRUST_BUNDLE_ENDPOINT))
204 changes: 204 additions & 0 deletions sigstore/_internal/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# Copyright 2022 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
HTTP client utilities for sigstore-python using urllib3.
"""

from __future__ import annotations

import json
from typing import Any

import urllib3

from sigstore import __version__ as sigstore_version

# Global PoolManager for all HTTP requests
_pool_manager: urllib3.PoolManager | None = None

# User-Agent header for all requests
USER_AGENT = f"sigstore-python/{sigstore_version} (urllib3/{urllib3.__version__})" # type: ignore[attr-defined]


def _get_pool_manager() -> urllib3.PoolManager:
"""
Get or create the global PoolManager instance.

Returns:
The global urllib3.PoolManager instance.
"""
global _pool_manager
if _pool_manager is None:
_pool_manager = urllib3.PoolManager(
headers={"User-Agent": USER_AGENT},
timeout=urllib3.Timeout(connect=30.0, read=30.0),
)
return _pool_manager


class HTTPError(Exception):
"""
Represents an HTTP error response.
"""

def __init__(self, status: int, reason: str, body: str | None = None):
"""
Create a new HTTPError.

Args:
status: HTTP status code
reason: HTTP status reason phrase
body: Optional response body
"""
self.status = status
self.reason = reason
self.body = body
super().__init__(f"HTTP {status}: {reason}")


class HTTPResponse:
"""
Wrapper around urllib3 HTTPResponse for easier usage.
"""

def __init__(self, response: urllib3.BaseHTTPResponse):
"""
Create a new HTTPResponse.

Args:
response: The underlying urllib3.HTTPResponse
"""
self._response = response
self.status_code = response.status
self.reason = response.reason
self._data = response.data

def raise_for_status(self) -> None:
"""
Raise an HTTPError if the response status indicates an error.

Raises:
HTTPError: If status code is 4xx or 5xx
"""
if 400 <= self.status_code < 600:
raise HTTPError(self.status_code, self.reason or "", self.text)

@property
def text(self) -> str:
"""
Get the response body as text.

Returns:
The response body decoded as UTF-8
"""
return self._response.data.decode("utf-8")

def json(self) -> Any:
"""
Parse the response body as JSON.

Returns:
The parsed JSON data
"""
return self._response.json()


def request(
method: str,
url: str,
*,
headers: dict[str, str] | None = None,
json_data: Any | None = None,
data: bytes | None = None,
params: dict[str, Any] | None = None,
timeout: float | None = None,
) -> HTTPResponse:
"""
Make an HTTP request using the global PoolManager.

Args:
method: HTTP method (GET, POST, etc.)
url: URL to request
headers: Optional additional headers
json_data: Optional JSON data to send (will be serialized)
data: Optional raw bytes to send
params: Optional query parameters
timeout: Optional timeout in seconds

Returns:
HTTPResponse object

Raises:
urllib3.exceptions.HTTPError: On connection errors
HTTPError: On HTTP error status codes (if raise_for_status is called)
"""
pool = _get_pool_manager()

# Build request headers
request_headers = {}
if json_data is not None:
request_headers["Content-Type"] = "application/json"
data = json.dumps(json_data).encode("utf-8")
if headers:
request_headers.update(headers)

# Build fields for query parameters
fields = None
if params:
fields = params
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fields seems like a useless variable here?


# Create timeout object if specified
timeout_obj = None
if timeout is not None:
timeout_obj = urllib3.Timeout(connect=timeout, read=timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this changes anything in practice but in urllib3 the timeout argument default value is not None but a separate placeholder, yet we always set None here.

That said, maybe we should set an actual timeout default here to make it clear?


response = pool.request(
method,
url,
headers=request_headers,
body=data,
fields=fields if method.upper() == "GET" else None,
timeout=timeout_obj,
)

return HTTPResponse(response)


def get(url: str, **kwargs: Any) -> HTTPResponse:
"""
Make a GET request.

Args:
url: URL to request
**kwargs: Additional arguments to pass to request()

Returns:
HTTPResponse object
"""
return request("GET", url, **kwargs)


def post(url: str, **kwargs: Any) -> HTTPResponse:
"""
Make a POST request.

Args:
url: URL to request
**kwargs: Additional arguments to pass to request()

Returns:
HTTPResponse object
"""
return request("POST", url, **kwargs)
23 changes: 10 additions & 13 deletions sigstore/_internal/rekor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
from abc import ABC, abstractmethod

import rekor_types
import requests
from cryptography.x509 import Certificate

from sigstore._internal import http
from sigstore._utils import base64_encode_pem_cert
from sigstore.dsse import Envelope
from sigstore.hashes import Hashed
Expand All @@ -45,20 +45,17 @@ class RekorClientError(Exception):
A generic error in the Rekor client.
"""

def __init__(self, http_error: requests.HTTPError):
def __init__(self, http_error: http.HTTPError):
"""
Create a new `RekorClientError` from the given `requests.HTTPError`.
Create a new `RekorClientError` from the given `http.HTTPError`.
"""
if http_error.response is not None:
try:
error = rekor_types.Error.model_validate_json(http_error.response.text)
super().__init__(f"{error.code}: {error.message}")
except Exception:
super().__init__(
f"Rekor returned an unknown error with HTTP {http_error.response.status_code}"
)
else:
super().__init__(f"Unexpected Rekor error: {http_error}")
try:
error = rekor_types.Error.model_validate_json(http_error.body or "")
super().__init__(f"{error.code}: {error.message}")
except Exception:
super().__init__(
f"Rekor returned an unknown error with HTTP {http_error.status}"
)


class RekorLogSubmitter(ABC):
Expand Down
Loading
Loading