-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathpkdext.py
More file actions
226 lines (182 loc) · 8.25 KB
/
pkdext.py
File metadata and controls
226 lines (182 loc) · 8.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/python
import sys, os
from pathlib import Path
from typing import Dict
from datetime import datetime
from ldif3 import LDIFParser
sys.path.append(str(Path(os.path.dirname(sys.argv[0])) / Path("../libs/PassID-Server/src")))
from pymrtd.pki.x509 import Certificate, CscaCertificate, MasterListSignerCertificate, DocumentSignerCertificate
from pymrtd.pki.crl import CertificateRevocationList
from pymrtd.pki.ml import CscaMasterList
from asn1crypto.crl import CertificateList
def parse_dn(dn: str):
    """Parse an LDIF entry DN into a dict keyed by lower-case attribute name.

    If *dn* contains a ``+``, the two pieces around the first ``+`` are
    treated as a ``cn=...`` piece and a remainder: when the first piece
    starts with ``cn`` (case-insensitive) it is the cn piece, otherwise the
    second piece is. Anything after a second ``+`` is ignored. The cn piece
    is stored whole (it is not split on commas) under ``'cn'`` as the text
    following its first ``=``.

    The remainder is split on ``,`` and each ``key=value`` component is
    stored with the key stripped of surrounding whitespace and backslashes
    and lower-cased. Values are kept verbatim. A repeated key keeps its last
    value. Components without ``=`` (including an empty DN) are skipped.

    :param dn: DN string as yielded by the LDIF parser.
    :return: dict mapping attribute name to value.
    """
    d = {}
    dnp = dn.split('+')  # split cn from the rest of dn
    cn_part = None
    if len(dnp) == 1:
        rest = dnp[0]
    elif dnp[0].lower().startswith('cn'):
        cn_part, rest = dnp[0], dnp[1]
    else:
        rest, cn_part = dnp[0], dnp[1]

    if cn_part is not None:
        # Take everything after the first '='; a fixed slice offset would cut
        # into the value because the 'cn=' prefix is only three characters.
        d['cn'] = cn_part.partition('=')[2]

    for component in rest.split(','):  # split other dn parts
        key, sep, value = component.partition('=')
        if not sep:
            # Not a key=value pair (e.g. empty DN or stray comma): ignore it.
            continue
        d[key.strip().strip('\\').lower()] = value
    return d
def fatal_error(msg: str):
    """Print *msg* to standard error and terminate with exit status 1.

    :param msg: text to show the user before exiting.
    :raises SystemExit: always, with code 1.
    """
    print(msg, file=sys.stderr)
    # sys.exit is always available; the bare exit() helper is injected by the
    # site module and is missing under `python -S` or in frozen builds.
    sys.exit(1)
def print_warning(msg: str):
    """Print *msg* to standard output, prefixed with ``Warning: ``."""
    print(f"Warning: {msg}")
def format_cert_sn(cert: Certificate):
    """Return the certificate's serial number as lower-case hex without a ``0x`` prefix.

    Serial number 0 is rendered as ``"0"`` and a negative serial number as
    ``"-<hex>"``.

    :param cert: object exposing an integer ``serial_number`` attribute.
    :return: hexadecimal string.
    """
    # format(..., "x") emits no prefix, so nothing has to be stripped;
    # str.lstrip("0x") removes a *set* of characters and would turn the
    # rendering of serial number 0 into an empty string.
    return format(cert.serial_number, "x")
def format_cert_fname(cert, baseName = ""):
    """Build a lower-case, whitespace-free file-name stem for a certificate or CRL.

    The stem is ``<country>[_<baseName>]_<serial hex>``. When no serial number
    can be read from the object it is
    ``<country>[_<baseName>]_fp_<fingerprint prefix>`` instead.

    The country is taken from the subject, except for ``CertificateList``
    instances and objects whose subject has no ``country_name``; those use
    the issuer's ``country_name``.

    :param cert: certificate or CRL object.
    :param baseName: optional tag inserted after the country (e.g. ``"csca"``).
    :return: file-name stem without extension.
    :raises KeyError: if the chosen name has no ``country_name``.
    """
    if baseName != "":
        baseName = "_" + baseName

    if isinstance(cert, CertificateList) or 'country_name' not in cert.subject.native:
        subject = cert.issuer
        fp = cert.sha256[0:5].hex()  # hex of the first five digest bytes
        # NOTE(review): assumes cert.sha256 is a bytes digest; confirm
    else:
        subject = cert.subject
        fp = cert.sha256_fingerprint[0:5]  # first five characters of the fingerprint

    name = subject.native['country_name'] + baseName
    serial = getattr(cert, "serial_number", None)
    if callable(serial):
        # serial_number is a method on this object: call it. Passing the
        # callable itself to hex() would raise TypeError.
        name += "_" + format(serial(), "x")
    elif 'tbs_certificate' in cert.native:
        # format(..., "x") keeps the sign of a negative serial intact, where
        # slicing hex()'s output would leave a stray "x" in the name.
        name += "_" + format(int(cert.native['tbs_certificate']['serial_number']), "x")
    else:
        name += "_fp_" + fp
    # Drop every whitespace character (e.g. spaces inside the fingerprint).
    return "".join(name.lower().split())
def get_ml_out_dir_name(ml: CscaMasterList):
    """Return the output directory name for a master list: ``<country>_ml`` in lower case.

    The country is the ``country_name`` of the subject of the list's first
    signer certificate.
    """
    first_signer = ml.signerCertificates[0]
    country = first_signer.subject.native['country_name']
    return (country + "_ml").lower()
def get_ofile_for_cert(cert, baseName, ext, out_dir: Path):
    """Open ``<out_dir>/<generated name>.<ext>`` for binary writing.

    *out_dir* (including missing parents) is created first. The file name
    stem comes from ``format_cert_fname(cert, baseName)``. The caller is
    responsible for closing the returned file object.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    file_name = '{}.{}'.format(format_cert_fname(cert, baseName), ext)
    target = out_dir / file_name
    return target.open('wb')
def get_ofile_for_csca(cert: Certificate, out_dir: Path):
    """Open a ``*_csca_*.cer`` output file for *cert* inside *out_dir*."""
    return get_ofile_for_cert(cert, baseName='csca', ext='cer', out_dir=out_dir)
def get_ofile_for_dsc(cert: Certificate, out_dir: Path):
    """Open a ``*_dsc_*.cer`` output file for *cert* inside *out_dir*."""
    return get_ofile_for_cert(cert, baseName='dsc', ext='cer', out_dir=out_dir)
def get_ofile_for_crl(crl: CertificateList, out_dir: Path):
    """Open a ``*_crl_*.crl`` output file for *crl* inside *out_dir*."""
    return get_ofile_for_cert(crl, baseName='crl', ext='crl', out_dir=out_dir)
def get_issuer_cert(issued_cert: Certificate, root_certs: Dict[bytes, Certificate]):
    """Look up the certificate that issued *issued_cert*.

    * A certificate whose ``self_signed`` is ``'maybe'`` is returned as its
      own issuer.
    * If it has an authority key identifier, that value is used as the key
      into *root_certs*; a miss yields ``None``.
    * Without an authority key identifier, the first entry of *root_certs*
      whose subject equals the certificate's issuer is returned.

    :param issued_cert: certificate whose issuer is wanted.
    :param root_certs: candidate issuers, keyed by key identifier.
    :return: the issuing certificate, or ``None`` if none was found.
    """
    # NOTE(review): the nesting of the name-match fallback was reconstructed
    # from un-indented source -- confirm it should run only when the
    # authority key identifier is absent.
    if issued_cert.self_signed == 'maybe':
        return issued_cert

    aki = issued_cert.authority_key_identifier
    if aki is not None:
        return root_certs.get(aki)

    wanted_subject = issued_cert.issuer
    return next(
        (candidate for candidate in root_certs.values()
         if candidate.subject == wanted_subject),
        None,
    )
def verify_and_write_csca(csca: CscaCertificate, issuing_cert: CscaCertificate, out_dir: Path):
    """Verify *csca* against *issuing_cert* and write it to a directory reflecting the result.

    Destination, relative to *out_dir*:

    * verification succeeds: *out_dir* itself;
    * ``csca.isValidOn(now)`` is false: an ``expired`` subdirectory is
      inserted first, whatever the verification result;
    * verification raises: a ``failed_verification`` subdirectory is appended
      and a warning is printed.

    Only the validity check and the verification are guarded. Errors while
    creating or writing the output file propagate to the caller rather than
    being reported as verification failures.

    :param csca: certificate to verify and store.
    :param issuing_cert: certificate passed to ``csca.verify``.
    :param out_dir: base output directory.
    """
    try:
        if not csca.isValidOn(datetime.utcnow()):
            out_dir /= 'expired'
        csca.verify(issuing_cert)
    except Exception as e:
        # The two warnings differ only in detail; both outcomes are stored in
        # the same 'failed_verification' subdirectory.
        if "Signature verification failed" == str(e):
            print_warning("Signature verification failed for CSCA: {}, sig_algo: {}"
                .format(csca.subject.native['country_name'], csca.signature_algo))
        else:
            print_warning("CSCA verification failed! [C={} SerNo={}]\n\treason: {}"
                .format(csca.subject.native['country_name'], format_cert_sn(csca), e))
        out_dir = out_dir.joinpath('failed_verification')

    # Single write path; the context manager closes the handle on every outcome.
    with get_ofile_for_csca(csca, out_dir) as f:
        f.write(csca.dump())
def verify_and_extract_masterlist(ml: CscaMasterList, out_dir: Path):
    """Verify a CSCA master list and write the CSCA certificates it contains under *out_dir*.

    Steps, in order:

    1. ``ml.verify()`` is called; if it raises, a warning is printed and
       extraction continues into an ``unverified_ml`` subdirectory.
    2. Every certificate in ``ml.cscaList`` is indexed by ``key_identifier``
       (first occurrence wins) and handed to ``verify_and_write_csca``.
       Certificates whose issuer has not been indexed yet are deferred.
    3. Deferred certificates are retried through ``get_issuer_cert`` once the
       index is complete; those still without an issuer are written to an
       ``unverified`` subdirectory.
    4. Each master list signer certificate is verified against its issuer;
       failures only produce warnings, nothing is written for them.

    :param ml: parsed master list.
    :param out_dir: base directory for the extracted certificates.
    """
    # verify ml integrity
    try:
        ml.verify()
    except Exception as e:
        print_warning("Integrity verification failed for master list issued by {}."
            .format(ml.signerCertificates[0].subject.native['country_name']))
        # Keep extracting, but separate the output of an unverified list.
        out_dir /= 'unverified_ml'

    # verify and extract CSCAs
    cscas = {}  # key identifier -> first CSCA seen with that identifier
    skipped_cscas = []  # CSCAs whose issuer was not indexed yet when visited
    for csca in ml.cscaList:
        if csca.key_identifier not in cscas:
            cscas[csca.key_identifier] = csca

        if csca.self_signed != 'maybe':
            # Not self-signed: the issuer must already be in the index,
            # otherwise retry after the whole list has been indexed.
            if csca.authority_key_identifier not in cscas:
                skipped_cscas.append(csca)
                continue
            issuing_cert = cscas[csca.authority_key_identifier]
        else:
            # Possibly self-signed: verify against itself.
            issuing_cert = csca

        verify_and_write_csca(csca, issuing_cert, out_dir)

    # Second pass over the deferred certificates, now with the full index.
    for csca in skipped_cscas:
        issuer_cert = get_issuer_cert(csca, cscas)
        if issuer_cert is None:
            print_warning("Could not verify signature of CSCA C={} SerNo={}. Issuing CSCA not found!"
                .format(csca.subject.native['country_name'], format_cert_sn(csca)))
            with get_ofile_for_csca(csca, out_dir.joinpath('unverified')) as f:
                f.write(csca.dump())
        else:
            verify_and_write_csca(csca, issuer_cert, out_dir)

    # verify master list signer certificates
    for mlsig_cert in ml.signerCertificates:
        issuer_cert = get_issuer_cert(mlsig_cert, cscas)
        if issuer_cert is None:
            print_warning("Could not verify signature of master list signer certificate. Issuing CSCA not found! [C={} Ml-Sig-SerNo={}]".format(mlsig_cert.subject.native['country_name'], format_cert_sn(mlsig_cert)))
        else:
            try:
                mlsig_cert.verify(issuer_cert)
            except Exception as e:
                print_warning("Failed to verify master list signer C={} Ml-Sig-SerNo={}\n\treason: {}".format(mlsig_cert.subject.native['country_name'],format_cert_sn(mlsig_cert), str(e)))
if __name__ == "__main__":
    # Command-line entry point.
    #   argv[1]: path to a master list (*.ml) or an LDIF file (*.ldif).
    # CSCAs from master lists are verified and written under ./csca/<cc>_ml/.
    # DSCs and CRLs from LDIF entries are written, unverified, under
    # ./dsc/<cc>/unverified/ and ./crl/<cc>/unverified/.
    valid_ext = {".ml", ".ldif"}
    default_out_dir_csca = Path("csca")
    default_out_dir_dsc = Path("dsc")
    default_out_dir_crl = Path("crl")

    if not sys.argv[1:]:
        fatal_error("Usage: <path_to_file|*.ml|*.ldif>")

    in_file_path = Path(sys.argv[1])
    in_file_ext = in_file_path.suffix
    if in_file_ext not in valid_ext:
        fatal_error("Error: Invalid input file!")
    if not in_file_path.exists() or in_file_path.is_dir():
        fatal_error("Error: Invalid input file path!")

    with in_file_path.open('rb') as f:
        if in_file_ext == '.ml':
            ml = CscaMasterList.load(f.read())
            verify_and_extract_masterlist(
                ml, default_out_dir_csca.joinpath(get_ml_out_dir_name(ml))
            )
        else:
            parser = LDIFParser(f)
            print("Note: DSC and CRL won't be verified against issuing CSCA!")
            for dn, entry in parser.parse():
                # ML
                if 'CscaMasterListData' in entry:
                    ml = CscaMasterList.load(entry['CscaMasterListData'][0])
                    verify_and_extract_masterlist(
                        ml, default_out_dir_csca.joinpath(get_ml_out_dir_name(ml))
                    )
                # DSC
                elif 'userCertificate' in entry or 'userCertificate;binary' in entry:
                    # Read whichever attribute spelling the entry actually has;
                    # the ';binary' form is preferred when both are present.
                    attr = ('userCertificate;binary'
                            if 'userCertificate;binary' in entry
                            else 'userCertificate')
                    dn = parse_dn(dn)
                    dsc = DocumentSignerCertificate.load(entry[attr][0])
                    # Separate name for the output handle so the input file
                    # 'f' is not shadowed, and 'with' so it is closed per entry.
                    with get_ofile_for_dsc(
                        dsc, default_out_dir_dsc / dn['c'].lower() / 'unverified'
                    ) as out_file:
                        out_file.write(dsc.dump())
                # CRL
                elif 'certificateRevocationList' in entry or 'certificateRevocationList;binary' in entry:
                    attr = ('certificateRevocationList;binary'
                            if 'certificateRevocationList;binary' in entry
                            else 'certificateRevocationList')
                    dn = parse_dn(dn)
                    crl = CertificateRevocationList.load(entry[attr][0])
                    with get_ofile_for_crl(
                        crl, default_out_dir_crl / dn['c'].lower() / 'unverified'
                    ) as out_file:
                        out_file.write(crl.dump())