Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions hcloud/zones/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def update_rrset(
DNS API is in beta, breaking changes may occur within minor releases.
See https://docs.hetzner.cloud/changelog#2025-10-07-dns-beta for more details.
"""
return self._client.update_rrset(rrset, labels=labels)
return self._client.update_rrset(rrset=rrset, labels=labels)

def delete_rrset(
self,
Expand All @@ -369,7 +369,7 @@ def delete_rrset(
DNS API is in beta, breaking changes may occur within minor releases.
See https://docs.hetzner.cloud/changelog#2025-10-07-dns-beta for more details.
"""
return self._client.delete_rrset(rrset)
return self._client.delete_rrset(rrset=rrset)

def change_rrset_protection(
self,
Expand All @@ -389,7 +389,7 @@ def change_rrset_protection(
DNS API is in beta, breaking changes may occur within minor releases.
See https://docs.hetzner.cloud/changelog#2025-10-07-dns-beta for more details.
"""
return self._client.change_rrset_protection(rrset, change=change)
return self._client.change_rrset_protection(rrset=rrset, change=change)

def change_rrset_ttl(
self,
Expand All @@ -408,7 +408,7 @@ def change_rrset_ttl(
DNS API is in beta, breaking changes may occur within minor releases.
See https://docs.hetzner.cloud/changelog#2025-10-07-dns-beta for more details.
"""
return self._client.change_rrset_ttl(rrset, ttl=ttl)
return self._client.change_rrset_ttl(rrset=rrset, ttl=ttl)

def add_rrset_records(
self,
Expand All @@ -429,7 +429,7 @@ def add_rrset_records(
DNS API is in beta, breaking changes may occur within minor releases.
See https://docs.hetzner.cloud/changelog#2025-10-07-dns-beta for more details.
"""
return self._client.add_rrset_records(rrset, records=records, ttl=ttl)
return self._client.add_rrset_records(rrset=rrset, records=records, ttl=ttl)

def remove_rrset_records(
self,
Expand All @@ -448,7 +448,7 @@ def remove_rrset_records(
DNS API is in beta, breaking changes may occur within minor releases.
See https://docs.hetzner.cloud/changelog#2025-10-07-dns-beta for more details.
"""
return self._client.remove_rrset_records(rrset, records=records)
return self._client.remove_rrset_records(rrset=rrset, records=records)

def set_rrset_records(
self,
Expand All @@ -467,7 +467,7 @@ def set_rrset_records(
DNS API is in beta, breaking changes may occur within minor releases.
See https://docs.hetzner.cloud/changelog#2025-10-07-dns-beta for more details.
"""
return self._client.set_rrset_records(rrset, records=records)
return self._client.set_rrset_records(rrset=rrset, records=records)


class BoundZoneRRSet(BoundModelBase, ZoneRRSet):
Expand Down
106 changes: 30 additions & 76 deletions tests/unit/certificates/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,88 +12,42 @@
ManagedCertificateStatus,
)

from ..conftest import BoundModelTestCase

class TestBoundCertificate:
@pytest.fixture()
def bound_certificate(self, client: Client):
return BoundCertificate(client.certificates, data=dict(id=14))

def test_bound_certificate_init(self, certificate_response):
bound_certificate = BoundCertificate(
client=mock.MagicMock(), data=certificate_response["certificate"]
)

assert bound_certificate.id == 2323
assert bound_certificate.name == "My Certificate"
assert bound_certificate.type == "managed"
assert (
bound_certificate.fingerprint
== "03:c7:55:9b:2a:d1:04:17:09:f6:d0:7f:18:34:63:d4:3e:5f"
)
assert bound_certificate.certificate == "-----BEGIN CERTIFICATE-----\n..."
assert len(bound_certificate.domain_names) == 3
assert bound_certificate.domain_names[0] == "example.com"
assert bound_certificate.domain_names[1] == "webmail.example.com"
assert bound_certificate.domain_names[2] == "www.example.com"
assert isinstance(bound_certificate.status, ManagedCertificateStatus)
assert bound_certificate.status.issuance == "failed"
assert bound_certificate.status.renewal == "scheduled"
assert bound_certificate.status.error.code == "error_code"
assert bound_certificate.status.error.message == "error message"

def test_update(
self,
request_mock: mock.MagicMock,
bound_certificate,
response_update_certificate,
):
request_mock.return_value = response_update_certificate

certificate = bound_certificate.update(name="New name")
class TestBoundCertificate(BoundModelTestCase):
methods = [
BoundCertificate.update,
BoundCertificate.delete,
BoundCertificate.retry_issuance,
]

request_mock.assert_called_with(
method="PUT",
url="/certificates/14",
json={"name": "New name"},
)

assert certificate.id == 2323
assert certificate.name == "New name"

def test_delete(
self,
request_mock: mock.MagicMock,
bound_certificate,
action_response,
):
request_mock.return_value = action_response

delete_success = bound_certificate.delete()

request_mock.assert_called_with(
method="DELETE",
url="/certificates/14",
)

assert delete_success is True

def test_retry_issuance(
self,
request_mock: mock.MagicMock,
bound_certificate,
response_retry_issuance_action,
):
request_mock.return_value = response_retry_issuance_action

action = bound_certificate.retry_issuance()
@pytest.fixture()
def resource_client(self, client: Client):
return client.certificates

request_mock.assert_called_with(
method="POST",
url="/certificates/14/actions/retry",
@pytest.fixture()
def bound_model(self, resource_client, certificate_response):
return BoundCertificate(
resource_client, data=certificate_response["certificate"]
)

assert action.id == 14
assert action.command == "issue_certificate"
def test_init(self, bound_model: BoundCertificate):
o = bound_model
assert o.id == 2323
assert o.name == "My Certificate"
assert o.type == "managed"
assert o.fingerprint == "03:c7:55:9b:2a:d1:04:17:09:f6:d0:7f:18:34:63:d4:3e:5f"
assert o.certificate == "-----BEGIN CERTIFICATE-----\n..."
assert len(o.domain_names) == 3
assert o.domain_names[0] == "example.com"
assert o.domain_names[1] == "webmail.example.com"
assert o.domain_names[2] == "www.example.com"
assert isinstance(o.status, ManagedCertificateStatus)
assert o.status.issuance == "failed"
assert o.status.renewal == "scheduled"
assert o.status.error.code == "error_code"
assert o.status.error.message == "error message"


class TestCertificatesClient:
Expand Down
109 changes: 99 additions & 10 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from __future__ import annotations

from collections.abc import Generator
import inspect
from typing import Callable, ClassVar, TypedDict
from unittest import mock
from warnings import warn

import pytest

Expand Down Expand Up @@ -134,11 +134,100 @@ def action_list_response(action1_running, action2_running):
}


@pytest.fixture()
def hetzner_client() -> Generator[Client]:
warn("DEPRECATED")
client = Client(token="token")
patcher = mock.patch.object(client, "request")
patcher.start()
yield client
patcher.stop()
def build_kwargs_mock(func: Callable) -> dict[str, mock.Mock]:
"""
Generate a kwargs dict that may be passed to the provided function for testing purposes.
"""
s = inspect.signature(func)

kwargs = {}
for name, param in s.parameters.items():
if name in ("self",):
continue

if param.kind in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY):
kwargs[name] = mock.Mock()
continue

# Ignore **kwargs
if param.kind in (param.VAR_KEYWORD,):
continue

raise NotImplementedError(f"unsupported parameter kind: {param.kind}")

return kwargs


def pytest_generate_tests(metafunc: pytest.Metafunc):
"""
Magic function to generate a test for each bound model method.
"""
if "bound_model_method" in metafunc.fixturenames:
metafunc.parametrize("bound_model_method", metafunc.cls.methods)


class BoundModelTestOptions(TypedDict):
sub_resource: bool


class BoundModelTestCase:
methods: ClassVar[list[Callable | tuple[Callable, BoundModelTestOptions]]]

def test_method_list(self, bound_model):
"""
Ensure the list of bound model methods is up to date.
"""
# Unpack methods
methods = [m[0] if isinstance(m, tuple) else m for m in self.__class__.methods]

members_count = 0
members_missing = []
for name, member in inspect.getmembers(
bound_model,
lambda m: inspect.ismethod(m)
and m.__func__ in bound_model.__class__.__dict__.values(),
):
# Actions methods are already tested in TestBoundModelActions.
if name in ("__init__", "get_actions", "get_actions_list"):
continue

if member.__func__ in methods:
members_count += 1
else:
members_missing.append(member.__func__.__qualname__)

assert not members_missing, "untested methods:\n" + ",\n".join(members_missing)
assert members_count == len(self.__class__.methods)

def test_method(
self,
resource_client,
bound_model,
bound_model_method: Callable | tuple[Callable, BoundModelTestOptions],
):
options = BoundModelTestOptions()
if isinstance(bound_model_method, tuple):
bound_model_method, options = bound_model_method

# Check if the resource client has a method named after the bound model method.
assert hasattr(resource_client, bound_model_method.__name__)

# Mock the resource client method.
resource_client_method_mock = mock.MagicMock()
setattr(
resource_client,
bound_model_method.__name__,
resource_client_method_mock,
)

kwargs = build_kwargs_mock(bound_model_method)

# Call the bound model method
result = getattr(bound_model, bound_model_method.__name__)(**kwargs)

if options.get("sub_resource"):
resource_client_method_mock.assert_called_with(**kwargs)
else:
resource_client_method_mock.assert_called_with(bound_model, **kwargs)

assert result is resource_client_method_mock.return_value
Loading