| 
 | 1 | +import logging  | 
 | 2 | +import socket  | 
 | 3 | +import ssl  | 
 | 4 | +from datetime import datetime, timezone  | 
 | 5 | + | 
 | 6 | +from OpenSSL import crypto  | 
 | 7 | + | 
 | 8 | +from nettacker.core.lib.base import BaseEngine, BaseLibrary  | 
 | 9 | + | 
 | 10 | +log = logging.getLogger(__name__)  | 
 | 11 | + | 
 | 12 | + | 
 | 13 | +def is_weak_hash_algo(algo):  | 
 | 14 | +    algo = algo.lower()  | 
 | 15 | +    for unsafe_algo in ("md2", "md4", "md5", "sha1"):  | 
 | 16 | +        if unsafe_algo in algo:  | 
 | 17 | +            return True  | 
 | 18 | +    return False  | 
 | 19 | + | 
 | 20 | + | 
 | 21 | +def create_socket_connection(context, host, port, timeout):  | 
 | 22 | +    socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  | 
 | 23 | +    socket_connection.settimeout(timeout)  | 
 | 24 | +    socket_connection.connect((host, port))  | 
 | 25 | +    socket_connection = context.wrap_socket(socket_connection, server_hostname=host)  | 
 | 26 | +    return socket_connection  | 
 | 27 | + | 
 | 28 | + | 
 | 29 | +def is_weak_ssl_version(host, port, timeout):  | 
 | 30 | +    def test_ssl_version(host, port, timeout, ssl_version=None):  | 
 | 31 | +        try:  | 
 | 32 | +            context = ssl.SSLContext(ssl_version)  | 
 | 33 | +            socket_connection = create_socket_connection(context, host, port, timeout)  | 
 | 34 | +            return socket_connection.version()  | 
 | 35 | + | 
 | 36 | +        except ssl.SSLError:  | 
 | 37 | +            return False  | 
 | 38 | + | 
 | 39 | +        except (socket.timeout, ConnectionRefusedError, ConnectionResetError):  | 
 | 40 | +            return None  | 
 | 41 | + | 
 | 42 | +    ssl_versions = (  | 
 | 43 | +        ssl.PROTOCOL_TLS_CLIENT,  # TLS 1.3  | 
 | 44 | +        ssl.PROTOCOL_TLSv1_2,  | 
 | 45 | +        ssl.PROTOCOL_TLSv1_1,  | 
 | 46 | +        ssl.PROTOCOL_TLSv1,  | 
 | 47 | +    )  | 
 | 48 | +    supported_versions = []  | 
 | 49 | +    lowest_version = ""  | 
 | 50 | +    for ssl_version in ssl_versions:  | 
 | 51 | +        version = test_ssl_version(host, port, timeout, ssl_version=ssl_version)  | 
 | 52 | +        if version:  | 
 | 53 | +            lowest_version = version  | 
 | 54 | +            supported_versions.append(version)  | 
 | 55 | + | 
 | 56 | +    return supported_versions, lowest_version not in {"TLSv1.2", "TLSv1.3"}  | 
 | 57 | + | 
 | 58 | + | 
 | 59 | +def is_weak_cipher_suite(host, port, timeout):  | 
 | 60 | +    def test_single_cipher(host, port, cipher, timeout):  | 
 | 61 | +        try:  | 
 | 62 | +            context = ssl.create_default_context()  | 
 | 63 | +            context.check_hostname = False  | 
 | 64 | +            context.verify_mode = ssl.CERT_NONE  | 
 | 65 | +            context.set_ciphers(cipher)  | 
 | 66 | +            create_socket_connection(context, host, port, timeout)  | 
 | 67 | +            return True  | 
 | 68 | + | 
 | 69 | +        except ssl.SSLError:  | 
 | 70 | +            return False  | 
 | 71 | + | 
 | 72 | +        except (socket.timeout, ConnectionRefusedError, ConnectionResetError):  | 
 | 73 | +            return None  | 
 | 74 | + | 
 | 75 | +    cipher_suites = [  | 
 | 76 | +        "HIGH",  # OpenSSL cipher strings  | 
 | 77 | +        "MEDIUM",  | 
 | 78 | +        "LOW",  | 
 | 79 | +        "EXP",  | 
 | 80 | +        "eNULL",  | 
 | 81 | +        "aNULL",  | 
 | 82 | +        "RC4",  | 
 | 83 | +        "DES",  | 
 | 84 | +        "MD5",  | 
 | 85 | +        "SHA1",  | 
 | 86 | +        "DH",  | 
 | 87 | +        "ADH",  | 
 | 88 | +        "DHE",  | 
 | 89 | +        "ECDH",  | 
 | 90 | +        "ECDHE",  | 
 | 91 | +        "TLSv1",  | 
 | 92 | +        "TLSv1.1",  | 
 | 93 | +        "TLSv1.2",  | 
 | 94 | +        "TLSv1.3",  | 
 | 95 | +    ]  | 
 | 96 | + | 
 | 97 | +    supported_ciphers = []  | 
 | 98 | +    for cipher in cipher_suites:  | 
 | 99 | +        if test_single_cipher(host, port, cipher, timeout):  | 
 | 100 | +            supported_ciphers.append(cipher)  | 
 | 101 | + | 
 | 102 | +    weak_ciphers = {"LOW", "EXP", "eNULL", "aNULL", "RC4", "DES", "MD5", "DH", "ADH"}  | 
 | 103 | +    for cipher in supported_ciphers:  | 
 | 104 | +        if cipher in weak_ciphers:  | 
 | 105 | +            return supported_ciphers, True  | 
 | 106 | + | 
 | 107 | +    return supported_ciphers, False  | 
 | 108 | + | 
 | 109 | + | 
 | 110 | +def create_tcp_socket(host, port, timeout):  | 
 | 111 | +    try:  | 
 | 112 | +        socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  | 
 | 113 | +        socket_connection.settimeout(timeout)  | 
 | 114 | +        socket_connection.connect((host, port))  | 
 | 115 | +        ssl_flag = False  | 
 | 116 | +    except ConnectionRefusedError:  | 
 | 117 | +        return None  | 
 | 118 | + | 
 | 119 | +    try:  | 
 | 120 | +        socket_connection = ssl.wrap_socket(socket_connection)  | 
 | 121 | +        ssl_flag = True  | 
 | 122 | +    except Exception:  | 
 | 123 | +        socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  | 
 | 124 | +        socket_connection.settimeout(timeout)  | 
 | 125 | +        socket_connection.connect((host, port))  | 
 | 126 | + | 
 | 127 | +    return socket_connection, ssl_flag  | 
 | 128 | + | 
 | 129 | + | 
 | 130 | +def get_cert_info(cert):  | 
 | 131 | +    x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)  | 
 | 132 | +    weak_signing_algo = is_weak_hash_algo(str(x509.get_signature_algorithm()))  | 
 | 133 | +    cert_expires = datetime.strptime(x509.get_notAfter().decode("utf-8"), "%Y%m%d%H%M%S%z")  | 
 | 134 | +    cert_activation = datetime.strptime(x509.get_notBefore().decode("utf-8"), "%Y%m%d%H%M%S%z")  | 
 | 135 | +    issuer_str = ", ".join(  | 
 | 136 | +        f"{name.decode()}={value.decode()}" for name, value in x509.get_issuer().get_components()  | 
 | 137 | +    )  | 
 | 138 | +    subject_str = ", ".join(  | 
 | 139 | +        f"{name.decode()}={value.decode()}" for name, value in x509.get_subject().get_components()  | 
 | 140 | +    )  | 
 | 141 | +    return {  | 
 | 142 | +        "expired": x509.has_expired(),  | 
 | 143 | +        "self_signed": issuer_str == subject_str,  | 
 | 144 | +        "issuer": issuer_str,  | 
 | 145 | +        "subject": subject_str,  | 
 | 146 | +        "signing_algo": str(x509.get_signature_algorithm()),  | 
 | 147 | +        "weak_signing_algo": weak_signing_algo,  | 
 | 148 | +        "activation_date": cert_activation.strftime("%Y-%m-%d"),  | 
 | 149 | +        "not_activated": (cert_activation - datetime.now(timezone.utc)).days > 0,  | 
 | 150 | +        "expiration_date": cert_expires.strftime("%Y-%m-%d"),  | 
 | 151 | +        "expiring_soon": (cert_expires - datetime.now(timezone.utc)).days < 30,  | 
 | 152 | +    }  | 
 | 153 | + | 
 | 154 | + | 
 | 155 | +class SslLibrary(BaseLibrary):  | 
 | 156 | +    def ssl_certificate_scan(self, host, port, timeout):  | 
 | 157 | +        tcp_socket = create_tcp_socket(host, port, timeout)  | 
 | 158 | +        if tcp_socket is None:  | 
 | 159 | +            return None  | 
 | 160 | + | 
 | 161 | +        socket_connection, ssl_flag = tcp_socket  | 
 | 162 | +        peer_name = socket_connection.getpeername()  | 
 | 163 | +        scan_info = {  | 
 | 164 | +            "ssl_flag": ssl_flag,  | 
 | 165 | +            "peer_name": peer_name,  | 
 | 166 | +            "service": socket.getservbyport(int(port)),  | 
 | 167 | +        }  | 
 | 168 | + | 
 | 169 | +        if ssl_flag:  | 
 | 170 | +            cert = ssl.get_server_certificate((host, port))  | 
 | 171 | +            cert_info = get_cert_info(cert)  | 
 | 172 | +            scan_info = cert_info | scan_info  | 
 | 173 | +            return scan_info  | 
 | 174 | + | 
 | 175 | +        return scan_info  | 
 | 176 | + | 
 | 177 | +    def ssl_version_and_cipher_scan(self, host, port, timeout):  | 
 | 178 | +        tcp_socket = create_tcp_socket(host, port, timeout)  | 
 | 179 | +        if tcp_socket is None:  | 
 | 180 | +            return None  | 
 | 181 | + | 
 | 182 | +        socket_connection, ssl_flag = tcp_socket  | 
 | 183 | +        peer_name = socket_connection.getpeername()  | 
 | 184 | + | 
 | 185 | +        if ssl_flag:  | 
 | 186 | +            try:  | 
 | 187 | +                cert = ssl.get_server_certificate((host, port))  | 
 | 188 | +            except ssl.SSLError:  | 
 | 189 | +                cert = None  | 
 | 190 | +            cert_info = get_cert_info(cert) if cert else None  | 
 | 191 | +            ssl_ver, weak_version = is_weak_ssl_version(host, port, timeout)  | 
 | 192 | +            cipher_suite, weak_cipher_suite = is_weak_cipher_suite(host, port, timeout)  | 
 | 193 | + | 
 | 194 | +            return {  | 
 | 195 | +                "ssl_version": ssl_ver,  | 
 | 196 | +                "weak_version": weak_version,  | 
 | 197 | +                "cipher_suite": cipher_suite,  | 
 | 198 | +                "weak_cipher_suite": weak_cipher_suite,  | 
 | 199 | +                "issuer": cert_info["issuer"] if cert_info else "NA",  | 
 | 200 | +                "subject": cert_info["subject"] if cert_info else "NA",  | 
 | 201 | +                "expiration_date": cert_info["expiration_date"] if cert_info else "NA",  | 
 | 202 | +                "ssl_flag": ssl_flag,  | 
 | 203 | +                "peer_name": peer_name,  | 
 | 204 | +                "service": socket.getservbyport(int(port)),  | 
 | 205 | +            }  | 
 | 206 | + | 
 | 207 | +        return {  | 
 | 208 | +            "ssl_flag": ssl_flag,  | 
 | 209 | +            "service": socket.getservbyport(int(port)),  | 
 | 210 | +            "peer_name": peer_name,  | 
 | 211 | +        }  | 
 | 212 | + | 
 | 213 | + | 
 | 214 | +class SslEngine(BaseEngine):  | 
 | 215 | +    library = SslLibrary  | 
 | 216 | + | 
 | 217 | +    def response_conditions_matched(self, sub_step, response):  | 
 | 218 | +        conditions = sub_step["response"]["conditions"]  | 
 | 219 | +        condition_type = sub_step["response"]["condition_type"]  | 
 | 220 | +        condition_results = {}  | 
 | 221 | +        if sub_step["method"] in {  | 
 | 222 | +            "ssl_certificate_scan",  | 
 | 223 | +            "ssl_version_and_cipher_scan",  | 
 | 224 | +        }:  | 
 | 225 | +            if response and response["ssl_flag"]:  | 
 | 226 | +                for condition in conditions:  | 
 | 227 | +                    if "grouped_conditions" in condition:  | 
 | 228 | +                        gc_type = conditions[condition]["condition_type"]  | 
 | 229 | +                        gc_conditions = conditions[condition]["conditions"]  | 
 | 230 | +                        gc_condition_results = {}  | 
 | 231 | +                        for gc_condition in gc_conditions:  | 
 | 232 | +                            if (  | 
 | 233 | +                                gc_conditions[gc_condition]["reverse"]  | 
 | 234 | +                                and not response[gc_condition]  | 
 | 235 | +                            ):  | 
 | 236 | +                                gc_condition_results[gc_condition] = not response[gc_condition]  | 
 | 237 | + | 
 | 238 | +                            elif (  | 
 | 239 | +                                not gc_conditions[gc_condition]["reverse"]  | 
 | 240 | +                                and response[gc_condition]  | 
 | 241 | +                            ):  | 
 | 242 | +                                gc_condition_results[gc_condition] = response[gc_condition]  | 
 | 243 | + | 
 | 244 | +                        if gc_type == "and":  | 
 | 245 | +                            gc_condition_results = (  | 
 | 246 | +                                gc_condition_results  | 
 | 247 | +                                if len(gc_condition_results) == len(gc_conditions)  | 
 | 248 | +                                else {}  | 
 | 249 | +                            )  | 
 | 250 | + | 
 | 251 | +                        condition_results.update(gc_condition_results)  | 
 | 252 | + | 
 | 253 | +                    elif (conditions[condition]["reverse"] and not response[condition]) or (  | 
 | 254 | +                        not conditions[condition]["reverse"] and response[condition]  | 
 | 255 | +                    ):  | 
 | 256 | +                        condition_results[condition] = True  | 
 | 257 | + | 
 | 258 | +                if condition_type == "and":  | 
 | 259 | +                    return condition_results if len(condition_results) == len(conditions) else []  | 
 | 260 | +                if condition_type == "or":  | 
 | 261 | +                    return condition_results if condition_results else []  | 
 | 262 | +                return []  | 
 | 263 | + | 
 | 264 | +        return []  | 
 | 265 | + | 
 | 266 | +    def apply_extra_data(self, sub_step, response):  | 
 | 267 | +        sub_step["response"]["ssl_flag"] = (  | 
 | 268 | +            response["ssl_flag"] if isinstance(response, dict) else False  | 
 | 269 | +        )  | 
 | 270 | +        sub_step["response"]["conditions_results"] = self.response_conditions_matched(  | 
 | 271 | +            sub_step, response  | 
 | 272 | +        )  | 
0 commit comments