diff --git a/1.6.0-rework/master/master.py b/1.6.0-rework/master/master.py new file mode 100755 index 0000000..36d3ec3 --- /dev/null +++ b/1.6.0-rework/master/master.py @@ -0,0 +1,53 @@ +#!/bin/env python3 + +import sys +import os + +# Make it able to import src/ +sys.path.append( + os.path.abspath( + os.path.join( + os.path.dirname(__file__), '../../src' + ) + ) +) + +from checks import FilePermissionCheck, FileOwnershipCheck +from helper import PrintHeader, PrintInfo + +if __name__ == '__main__': + PrintHeader() + PrintInfo("info", "1 - Control Plane Components") + PrintInfo("info", "1.1 - Master Node Configuration Files") + + kops = False + if kops: + in_file = "/etc/kubernetes/manifests/kube-apiserver.manifest" + else: + in_file = "/etc/kubernetes/manifests/kube-apiserver.yaml" + + check_1_1_1 = FilePermissionCheck( + title="1.1.1 - Ensure that the API server pod specification file permissions are set to 644 or more restrictive (Automated)", + level=1, + automated=True, + scored=True, + file=in_file, + max_allowed=0o644, + ) + + check_1_1_1.Run() + check_1_1_1.Print() + + check_1_1_2 = FileOwnershipCheck( + title="1.1.2 - Ensure that the API server pod specification file ownership is set to root:root (Automated)", + level=1, + automated=True, + scored=True, + file=in_file, + required_uid=0, + required_gid=0, + ) + + check_1_1_2.Run() + check_1_1_2.Print() + diff --git a/src/__init__.py b/src/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/src/checks/__init__.py b/src/checks/__init__.py new file mode 100755 index 0000000..33ebec2 --- /dev/null +++ b/src/checks/__init__.py @@ -0,0 +1,2 @@ +from .check import * +from .file_checks import * \ No newline at end of file diff --git a/src/checks/check.py b/src/checks/check.py new file mode 100755 index 0000000..eb4757a --- /dev/null +++ b/src/checks/check.py @@ -0,0 +1,50 @@ +#!/bin/env python3 + +from helper import PrintInfo + +class Check: + _pass: bool + _executed: bool + + _title: str + _automated: bool + _scored: bool + _level: int + + def __init__( + self, + title: str, + automated: bool=None, + scored: bool=None, + level: int=None, + ): + self._executed = False + self._pass = False + + self._title = title + self._automated = automated + self._scored = scored + self._level = level + + def Run(self): + if self._executed: + return + + self._executed = True + self._pass = True + + def Print(self): + if not self._executed: + return + + if self._pass: + PrintInfo("pass", self._title, level=self._level, automated=self._automated, scored=self._scored) + else: + PrintInfo("warn", self._title, level=self._level, automated=self._automated, scored=self._scored) + +def RunChecks(*checks: Check): + for check in checks: + if not isinstance(check, Check): + raise TypeError(f"Expected Check instance, got {type(check).__name__}") + check.Run() + check.Print() diff --git a/src/checks/file_checks.py b/src/checks/file_checks.py new file mode 100755 index 0000000..edc3266 --- /dev/null +++ b/src/checks/file_checks.py @@ -0,0 +1,75 @@ +#!/bin/env python3 + +import os +from .check import Check +from helper import PrintInfo + +class FilePermissionCheck(Check): + _file: str + _max_allowed: int + + def __init__( + self, + *args, + file: str="", + max_allowed:int=0o644, + **kwargs + ): + super().__init__(*args, **kwargs) + self._file = file + self._max_allowed = max_allowed + + def Run(self): + if self._executed: + return + + if not os.path.exists(self._file): + self._executed = False + self._pass = False + PrintInfo(severity="info", message=self._title) + PrintInfo(severity="info", message=" * File not found") + return + + file_mode = os.stat(self._file).st_mode & 0o777 + if file_mode <= self._max_allowed: + self._pass = True + else: + self._pass = False + self._executed = True + + +class FileOwnershipCheck(Check): + _file: str + _required_uid: str + _required_gid: str + + def __init__( + self, + *args, + file: str="", + required_uid: int = 0, + required_gid: int = 0, + **kwargs + ): + super().__init__(*args, **kwargs) + self._file = file + self._required_uid = required_uid + self._required_gid = required_gid + + def Run(self): + if self._executed: + return + + if not os.path.exists(self._file): + self._executed = False + self._pass = False + PrintInfo(severity="info", message=self._title) + PrintInfo(severity="info", message=" * File not found") + return + + stat_info = os.stat(self._file) + if stat_info.st_uid == self._required_uid and stat_info.st_gid == self._required_gid: + self._pass = True + else: + self._pass = False + self._executed = True diff --git a/src/helper/__init__.py b/src/helper/__init__.py new file mode 100755 index 0000000..4c264fd --- /dev/null +++ b/src/helper/__init__.py @@ -0,0 +1 @@ +from .print import * \ No newline at end of file diff --git a/src/helper/print.py b/src/helper/print.py new file mode 100755 index 0000000..e61f090 --- /dev/null +++ b/src/helper/print.py @@ -0,0 +1,55 @@ +#!/bin/env python3 + +bldred = '\033[1;31m' +bldgrn = '\033[1;32m' +bldblu = '\033[1;34m' +bldylw = '\033[1;33m' +bldcyn = '\033[1;36m' +bldgry = '\033[1;37m' +txtrst = '\033[0m' + +def PrintHeader(): + print(f'''{bldylw}# ------------------------------------------------------------------------------ +# Kubernetes CIS benchmark +# +# NeuVector, Inc. (c) 2020- +# +# NeuVector delivers an application and network intelligent container security +# solution that automatically adapts to protect running containers. Don’t let +# security concerns slow down your CI/CD processes. +# ------------------------------------------------------------------------------ +''') + +def PrintInfo(severity: str, message: str, level: int =None, automated=None, scored=None): + match level: + case None: + level_msg = "" + case _: + level_msg = f"[Level {level}]" + + match automated: + case None: + automated_msg = "" + case True: + automated_msg = f"{bldcyn}[Automated]{txtrst}" + case False: + automated_msg = f"{bldcyn}[Manual]{txtrst}" + + match scored: + case None: + scored_msg = "" + case True: + scored_msg = f"[Scored]" + case False: + scored_msg = f"[Not Scored]" + + + match severity: + case "info": + severity_msg = f"{bldblu}[INFO]{txtrst}" + case "pass": + severity_msg = f"{bldgrn}[PASS]{txtrst}" + case "warn": + severity_msg = f"{bldred}[WARN]{txtrst}" + + print(f"{severity_msg}{level_msg}{automated_msg}{scored_msg} {message}")