Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions 1.6.0-rework/master/master.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/bin/env python3

import sys
import os

# Make it able to import src/
sys.path.append(
os.path.abspath(
os.path.join(
os.path.dirname(__file__), '../../src'
)
)
)

from checks import FilePermissionCheck, FileOwnershipCheck
from helper import PrintHeader, PrintInfo

if __name__ == '__main__':
PrintHeader()
PrintInfo("info", "1 - Control Plane Components")
PrintInfo("info", "1.1 - Master Node Configuration Files")

kops = False
if kops:
in_file = "/etc/kubernetes/manifests/kube-apiserver.manifest"
else:
in_file = "/etc/kubernetes/manifests/kube-apiserver.yaml"

check_1_1_1 = FilePermissionCheck(
title="1.1.1 - Ensure that the API server pod specification file permissions are set to 644 or more restrictive (Automated)",
level=1,
automated=True,
scored=True,
file=in_file,
max_allowed=0o644,
)

check_1_1_1.Run()
check_1_1_1.Print()

check_1_1_2 = FileOwnershipCheck(
title="1.1.2 - Ensure that the API server pod specification file ownership is set to root:root (Automated)",
level=1,
automated=True,
scored=True,
file=in_file,
required_uid=0,
required_gid=0,
)

check_1_1_2.Run()
check_1_1_2.Print()

Empty file added src/__init__.py
Empty file.
2 changes: 2 additions & 0 deletions src/checks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .check import *
from .file_checks import *
50 changes: 50 additions & 0 deletions src/checks/check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/bin/env python3

from helper import PrintInfo

class Check:
_pass: bool
_executed: bool

_title: str
_automated: bool
_scored: bool
_level: int

def __init__(
self,
title: str,
automated: bool=None,
scored: bool=None,
level: int=None,
):
self._executed = False
self._pass = False

self._title = title
self._automated = automated
self._scored = scored
self._level = level

def Run(self):
if self._executed:
return

self._executed = True
self._pass = True

def Print(self):
if not self._executed:
return

if self._pass:
PrintInfo("pass", self._title, level=self._level, automated=self._automated, scored=self._scored)
else:
PrintInfo("warn", self._title, level=self._level, automated=self._automated, scored=self._scored)

def RunChecks(*checks: Check):
for check in checks:
if not isinstance(check, Check):
raise TypeError(f"Expected Check instance, got {type(check).__name__}")
check.Run()
check.Print()
75 changes: 75 additions & 0 deletions src/checks/file_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/bin/env python3

import os
from .check import Check
from helper import PrintInfo

class FilePermissionCheck(Check):
_file: str
_max_allowed: int

def __init__(
self,
*args,
file: str="",
max_allowed:int=0o644,
**kwargs
):
super().__init__(*args, **kwargs)
self._file = file
self._max_allowed = max_allowed

def Run(self):
if self._executed:
return

if not os.path.exists(self._file):
self._executed = False
self._pass = False
PrintInfo(severity="info", message=self._title)
PrintInfo(severity="info", message=" * File not found")
return

file_mode = os.stat(self._file).st_mode & 0o777
if file_mode <= self._max_allowed:
self._pass = True
else:
self._pass = False
self._executed = True


class FileOwnershipCheck(Check):
_file: str
_required_uid: str
_required_gid: str

def __init__(
self,
*args,
file: str="",
required_uid: int = 0,
required_gid: int = 0,
**kwargs
):
super().__init__(*args, **kwargs)
self._file = file
self._required_uid = required_uid
self._required_gid = required_gid

def Run(self):
if self._executed:
return

if not os.path.exists(self._file):
self._executed = False
self._pass = False
PrintInfo(severity="info", message=self._title)
PrintInfo(severity="info", message=" * File not found")
return

stat_info = os.stat(self._file)
if stat_info.st_uid == self._required_uid and stat_info.st_gid == self._required_gid:
self._pass = True
else:
self._pass = False
self._executed = True
1 change: 1 addition & 0 deletions src/helper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .print import *
55 changes: 55 additions & 0 deletions src/helper/print.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/bin/env python3

bldred = '\033[1;31m'
bldgrn = '\033[1;32m'
bldblu = '\033[1;34m'
bldylw = '\033[1;33m'
bldcyn = '\033[1;36m'
bldgry = '\033[1;37m'
txtrst = '\033[0m'

def PrintHeader():
print(f'''{bldylw}# ------------------------------------------------------------------------------
# Kubernetes CIS benchmark
#
# NeuVector, Inc. (c) 2020-
#
# NeuVector delivers an application and network intelligent container security
# solution that automatically adapts to protect running containers. Don’t let
# security concerns slow down your CI/CD processes.
# ------------------------------------------------------------------------------
''')

def PrintInfo(severity: str, message: str, level: int =None, automated=None, scored=None):
match level:
case None:
level_msg = ""
case _:
level_msg = f"[Level {level}]"

match automated:
case None:
automated_msg = ""
case True:
automated_msg = f"{bldcyn}[Automated]{txtrst}"
case False:
automated_msg = f"{bldcyn}[Manual]{txtrst}"

match scored:
case None:
scored_msg = ""
case True:
scored_msg = f"[Scored]"
case False:
scored_msg = f"[Not Scored]"


match severity:
case "info":
severity_msg = f"{bldblu}[INFO]{txtrst}"
case "pass":
severity_msg = f"{bldgrn}[PASS]{txtrst}"
case "warn":
severity_msg = f"{bldred}[WARN]{txtrst}"

print(f"{severity_msg}{level_msg}{automated_msg}{scored_msg} {message}")