|
12 | 12 | from datetime import timedelta |
13 | 13 | from functools import cached_property |
14 | 14 | from hashlib import sha256 |
| 15 | +from typing import Set |
15 | 16 |
|
16 | 17 | import dns |
17 | 18 | import psl_dns |
18 | 19 | import rest_framework.authtoken.models |
| 20 | +from cryptography import x509, hazmat |
19 | 21 | from django.conf import settings |
20 | 22 | from django.contrib.auth.hashers import make_password |
21 | 23 | from django.contrib.auth.models import BaseUserManager, AbstractBaseUser |
@@ -939,3 +941,132 @@ def verify(self, solution: str): |
939 | 941 | and |
940 | 942 | age <= settings.CAPTCHA_VALIDITY_PERIOD # not expired |
941 | 943 | ) |
| 944 | + |
| 945 | + |
| 946 | +class Identity(models.Model): |
| 947 | + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) |
| 948 | + name = models.CharField(max_length=24) |
| 949 | + created = models.DateTimeField(auto_now_add=True) |
| 950 | + owner = models.ForeignKey(User, on_delete=models.PROTECT, related_name='identities') |
| 951 | + |
| 952 | + class Meta: |
| 953 | + abstract = True |
| 954 | + |
| 955 | + |
| 956 | +class TLSIdentity(Identity): |
| 957 | + |
| 958 | + class CertificateUsage(models.Choices): |
| 959 | + CA_CONSTRAINT = 0 |
| 960 | + SERVICE_CERTIFICATE_CONSTRAINT = 1 |
| 961 | + TRUST_ANCHOR_ASSERTION = 2 |
| 962 | + DOMAIN_ISSUED_CERTIFICATE = 3 |
| 963 | + |
| 964 | + class Selector(models.Choices): |
| 965 | + FULL_CERTIFICATE = 0 |
| 966 | + SUBJECT_PUBLIC_KEY_INFO = 1 |
| 967 | + |
| 968 | + class MatchingType(models.Choices): |
| 969 | + NO_HASH_USED = 0 |
| 970 | + SHA256 = 1 |
| 971 | + SHA512 = 2 |
| 972 | + |
| 973 | + class Protocol(models.TextChoices): |
| 974 | + TCP = 'tcp' |
| 975 | + UDP = 'udp' |
| 976 | + SCTP = 'sctp' |
| 977 | + |
| 978 | + certificate = models.TextField() |
| 979 | + |
| 980 | + tlsa_selector = models.IntegerField(choices=Selector.choices, default=Selector.SUBJECT_PUBLIC_KEY_INFO) |
| 981 | + tlsa_matching_type = models.IntegerField(choices=MatchingType.choices, default=MatchingType.SHA256) |
| 982 | + tlsa_certificate_usage = models.IntegerField(choices=CertificateUsage.choices, |
| 983 | + default=CertificateUsage.DOMAIN_ISSUED_CERTIFICATE) |
| 984 | + |
| 985 | + port = models.IntegerField(default=443) |
| 986 | + protocol = models.TextField(choices=Protocol.choices, default=Protocol.TCP) |
| 987 | + |
| 988 | + scheduled_removal = models.DateTimeField(null=True) |
| 989 | + |
| 990 | + def __init__(self, *args, **kwargs): |
| 991 | + super().__init__(*args, **kwargs) |
| 992 | + if 'not_valid_after' not in kwargs: |
| 993 | + self.scheduled_removal = self.not_valid_after |
| 994 | + |
| 995 | + @property |
| 996 | + def tlsa_record(self) -> str: |
| 997 | + # choose hash function |
| 998 | + if self.tlsa_matching_type == 1: |
| 999 | + hash_function = hazmat.primitives.hashes.SHA256() |
| 1000 | + elif self.tlsa_matching_type == 2: |
| 1001 | + hash_function = hazmat.primitives.hashes.SHA512() |
| 1002 | + else: |
| 1003 | + raise NotImplementedError |
| 1004 | + |
| 1005 | + # choose data to hash |
| 1006 | + if self.tlsa_selector == 0: |
| 1007 | + to_be_hashed = self._cert.public_key().public_bytes( |
| 1008 | + hazmat.primitives.serialization.Encoding.DER, |
| 1009 | + hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo |
| 1010 | + ) |
| 1011 | + else: |
| 1012 | + raise NotImplementedError |
| 1013 | + |
| 1014 | + # compute the hash |
| 1015 | + h = hazmat.primitives.hashes.Hash(hash_function) |
| 1016 | + h.update(to_be_hashed) |
| 1017 | + hash = h.finalize.hex() |
| 1018 | + |
| 1019 | + # create TLSA record |
| 1020 | + return f"{self.tlsa_certificate_usage:n} {self.tlsa_selector:n} {self.tlsa_matching_type:n} {hash}" |
| 1021 | + |
| 1022 | + @property |
| 1023 | + def _cert(self) -> x509.Certificate: |
| 1024 | + return x509.load_pem_x509_certificate(self.certificate.encode()) |
| 1025 | + |
| 1026 | + @property |
| 1027 | + def fingerprint(self) -> str: |
| 1028 | + return self._cert.fingerprint(hazmat.primitives.hashes.SHA256()).hex() |
| 1029 | + |
| 1030 | + @property |
| 1031 | + def subject_names(self) -> Set[str]: |
| 1032 | + subject_names = { |
| 1033 | + x.value for x in |
| 1034 | + self._cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) |
| 1035 | + } |
| 1036 | + |
| 1037 | + try: |
| 1038 | + subject_alternative_names = { |
| 1039 | + x for x in |
| 1040 | + self._cert.extensions.get_extension_for_oid( |
| 1041 | + x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(x509.DNSName) |
| 1042 | + } |
| 1043 | + except x509.extensions.ExtensionNotFound: |
| 1044 | + subject_alternative_names = set() |
| 1045 | + |
| 1046 | + return subject_names | subject_alternative_names |
| 1047 | + |
| 1048 | + @property |
| 1049 | + def domains_subnames(self): |
| 1050 | + domains_subnames = [] |
| 1051 | + for name in self.subject_names: |
| 1052 | + # filter names for valid domain names |
| 1053 | + try: |
| 1054 | + validate_domain_name[1](name) |
| 1055 | + except ValidationError: |
| 1056 | + continue |
| 1057 | + |
| 1058 | + # find user-owned parent domain |
| 1059 | + domain = 'example.dedyn.io' |
| 1060 | + subname = '*' |
| 1061 | + |
| 1062 | + # return subname, domain pair |
| 1063 | + domains_subnames.append((subname, domain)) |
| 1064 | + return domains_subnames |
| 1065 | + |
| 1066 | + @property |
| 1067 | + def not_valid_before(self): |
| 1068 | + return self._cert.not_valid_before |
| 1069 | + |
| 1070 | + @property |
| 1071 | + def not_valid_after(self): |
| 1072 | + return self._cert.not_valid_after |
0 commit comments