Skip to content
116 changes: 116 additions & 0 deletions dissect/target/helpers/certificate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from __future__ import annotations

import base64
import binascii
import hashlib
from pathlib import Path

from flow.record import RecordDescriptor

try:
from asn1crypto import pem, x509

HAS_ASN1 = True

except ImportError:
HAS_ASN1 = False


COMMON_CERTIFICATE_FIELDS = [
("digest", "fingerprint"),
("varint", "serial_number"),
("datetime", "not_valid_before"),
("datetime", "not_valid_after"),
("string", "issuer_dn"),
("string", "subject_dn"),
("bytes", "pem"),
]

CertificateRecord = RecordDescriptor(
"certificate",
[
*COMMON_CERTIFICATE_FIELDS,
],
)

# Translation layer for asn1crypto names to RFC4514 names.
# References: https://github.com/wbond/asn1crypto/blob/master/asn1crypto/x509.py @ NameType
# References: https://github.com/pyca/cryptography/blob/main/src/cryptography/x509/name.py
NAMEOID_TO_NAME = {
"common_name": "CN", # 2.5.4.3
"country_name": "C", # 2.5.4.6
"locality_name": "L", # 2.5.4.7
"state_or_province_name": "ST", # 2.5.4.8
"street_address": "STREET", # 2.5.4.9
"organization_name": "O", # 2.5.4.10
"organizational_unit_name": "OU", # 2.5.4.11
"domain_component": "DC", # 0.9.2342.192.00300.100.1.25
"user_id": "UID", # 0.9.2342.192.00300.100.1.1
}


def compute_pem_fingerprints(pem: str | bytes) -> tuple[str, str, str]:
"""Compute the MD5, SHA-1 and SHA-256 fingerprint hash of a x509 certificate PEM."""

if pem is None:
raise ValueError("No PEM provided")

if isinstance(pem, bytes):
pem = pem.decode()

elif not isinstance(pem, str):
raise TypeError("Provided PEM is not str or bytes")

stripped_pem = pem.strip().removeprefix("-----BEGIN CERTIFICATE-----").removesuffix("-----END CERTIFICATE-----")

try:
der = base64.b64decode(stripped_pem)
except binascii.Error as e:
raise ValueError(f"Unable to parse PEM: {e!s}") from e

md5 = hashlib.md5(der).hexdigest()
sha1 = hashlib.sha1(der).hexdigest()
sha256 = hashlib.sha256(der).hexdigest()

return md5, sha1, sha256


def parse_x509(file: str | bytes | Path) -> CertificateRecord:
"""Parses a PEM file. Returns a CertificateREcord. Does not parse a public key embedded in a x509 certificate."""

if isinstance(file, str):
content = file.encode()

elif isinstance(file, bytes):
content = file

elif isinstance(file, Path) or hasattr(file, "read_bytes"):
content = file.read_bytes()

else:
raise TypeError("Parameter file is not of type str, bytes or Path")

if not HAS_ASN1:
raise ValueError("Missing asn1crypto dependency")

md5, _, _ = compute_pem_fingerprints(content.decode())
_, _, der = pem.unarmor(content)
crt = x509.Certificate.load(der)

issuer = []
for key, value in crt.issuer.native.items():
issuer.append(f"{NAMEOID_TO_NAME.get(key, key)}={value}")

subject = []
for key, value in crt.subject.native.items():
subject.append(f"{NAMEOID_TO_NAME.get(key, key)}={value}")

return CertificateRecord(
not_valid_before=crt.not_valid_before,
not_valid_after=crt.not_valid_after,
issuer_dn=",".join(issuer),
subject_dn=",".join(subject),
fingerprint=(md5, crt.sha1.hex(), crt.sha256.hex()),
serial_number=crt.serial_number,
pem=crt.dump(),
)
59 changes: 49 additions & 10 deletions dissect/target/plugins/apps/webserver/apache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
from typing import TYPE_CHECKING, NamedTuple

from dissect.target.exceptions import FileNotFoundError, UnsupportedPluginError
from dissect.target.helpers.fsutil import open_decompress
from dissect.target.helpers.certificate import parse_x509
from dissect.target.helpers.fsutil import TargetPath, open_decompress
from dissect.target.plugin import OperatingSystem, export
from dissect.target.plugins.apps.webserver.webserver import (
WebserverAccessLogRecord,
WebserverCertificateRecord,
WebserverErrorLogRecord,
WebserverHostRecord,
WebserverPlugin,
Expand Down Expand Up @@ -423,6 +425,7 @@ def access(self) -> Iterator[WebserverAccessLogRecord]:

yield WebserverAccessLogRecord(
ts=datetime.strptime(log["ts"], "%d/%b/%Y:%H:%M:%S %z"),
webserver=self.__namespace__,
remote_user=clean_value(log["remote_user"]),
remote_ip=log["remote_ip"],
local_ip=clean_value(log.get("local_ip")),
Expand Down Expand Up @@ -472,6 +475,7 @@ def error(self) -> Iterator[WebserverErrorLogRecord]:

yield WebserverErrorLogRecord(
ts=ts,
webserver=self.__namespace__,
pid=log.get("pid"),
remote_ip=remote_ip,
module=log["module"],
Expand All @@ -495,34 +499,69 @@ def hosts(self) -> Iterator[WebserverHostRecord]:
- https://httpd.apache.org/docs/2.4/mod/core.html#virtualhost
"""

def _map_path(path: str | None) -> TargetPath:
return self.target.fs.path(path) if path else None

for path in self.virtual_hosts:
# A configuration file can contain multiple VirtualHost directives.
current_vhost = {}
vhost = {}
for line in path.open("rt"):
line_lower = line.lower()
if "<virtualhost" in line_lower:
# Currently only supports a single addr:port combination.
if match := RE_VIRTUALHOST.match(line.lstrip()):
current_vhost = match.groupdict()
vhost = match.groupdict()
else:
self.target.log.warning("Unable to parse VirtualHost directive %r in %s", line, path)
current_vhost = {}
vhost = {}

elif "</virtualhost" in line_lower:
yield WebserverHostRecord(
ts=path.lstat().st_mtime,
server_name=current_vhost.get("servername") or current_vhost.get("addr"),
server_port=current_vhost.get("port"),
root_path=current_vhost.get("documentroot"),
access_log_config=current_vhost.get("customlog", "").rpartition(" ")[0],
error_log_config=current_vhost.get("errorlog"),
webserver=self.__namespace__,
server_name=vhost.get("servername") or vhost.get("addr"),
server_port=vhost.get("port"),
root_path=_map_path(vhost.get("documentroot")),
access_log_config=_map_path(vhost.get("customlog", "").rpartition(" ")[0]),
error_log_config=_map_path(vhost.get("errorlog")),
tls_certificate=_map_path(vhost.get("sslcertificatefile")),
tls_key=_map_path(vhost.get("sslcertificatekeyfile")),
source=path,
_target=self.target,
)

else:
key, _, value = line.strip().partition(" ")
current_vhost[key.lower()] = value
vhost[key.lower()] = value

@export(record=WebserverCertificateRecord)
def certificates(self) -> Iterator[WebserverCertificateRecord]:
"""Return host certificates for found Apache ``VirtualHost`` directives."""
certs = set()

for host in self.hosts():
if host.tls_certificate and (cert_path := self.target.fs.path(host.tls_certificate)).is_file():
certs.add(cert_path)

if self.server_root:
for cert_path in itertools.chain(self.server_root.glob("**/*.crt"), self.server_root.glob("**/*.pem")):
if cert_path not in certs:
certs.add(cert_path)

for cert_path in certs:
try:
cert = parse_x509(cert_path)
yield WebserverCertificateRecord(
ts=cert_path.lstat().st_mtime,
webserver=self.__namespace__,
**cert._asdict(),
host=host.server_name,
source=cert_path,
_target=self.target,
)
except Exception as e: # noqa: PERF203
self.target.log.warning("Unable to parse certificate %s :%s", cert_path, e)
self.target.log.debug("", exc_info=e)

def _iterate_log_lines(self, paths: list[Path]) -> Iterator[tuple[str, Path]]:
"""Iterate through a list of paths and yield tuples of loglines and the path of the file where they're from."""
Expand Down
1 change: 1 addition & 0 deletions dissect/target/plugins/apps/webserver/caddy.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def access(self) -> Iterator[WebserverAccessLogRecord]:
log = match.groupdict()
yield WebserverAccessLogRecord(
ts=datetime.strptime(log["ts"], "%d/%b/%Y:%H:%M:%S %z"),
webserver="caddy",
remote_ip=log["remote_ip"],
method=log["method"],
uri=log["uri"],
Expand Down
1 change: 1 addition & 0 deletions dissect/target/plugins/apps/webserver/iis.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def access(self) -> Iterator[WebserverAccessLogRecord]:
for iis_record in self.logs():
yield WebserverAccessLogRecord(
ts=iis_record.ts,
webserver="iis",
remote_user=iis_record.username,
remote_ip=iis_record.client_ip,
method=iis_record.request_method,
Expand Down
84 changes: 63 additions & 21 deletions dissect/target/plugins/apps/webserver/nginx.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import json
import re
from datetime import datetime
from itertools import chain
from typing import TYPE_CHECKING

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.fsutil import open_decompress
from dissect.target.helpers.certificate import parse_x509
from dissect.target.helpers.fsutil import TargetPath, open_decompress
from dissect.target.plugin import export
from dissect.target.plugins.apps.webserver.webserver import (
WebserverAccessLogRecord,
WebserverCertificateRecord,
WebserverErrorLogRecord,
WebserverHostRecord,
WebserverPlugin,
Expand Down Expand Up @@ -245,6 +248,7 @@ def access(self) -> Iterator[WebserverAccessLogRecord]:

yield WebserverAccessLogRecord(
ts=ts,
webserver="nginx",
bytes_sent=bytes_sent,
**log,
source=path,
Expand Down Expand Up @@ -287,6 +291,7 @@ def error(self) -> Iterator[WebserverErrorLogRecord]:

yield WebserverErrorLogRecord(
ts=ts,
webserver="nginx",
**log,
source=path,
_target=self.target,
Expand All @@ -300,34 +305,71 @@ def hosts(self) -> Iterator[WebserverHostRecord]:
- https://nginx.org/en/docs/http/ngx_http_core_module.html#server
"""

def yield_record(current_server: dict) -> Iterator[WebserverHostRecord]:
yield WebserverHostRecord(
ts=host_path.lstat().st_mtime,
server_name=current_server.get("server_name") or current_server.get("listen"),
server_port=current_server.get("listen"),
root_path=current_server.get("root"),
access_log_config=current_server.get("access_log"),
error_log_config=current_server.get("error_log"),
source=host_path,
_target=self.target,
)

for host_path in self.host_paths:
current_server = {}
server = {}
seen_server_directive = False
for line in host_path.open("rt"):
if "server {" in line:
if current_server:
yield from yield_record(current_server)
current_server = {}
if server:
yield construct_hosts_record(self.target, host_path, server)
server = {}
seen_server_directive = True

elif seen_server_directive:
key, _, value = line.strip().partition(" ")
current_server[key] = value.rstrip(";")

if current_server:
yield from yield_record(current_server)
server[key] = value.rstrip(";").strip()

if server:
yield construct_hosts_record(self.target, host_path, server)

@export(record=WebserverCertificateRecord)
def certificates(self) -> Iterator[WebserverCertificateRecord]:
"""Return found server certificates in the NGINX configuration."""
certs = set()

for host in self.hosts():
# Parse x509 certificate
if host.tls_certificate and (cert_path := self.target.fs.path(host.tls_certificate)).is_file():
certs.add(cert_path)

root = self.target.fs.path("/etc/nginx")
for cert_path in chain(root.glob("**/*.crt"), root.glob("**/*.pem")):
if cert_path not in certs:
certs.add(cert_path)

for cert_path in certs:
try:
cert = parse_x509(cert_path)
yield WebserverCertificateRecord(
ts=cert_path.lstat().st_mtime,
webserver="nginx",
**cert._asdict(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yunzheng what are your thoughts on this pattern?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing wrong with this pattern. Other options could've been using a webserver metadata RecordDescriptor in combination with GroupedRecord, or using extend_record. But it seems there are already other Webserver-like RecordDescriptors defined so I think this is fine.

It will however, overwrite the _source, _generated and _version fields. But depending if that's an issue or not, can be fixed by using cert._asdict(exclude=["_source", "_generated", "_version"]).

host=host.server_name,
source=cert_path,
_target=self.target,
)
except Exception as e: # noqa: PERF203
self.target.log.warning("Unable to parse certificate %s :%s", cert_path, e)
self.target.log.debug("", exc_info=e)


def construct_hosts_record(target: Target, host_path: Path, server: dict) -> WebserverHostRecord:
def _map_path(path: str | None) -> TargetPath:
return target.fs.path(path) if path else None

return WebserverHostRecord(
ts=host_path.lstat().st_mtime,
webserver="nginx",
server_name=server.get("server_name") or server.get("listen"),
server_port=server.get("listen", "").replace(" ssl", "") or None,
root_path=_map_path(server.get("root")),
access_log_config=_map_path(server.get("access_log")),
error_log_config=_map_path(server.get("error_log")),
tls_certificate=_map_path(server.get("ssl_certificate")),
tls_key=_map_path(server.get("ssl_certificate_key")),
source=host_path,
_target=target,
)


def parse_json_line(line: str) -> dict[str, str] | None:
Expand Down
Loading
Loading