-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_registration
More file actions
executable file
·85 lines (75 loc) · 2.92 KB
/
test_registration
File metadata and controls
executable file
·85 lines (75 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python
import json
import requests
import time
import urllib.parse
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives._serialization import PublicFormat
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from jose import jwk
def gen_key_secp256r1():
curve = ec.SECP256R1()
return ec.generate_private_key(curve, default_backend())
def jwk_secp256r1(key: ec.EllipticCurvePrivateKey):
public_key = key.public_key()
der_public_key = public_key.public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
base64_public_key = jwk.b64encode(der_public_key).decode()
return {
"kty": "EC",
"crv": "P-256",
"x": base64_public_key[:43],
"y": base64_public_key[43:]
}
def register_account(url: str, email: str, key: ec.EllipticCurvePrivateKey):
jwk_key = jwk_secp256r1(key)
jwk_thumbprint = jwk.thumbprint(jwk_key)
header = {
"alg": "ES256",
"jwk": jwk_key,
"kid": jwk_thumbprint
}
payload = {
"contact": [f"mailto:{email}"],
"termsOfServiceAgreed": True,
}
nonce = get_nonce(url)
protected = get_protected(header, nonce, url + "/acme/new-account")
signature = sign(protected, payload, key)
request = {
"protected": protected,
"payload": json.dumps(payload),
"signature": signature
}
response = send_signed_request(url + "/acme/new-account", request)
print(response.headers)
print(response.json())
def get_nonce(url: str):
response = requests.head(url=url)
return response.headers["Replay-Nonce"]
def get_protected(header: dict, nonce: str, url: str):
return jwk.b64encode(json.dumps({
"alg": header["alg"],
"jwk": header["jwk"],
"kid": header["kid"],
"nonce": nonce,
"url": url
}).encode()).decode()
def sign(protected: str, payload: dict, key: ec.EllipticCurvePrivateKey):
message = "{}.{}".format(protected, json.dumps(payload)).encode()
signature = key.sign(message, ec.ECDSA(hashes.SHA256()))
return jwk.b64encode(signature).decode()
def send_signed_request(url: str, request: dict):
response = requests.post(url, data=json.dumps(request), headers={"Content-Type": "application/jose+json"})
if response.status_code == 400:
raise ValueError(response.json())
elif response.status_code == 429:
raise ValueError("Rate limit exceeded, retry after %s seconds" % response.headers.get("Retry-After"))
elif response.status_code < 200 or response.status_code >= 300:
raise ValueError("Unexpected status code: %d" % response.status_code)
return response
url = "https://acme-staging-v02.api.letsencrypt.org"
email = "sudip@sireto.io"
key = gen_key_secp256r1()
register_account(url, email, key)