Skip to content

Commit be8bfdb

Browse files
committed
Add tests for cli
1 parent 7f4bae1 commit be8bfdb

File tree

1 file changed

+385
-0
lines changed

1 file changed

+385
-0
lines changed

tests/test_cli.py

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
import builtins
2+
import http.client
3+
import types
4+
from datetime import datetime, timezone
5+
from unittest.mock import Mock
6+
7+
import pytest
8+
import requests
9+
10+
import certapi.cli as cli
11+
from certapi.challenge_solver.InmemoryChallengeSolver import InMemoryChallengeSolver
12+
from certapi.errors import CertApiException
13+
14+
15+
class FakeKey:
16+
def to_pem(self):
17+
return "KEY"
18+
19+
20+
class FakeKeyStore:
21+
def __init__(self, base_dir):
22+
self.keys_dir = f"{base_dir}/keys"
23+
self.certs_dir = f"{base_dir}/certs"
24+
self.saved = []
25+
26+
def save_key(self, key, name):
27+
self.saved.append(("key", name))
28+
return "key-id"
29+
30+
def save_cert(self, key_id, cert, domains, name=None):
31+
self.saved.append(("cert", name, tuple(domains)))
32+
return "cert-id"
33+
34+
35+
class FakeAcmeCertIssuer:
36+
def __init__(self, key_store, solver, account_key_name):
37+
self.key_store = key_store
38+
self.solver = solver
39+
self.account_key_name = account_key_name
40+
self.setup_called = False
41+
self.raise_error = False
42+
43+
@classmethod
44+
def with_keystore(cls, key_store, solver, account_key_name="acme_account"):
45+
return cls(key_store, solver, account_key_name)
46+
47+
def setup(self):
48+
self.setup_called = True
49+
50+
def generate_key_and_cert_for_domains(self, domains, key_type="rsa"):
51+
if self.raise_error:
52+
raise CertApiException("boom", {"reason": "bad"}, step="test")
53+
return FakeKey(), "CERT"
54+
55+
56+
class FakeCert:
57+
def __init__(self, not_valid_after):
58+
self.not_valid_after = not_valid_after
59+
60+
61+
def test_is_root_true_false(monkeypatch):
62+
monkeypatch.setattr(cli.os, "geteuid", lambda: 0)
63+
assert cli.is_root() is True
64+
65+
dummy_os = types.SimpleNamespace()
66+
monkeypatch.setattr(cli, "os", dummy_os)
67+
assert cli.is_root() is False
68+
69+
70+
def test_find_process_on_port_success(monkeypatch):
71+
monkeypatch.setattr(cli.subprocess, "check_output", lambda *args, **kwargs: b"123\n456\n")
72+
assert cli.find_process_on_port(80) == ["123", "456"]
73+
74+
75+
def test_find_process_on_port_empty(monkeypatch):
76+
monkeypatch.setattr(cli.subprocess, "check_output", lambda *args, **kwargs: b"")
77+
assert cli.find_process_on_port(80) == []
78+
79+
80+
def test_find_process_on_port_error(monkeypatch):
81+
def raise_error(*args, **kwargs):
82+
raise cli.subprocess.CalledProcessError(1, "lsof")
83+
84+
monkeypatch.setattr(cli.subprocess, "check_output", raise_error)
85+
assert cli.find_process_on_port(80) == []
86+
87+
88+
def test_start_http_challenge_server_responses():
89+
solver = InMemoryChallengeSolver()
90+
solver.save_challenge("token-ok", "value", "example.com")
91+
solver.save_challenge("token-bytes", b"binary", "example.com")
92+
93+
server, _ = cli._start_http_challenge_server(solver, port=0)
94+
try:
95+
port = server.server_address[1]
96+
97+
conn = http.client.HTTPConnection("localhost", port)
98+
conn.request("GET", "/.well-known/acme-challenge/token-ok")
99+
response = conn.getresponse()
100+
data = response.read().decode("utf-8")
101+
assert response.status == 200
102+
assert data == "value"
103+
conn.close()
104+
105+
conn = http.client.HTTPConnection("localhost", port)
106+
conn.request("GET", "/.well-known/acme-challenge/token-bytes")
107+
response = conn.getresponse()
108+
data = response.read().decode("utf-8")
109+
assert response.status == 200
110+
assert data == "binary"
111+
conn.close()
112+
113+
conn = http.client.HTTPConnection("localhost", port)
114+
conn.request("GET", "/not-a-challenge")
115+
response = conn.getresponse()
116+
assert response.status == 404
117+
response.read()
118+
conn.close()
119+
120+
conn = http.client.HTTPConnection("localhost", port)
121+
conn.request("GET", "/.well-known/acme-challenge/")
122+
response = conn.getresponse()
123+
assert response.status == 400
124+
response.read()
125+
conn.close()
126+
127+
conn = http.client.HTTPConnection("localhost", port)
128+
conn.request("GET", "/.well-known/acme-challenge/missing")
129+
response = conn.getresponse()
130+
assert response.status == 404
131+
response.read()
132+
conn.close()
133+
finally:
134+
server.shutdown()
135+
server.server_close()
136+
137+
138+
def test_resolve_cloudflare_api_key(monkeypatch):
139+
monkeypatch.delenv("CLOUDFLARE_API_KEY", raising=False)
140+
monkeypatch.delenv("CLOUDFLARE_API_TOKEN", raising=False)
141+
assert cli._resolve_cloudflare_api_key() is None
142+
143+
monkeypatch.setenv("CLOUDFLARE_API_TOKEN", "token")
144+
assert cli._resolve_cloudflare_api_key() == "token"
145+
146+
monkeypatch.setenv("CLOUDFLARE_API_KEY", "key")
147+
assert cli._resolve_cloudflare_api_key() == "key"
148+
149+
150+
def test_ensure_port_80_available_not_root(monkeypatch, capsys):
151+
monkeypatch.setattr(cli, "is_root", lambda: False)
152+
with pytest.raises(SystemExit) as exc:
153+
cli._ensure_port_80_available()
154+
assert exc.value.code == 1
155+
assert "Must be run as root" in capsys.readouterr().out
156+
157+
158+
def test_ensure_port_80_available_quit(monkeypatch):
159+
monkeypatch.setattr(cli, "is_root", lambda: True)
160+
monkeypatch.setattr(cli, "find_process_on_port", lambda port: ["123"])
161+
monkeypatch.setattr(builtins, "input", lambda _: "q")
162+
163+
with pytest.raises(SystemExit) as exc:
164+
cli._ensure_port_80_available()
165+
assert exc.value.code == 1
166+
167+
168+
def test_ensure_port_80_available_retries(monkeypatch):
169+
monkeypatch.setattr(cli, "is_root", lambda: True)
170+
calls = iter([["123"], []])
171+
monkeypatch.setattr(cli, "find_process_on_port", lambda port: next(calls))
172+
monkeypatch.setattr(builtins, "input", lambda _: "")
173+
174+
cli._ensure_port_80_available()
175+
176+
177+
def test_obtain_certificate_dns_challenge(monkeypatch, capsys):
178+
fake_issuer = FakeAcmeCertIssuer(FakeKeyStore("/etc/ssl"), Mock(), "acme_account")
179+
180+
def fake_with_keystore(key_store, solver, account_key_name="acme_account"):
181+
fake_issuer.key_store = key_store
182+
fake_issuer.solver = solver
183+
fake_issuer.account_key_name = account_key_name
184+
return fake_issuer
185+
186+
monkeypatch.setattr(cli, "CloudflareChallengeSolver", lambda api_key=None: Mock(api_key=api_key))
187+
monkeypatch.setattr(cli, "FileSystemKeyStore", lambda path: FakeKeyStore(path))
188+
monkeypatch.setattr(cli.AcmeCertIssuer, "with_keystore", staticmethod(fake_with_keystore))
189+
monkeypatch.setattr(
190+
cli, "certs_from_pem", lambda *args, **kwargs: [FakeCert(datetime(2026, 1, 1, tzinfo=timezone.utc))]
191+
)
192+
193+
cli.obtain_certificate(["example.com"], api_key="token")
194+
output = capsys.readouterr().out
195+
assert "Using Cloudflare DNS challenge." in output
196+
assert "Certificate expires at" in output
197+
assert "Key path" in output
198+
assert "Cert path" in output
199+
200+
201+
def test_obtain_certificate_http_challenge_unknown_expiry(monkeypatch, capsys):
202+
fake_issuer = FakeAcmeCertIssuer(FakeKeyStore("/etc/ssl"), Mock(), "acme_account")
203+
server = Mock()
204+
205+
monkeypatch.setattr(cli, "_ensure_port_80_available", lambda: None)
206+
monkeypatch.setattr(cli, "InMemoryChallengeSolver", lambda: InMemoryChallengeSolver())
207+
monkeypatch.setattr(cli, "_start_http_challenge_server", lambda solver, port=80: (server, Mock()))
208+
monkeypatch.setattr(cli, "FileSystemKeyStore", lambda path: FakeKeyStore(path))
209+
monkeypatch.setattr(
210+
cli.AcmeCertIssuer,
211+
"with_keystore",
212+
staticmethod(lambda key_store, solver, account_key_name="acme_account": fake_issuer),
213+
)
214+
monkeypatch.setattr(cli, "certs_from_pem", lambda *args, **kwargs: [])
215+
216+
cli.obtain_certificate(["example.com"], api_key=None)
217+
output = capsys.readouterr().out
218+
assert "Starting HTTP challenge server" in output
219+
assert "Certificate expires at: unknown" in output
220+
server.shutdown.assert_called_once()
221+
server.server_close.assert_called_once()
222+
223+
224+
def test_obtain_certificate_handles_error(monkeypatch, capsys):
225+
fake_issuer = FakeAcmeCertIssuer(FakeKeyStore("/etc/ssl"), Mock(), "acme_account")
226+
fake_issuer.raise_error = True
227+
server = Mock()
228+
229+
monkeypatch.setattr(cli, "_ensure_port_80_available", lambda: None)
230+
monkeypatch.setattr(cli, "InMemoryChallengeSolver", lambda: InMemoryChallengeSolver())
231+
monkeypatch.setattr(cli, "_start_http_challenge_server", lambda solver, port=80: (server, Mock()))
232+
monkeypatch.setattr(cli, "FileSystemKeyStore", lambda path: FakeKeyStore(path))
233+
monkeypatch.setattr(
234+
cli.AcmeCertIssuer,
235+
"with_keystore",
236+
staticmethod(lambda key_store, solver, account_key_name="acme_account": fake_issuer),
237+
)
238+
239+
cli.obtain_certificate(["example.com"], api_key=None)
240+
output = capsys.readouterr().out
241+
assert "An error occurred:" in output
242+
assert "boom" in output
243+
server.shutdown.assert_called_once()
244+
server.server_close.assert_called_once()
245+
246+
247+
def test_verify_environment_dns_supported(monkeypatch, capsys):
248+
class FakeSolver:
249+
def __init__(self, api_key=None):
250+
self.api_key = api_key
251+
252+
def supports_domain(self, domain):
253+
return True
254+
255+
monkeypatch.setattr(cli, "CloudflareChallengeSolver", FakeSolver)
256+
cli.verify_environment(["example.com"], api_key="token")
257+
output = capsys.readouterr().out
258+
assert "Cloudflare account appears to manage" in output
259+
260+
261+
def test_verify_environment_dns_unsupported(monkeypatch, capsys):
262+
class FakeSolver:
263+
def __init__(self, api_key=None):
264+
self.api_key = api_key
265+
266+
def supports_domain(self, domain):
267+
return domain != "bad.example"
268+
269+
monkeypatch.setattr(cli, "CloudflareChallengeSolver", FakeSolver)
270+
cli.verify_environment(["good.example", "bad.example"], api_key="token")
271+
output = capsys.readouterr().out
272+
assert "Warning: Cloudflare account does not appear to manage" in output
273+
assert "bad.example" in output
274+
275+
276+
def test_verify_environment_http_no_domains(monkeypatch, capsys):
277+
monkeypatch.setattr(cli, "is_root", lambda: True)
278+
cli.verify_environment([], api_key=None)
279+
output = capsys.readouterr().out
280+
assert "No domains provided" in output
281+
282+
283+
def test_verify_environment_http_not_root(monkeypatch, capsys):
284+
monkeypatch.setattr(cli, "is_root", lambda: False)
285+
286+
def fail_if_called():
287+
raise AssertionError("should not be called")
288+
289+
monkeypatch.setattr(cli, "_ensure_port_80_available", fail_if_called)
290+
cli.verify_environment(["example.com"], api_key=None)
291+
output = capsys.readouterr().out
292+
assert "Warning: not running as root" in output
293+
294+
295+
def test_verify_environment_http_checks(monkeypatch, capsys):
296+
solver = InMemoryChallengeSolver()
297+
server = Mock()
298+
299+
monkeypatch.setattr(cli, "is_root", lambda: True)
300+
monkeypatch.setattr(cli, "find_process_on_port", lambda port: ["999"])
301+
monkeypatch.setattr(cli, "_ensure_port_80_available", lambda: None)
302+
monkeypatch.setattr(cli, "InMemoryChallengeSolver", lambda: solver)
303+
monkeypatch.setattr(cli, "_start_http_challenge_server", lambda challenge_solver, port=80: (server, Mock()))
304+
305+
tokens = iter(["token1", "value1", "token2", "value2", "token3", "value3"])
306+
monkeypatch.setattr(cli.secrets, "token_urlsafe", lambda _: next(tokens))
307+
308+
def fake_get(url, allow_redirects=False, timeout=5):
309+
if "token1" in url:
310+
return Mock(status_code=200, text="value1")
311+
if "token2" in url:
312+
return Mock(status_code=500, text="oops")
313+
raise requests.RequestException("boom")
314+
315+
monkeypatch.setattr(cli.requests, "get", fake_get)
316+
317+
cli.verify_environment(["ok.example", "bad.example", "err.example"], api_key=None)
318+
output = capsys.readouterr().out
319+
assert "Warning: port 80 is in use" in output
320+
assert "OK: ok.example" in output
321+
assert "FAILED: bad.example" in output
322+
assert "FAILED: err.example" in output
323+
assert "Summary: 1 OK, 2 FAILED" in output
324+
assert len(solver) == 0
325+
server.shutdown.assert_called_once()
326+
server.server_close.assert_called_once()
327+
328+
329+
def test_verify_environment_http_port_available(monkeypatch, capsys):
330+
solver = InMemoryChallengeSolver()
331+
server = Mock()
332+
333+
monkeypatch.setattr(cli, "is_root", lambda: True)
334+
monkeypatch.setattr(cli, "find_process_on_port", lambda port: [])
335+
monkeypatch.setattr(cli, "_ensure_port_80_available", lambda: None)
336+
monkeypatch.setattr(cli, "InMemoryChallengeSolver", lambda: solver)
337+
monkeypatch.setattr(cli, "_start_http_challenge_server", lambda challenge_solver, port=80: (server, Mock()))
338+
monkeypatch.setattr(cli.secrets, "token_urlsafe", lambda _: "token")
339+
monkeypatch.setattr(cli.requests, "get", lambda *args, **kwargs: Mock(status_code=200, text="token"))
340+
341+
cli.verify_environment(["ok.example"], api_key=None)
342+
output = capsys.readouterr().out
343+
assert "Port 80 is available" in output
344+
345+
346+
def test_main_verify(monkeypatch):
347+
called = {}
348+
349+
def fake_verify(domains, api_key=None):
350+
called["domains"] = domains
351+
called["api_key"] = api_key
352+
353+
monkeypatch.setattr(cli, "verify_environment", fake_verify)
354+
monkeypatch.setattr(cli, "_resolve_cloudflare_api_key", lambda: "token")
355+
monkeypatch.setattr(cli.sys, "argv", ["certapi", "verify", "example.com"])
356+
357+
with pytest.raises(SystemExit) as exc:
358+
cli.main()
359+
360+
assert exc.value.code == 0
361+
assert called["domains"] == ["example.com"]
362+
assert called["api_key"] == "token"
363+
364+
365+
def test_main_obtain(monkeypatch):
366+
called = {}
367+
368+
def fake_obtain(domains, api_key=None):
369+
called["domains"] = domains
370+
called["api_key"] = api_key
371+
372+
monkeypatch.setattr(cli, "obtain_certificate", fake_obtain)
373+
monkeypatch.setattr(cli, "_resolve_cloudflare_api_key", lambda: None)
374+
monkeypatch.setattr(cli.sys, "argv", ["certapi", "obtain", "example.com"])
375+
376+
cli.main()
377+
assert called["domains"] == ["example.com"]
378+
assert called["api_key"] is None
379+
380+
381+
def test_main_help(monkeypatch):
382+
monkeypatch.setattr(cli.sys, "argv", ["certapi"])
383+
with pytest.raises(SystemExit) as exc:
384+
cli.main()
385+
assert exc.value.code == 1

0 commit comments

Comments
 (0)