diff --git a/.github/workflows/_build-package.yml b/.github/workflows/_build-package.yml new file mode 100644 index 0000000..b468257 --- /dev/null +++ b/.github/workflows/_build-package.yml @@ -0,0 +1,22 @@ +name: build-package +on: + workflow_call: +jobs: + build: + name: Build wheel and sdist + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: 3.13 + - name: Install build dependencies + run: pip install --no-cache-dir -U pip . build twine + - name: Build package + run: python -m build --sdist --wheel + - name: Upload built distributions + uses: actions/upload-artifact@v4 + with: + name: dist + path: dist \ No newline at end of file diff --git a/.github/workflows/_static-checks.yml b/.github/workflows/_static-checks.yml new file mode 100644 index 0000000..e31eb1c --- /dev/null +++ b/.github/workflows/_static-checks.yml @@ -0,0 +1,24 @@ +name: static-checks +on: + workflow_call: +jobs: + static-checks: + name: Run static checks + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: pip install --no-cache-dir -U pip . black flake8 bandit + - name: Lint check with flake8 + run: flake8 cortexutils/ tests/ setup.py + - name: Format check with black + run: black --check cortexutils/ tests/ setup.py + - name: Security check with bandit + run: bandit -r cortexutils/ diff --git a/.github/workflows/_unit-tests.yml b/.github/workflows/_unit-tests.yml new file mode 100644 index 0000000..7b2eed4 --- /dev/null +++ b/.github/workflows/_unit-tests.yml @@ -0,0 +1,20 @@ +name: unit-tests +on: + workflow_call: +jobs: + unit-tests: + name: Run unit tests + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: pip install --no-cache-dir -U pip . + - name: Run unit tests + run: python -m unittest --verbose diff --git a/.github/workflows/_upload-package.yml b/.github/workflows/_upload-package.yml new file mode 100644 index 0000000..89073fd --- /dev/null +++ b/.github/workflows/_upload-package.yml @@ -0,0 +1,37 @@ +name: upload-package +on: + workflow_call: + secrets: + PYPI_TOKEN: + required: true +jobs: + upload: + name: Upload wheel and sdist + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Compare tag and package version + run: | + TAG=${GITHUB_REF#refs/*/} + VERSION=$(grep -Po '(?<=version=")[^"]*' setup.py) + if [ "$TAG" != "$VERSION" ]; then + echo "Tag value and package version are different: ${TAG} != ${VERSION}" + exit 1 + fi + - name: Download built distributions + uses: actions/download-artifact@v4 + with: + name: dist + path: dist + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: 3.13 + - name: Install build dependencies + run: pip install --no-cache-dir -U pip . twine + - name: Upload to PyPI + run: twine upload dist/* + env: + TWINE_REPOSITORY_URL: https://upload.pypi.org/legacy/ + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} diff --git a/.github/workflows/main-cicd.yml b/.github/workflows/main-cicd.yml new file mode 100644 index 0000000..57107d6 --- /dev/null +++ b/.github/workflows/main-cicd.yml @@ -0,0 +1,21 @@ +name: cicd +on: + push: + branches: + - main + tags: + - "*" + pull_request: +jobs: + static-checks: + uses: ./.github/workflows/_static-checks.yml + unit-tests: + uses: ./.github/workflows/_unit-tests.yml + build-package: + uses: ./.github/workflows/_build-package.yml + upload-package: + if: startsWith(github.ref, 'refs/tags/') + uses: ./.github/workflows/_upload-package.yml + needs: [build-package, unit-tests, static-checks] + secrets: + PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }} diff --git a/cortexutils/analyzer.py b/cortexutils/analyzer.py index ec639c1..9136224 100644 --- a/cortexutils/analyzer.py +++ b/cortexutils/analyzer.py @@ -1,15 +1,12 @@ #!/usr/bin/env python # encoding: utf-8 -import json import os -import stat +import tempfile +from shutil import copyfileobj from cortexutils.extractor import Extractor from cortexutils.worker import Worker -from shutil import copyfileobj -import tempfile -import ntpath class Analyzer(Worker): @@ -21,21 +18,27 @@ def __init__(self, job_directory=None, secret_phrases=None): self.artifact = self._input # Check for auto extraction config - self.auto_extract = self.get_param('config.auto_extract', self.get_param('config.auto_extract_artifacts', True)) + self.auto_extract = self.get_param( + "config.auto_extract", self.get_param("config.auto_extract_artifacts", True) + ) def get_data(self): """Wrapper for getting data from input dict. :return: Data (observable value) given through Cortex""" - if self.data_type == 'file': - return self.get_param('filename', None, 'Missing filename.') + if self.data_type == "file": + return self.get_param("filename", None, "Missing filename.") else: - return self.get_param('data', None, 'Missing data field') + return self.get_param("data", None, "Missing data field") def get_param(self, name, default=None, message=None): data = super(Analyzer, self).get_param(name, default, message) - if name == 'file' and self.data_type == 'file' and self.job_directory is not None: - path = '%s/input/%s' % (self.job_directory, data) + if ( + name == "file" + and self.data_type == "file" + and self.job_directory is not None + ): + path = "%s/input/%s" % (self.job_directory, data) if os.path.isfile(path): return path else: @@ -50,17 +53,19 @@ def build_taxonomy(self, level, namespace, predicate, value): :return: dict """ # Set info level if something not expected is set - if level not in ['info', 'safe', 'suspicious', 'malicious']: - level = 'info' + if level not in ["info", "safe", "suspicious", "malicious"]: + level = "info" return { - 'level': level, - 'namespace': namespace, - 'predicate': predicate, - 'value': value + "level": level, + "namespace": namespace, + "predicate": predicate, + "value": value, } def summary(self, raw): - """Returns a summary, needed for 'short.html' template. Overwrite it for your needs! + """Returns a summary, needed for 'short.html' template. + + Overwrite it for your needs! :returns: by default return an empty dict""" return {} @@ -75,20 +80,26 @@ def artifacts(self, raw): return [] def build_artifact(self, data_type, data, **kwargs): - if data_type == 'file': + if data_type == "file": if os.path.isfile(data): dst = tempfile.NamedTemporaryFile( - dir=os.path.join(self.job_directory, "output"), delete=False) - with open(data, 'rb') as src: + dir=os.path.join(self.job_directory, "output"), delete=False + ) + with open(data, "rb") as src: copyfileobj(src, dst) dstfname = dst.name dst.close() os.chmod(dstfname, 0o444) - kwargs.update({'dataType': data_type, 'file': os.path.basename(dst.name), - 'filename': os.path.basename(data)}) + kwargs.update( + { + "dataType": data_type, + "file": os.path.basename(dst.name), + "filename": os.path.basename(data), + } + ) return kwargs else: - kwargs.update({'dataType': data_type, 'data': data}) + kwargs.update({"dataType": data_type, "data": data}) return kwargs def report(self, full_report, ensure_ascii=False): @@ -101,19 +112,22 @@ def report(self, full_report, ensure_ascii=False): try: summary = self.summary(full_report) except Exception: - pass + pass # nosec B110 operation_list = [] try: operation_list = self.operations(full_report) except Exception: - pass - super(Analyzer, self).report({ - 'success': True, - 'summary': summary, - 'artifacts': self.artifacts(full_report), - 'operations': operation_list, - 'full': full_report - }, ensure_ascii) + pass # nosec B110 + super(Analyzer, self).report( + { + "success": True, + "summary": summary, + "artifacts": self.artifacts(full_report), + "operations": operation_list, + "full": full_report, + }, + ensure_ascii, + ) def run(self): """Overwritten by analyzers""" @@ -121,20 +135,26 @@ def run(self): # Not breaking compatibility def notSupported(self): - self.error('This datatype is not supported by this analyzer.') + self.error("This datatype is not supported by this analyzer.") # Not breaking compatibility def unexpectedError(self, e): - self.error('Unexpected Error: ' + str(e)) + self.error("Unexpected Error: " + str(e)) # Not breaking compatibility def getData(self): - """For not breaking compatibility to cortexutils.analyzer, this wraps get_data()""" + """Wrapper of get_data. + + For not breaking compatibility to cortexutils.analyzer. + """ return self.get_data() # Not breaking compatibility def getParam(self, name, default=None, message=None): - """For not breaking compatibility to cortexutils.analyzer, this wraps get_param()""" + """Wrapper for get_param. + + For not breaking compatibility to cortexutils.analyzer. + """ return self.get_param(name=name, default=default, message=message) # Not breaking compatibility diff --git a/cortexutils/extractor.py b/cortexutils/extractor.py index fed739b..d5ef5b0 100644 --- a/cortexutils/extractor.py +++ b/cortexutils/extractor.py @@ -1,7 +1,6 @@ #!/usr/bin/env python -from builtins import str as unicode - import re +from builtins import str as unicode class ExtractionError(Exception): @@ -9,15 +8,19 @@ class ExtractionError(Exception): class Extractor: - """ - The extractor class tries to detect ioc attribute types using regex-matching. Two functions are provided: - - ``check_string(str)`` which checks a string for a regex match and just returns the type - - ``check_iterable(itr)`` that iterates over a list or a dictionary and returns a list of {type, value} dicts + """The extractor class tries to detect ioc attribute types using regex-matching. - Currently, this is not a fulltext search, so the the ioc's must be isolated strings, to get found. - This can be iterated for ioc's. + Two functions are provided: + - ``check_string(str)`` which checks a string for a regex matc + and just returns the type + - ``check_iterable(itr)`` that iterates over a list or a dictionary + and returns a list of {type, value} dicts - :param ignore: List of strings or a single string to ignore when matching artifacts to type + Currently, this is not a fulltext search, so the the ioc's must be isolated strings, + to get found. This can be iterated for ioc's. + + :param ignore: List of strings or a single string to ignore + when matching artifacts to type :type ignore: list, str """ @@ -35,85 +38,100 @@ def __init_regex(): """ # IPv4 - regex = [{ - 'type': 'ip', - 'regex': re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}') - }] - - # IPv6 - # RegEx from https://stackoverflow.com/questions/53497/regular-expression-that-matches-valid-ipv6-addresses - r = '(' + \ - '([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|' + \ - '([0-9a-fA-F]{1,4}:){1,7}:|' + \ - '([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|' + \ - '([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|' + \ - '([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|' + \ - '([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|' + \ - '([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|' + \ - '[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|' + \ - ':((:[0-9a-fA-F]{1,4}){1,7}|:)|' + \ - 'fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|' + \ - '::(ffff(:0{1,4}){0,1}:){0,1}' + \ - '((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}' + \ - '(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|' + \ - '([0-9a-fA-F]{1,4}:){1,4}:' + \ - '((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}' + \ - '(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])' + \ - ')' - regex.append({ - 'type': 'ip', - 'regex': re.compile(r'{}'.format(r)) - }) + regex = [ + { + "type": "ip", + "regex": re.compile(r"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}"), + } + ] + + # IPv6 RegEx from: + # https://stackoverflow.com/questions/53497/regular-expression-that-matches-valid-ipv6-addresses + r = ( + "(" + + "([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|" + + "([0-9a-fA-F]{1,4}:){1,7}:|" + + "([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|" + + "([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|" + + "([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|" + + "([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|" + + "([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|" + + "[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|" + + ":((:[0-9a-fA-F]{1,4}){1,7}|:)|" + + "fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|" + + "::(ffff(:0{1,4}){0,1}:){0,1}" + + r"((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}" + + "(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|" + + "([0-9a-fA-F]{1,4}:){1,4}:" + + r"((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}" + + "(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])" + + ")" + ) + regex.append({"type": "ip", "regex": re.compile(r"{}".format(r))}) # URL - regex.append({ - 'type': 'url', - 'regex': re.compile(r'^(http://|https://)') - }) + regex.append({"type": "url", "regex": re.compile(r"^(http://|https://)")}) # domain - regex.append({ - 'type': 'domain', - 'regex': re.compile(r'^(?!http://|https://)^[\w\-]+\.[a-zA-Z]+$') - }) + regex.append( + { + "type": "domain", + "regex": re.compile(r"^(?!http://|https://)^[\w\-]+\.[a-zA-Z]+$"), + } + ) # hash - regex.append({ - 'type': 'hash', - 'regex': re.compile(r'^([0-9a-fA-F]{32}|[0-9a-fA-F]{40}|[0-9a-fA-F]{64})$') - }) + regex.append( + { + "type": "hash", + "regex": re.compile( + r"^([0-9a-fA-F]{32}|[0-9a-fA-F]{40}|[0-9a-fA-F]{64})$" + ), + } + ) # user-agent - regex.append({ - 'type': 'user-agent', - 'regex': re.compile(r'^(Mozilla/[45]\.0 |AppleWebKit/[0-9]{3}\.[0-9]{2} |Chrome/[0-9]{2}\.[0-9]\.' - r'[0-9]{4}\.[0-9]{3} |Safari/[0-9]{3}\.[0-9]{2} ).*?$') - }) + regex.append( + { + "type": "user-agent", + "regex": re.compile( + r"^(Mozilla/[45]\.0 |AppleWebKit/[0-9]{3}\.[0-9]{2} |Chrome/[0-9]{2}\.[0-9]\." # noqa + r"[0-9]{4}\.[0-9]{3} |Safari/[0-9]{3}\.[0-9]{2} ).*?$" + ), + } + ) # uri_path - regex.append({ - 'type': 'uri_path', - 'regex': re.compile(r'^(?!http://|https://)[A-Za-z]*://') - }) + regex.append( + { + "type": "uri_path", + "regex": re.compile(r"^(?!http://|https://)[A-Za-z]*://"), + } + ) # regkey - regex.append({ - 'type': 'registry', - 'regex': re.compile(r'^(HKEY|HKLM|HKCU|HKCR|HKCC)' - r'(_LOCAL_MACHINE|_CURRENT_USER|_CURRENT_CONFIG|_CLASSES_ROOT|)[\\a-zA-Z0-9]+$') - }) + regex.append( + { + "type": "registry", + "regex": re.compile( + r"^(HKEY|HKLM|HKCU|HKCR|HKCC)" + r"(_LOCAL_MACHINE|_CURRENT_USER|_CURRENT_CONFIG|_CLASSES_ROOT|)[\\a-zA-Z0-9]+$" # noqa + ), + } + ) # mail - regex.append({ - 'type': 'mail', - 'regex': re.compile(r'[\w.\-]+@\w+\.[\w.]+') - }) + regex.append({"type": "mail", "regex": re.compile(r"[\w.\-]+@\w+\.[\w.]+")}) # fqdn - regex.append({ - 'type': 'fqdn', - 'regex': re.compile(r'^(?!http://|https://)^[\w\-.]+\.[\w\-]+\.[a-zA-Z]+$') - }) + regex.append( + { + "type": "fqdn", + "regex": re.compile( + r"^(?!http://|https://)^[\w\-.]+\.[\w\-]+\.[a-zA-Z]+$" + ), + } + ) return regex @@ -127,15 +145,15 @@ def __checktype(self, value): """ if self.ignore: if isinstance(value, str) and self.ignore in value: - return '' + return "" if self.ignore == value: - return '' + return "" if isinstance(value, (str, unicode)): for r in self.regex: - if r.get('regex').match(value): - return r.get('type') - return '' + if r.get("regex").match(value): + return r.get("type") + return "" def check_string(self, value): """ @@ -149,8 +167,9 @@ def check_string(self, value): return self.__checktype(value) def check_iterable(self, iterable): - """ - Checks values of a list or a dict on ioc's. Returns a list of dict {type, value}. Raises TypeError, if iterable + """Checks values of a list or a dict on ioc's. + + Returns a list of dict {type, value}. Raises TypeError, if iterable is not an expected type. :param iterable: List or dict of values @@ -163,10 +182,7 @@ def check_iterable(self, iterable): if isinstance(iterable, (str, unicode)): dt = self.__checktype(iterable) if len(dt) > 0: - results.append({ - 'dataType': dt, - 'data': iterable - }) + results.append({"dataType": dt, "data": iterable}) elif isinstance(iterable, list): for item in iterable: if isinstance(item, list) or isinstance(item, dict): @@ -174,10 +190,7 @@ def check_iterable(self, iterable): else: dt = self.__checktype(item) if len(dt) > 0: - results.append({ - 'dataType': dt, - 'data': item - }) + results.append({"dataType": dt, "data": item}) elif isinstance(iterable, dict): for _, item in iterable.items(): if isinstance(item, list) or isinstance(item, dict): @@ -185,12 +198,9 @@ def check_iterable(self, iterable): else: dt = self.__checktype(item) if len(dt) > 0: - results.append({ - 'dataType': dt, - 'data': item - }) + results.append({"dataType": dt, "data": item}) else: - raise TypeError('Not supported type.') + raise TypeError("Not supported type.") return self.deduplicate(results) @@ -200,7 +210,10 @@ def deduplicate(list_of_objects): for obj in list_of_objects: present = False for new_object in dedup_list: - if obj['dataType'] == new_object['dataType'] and obj['data'] == new_object['data']: + if ( + obj["dataType"] == new_object["dataType"] + and obj["data"] == new_object["data"] + ): present = True if not present: dedup_list.append(obj) diff --git a/cortexutils/responder.py b/cortexutils/responder.py index 3cd0939..f9ded69 100644 --- a/cortexutils/responder.py +++ b/cortexutils/responder.py @@ -1,8 +1,6 @@ #!/usr/bin/env python # encoding: utf-8 -import json -import os from cortexutils.worker import Worker @@ -18,7 +16,7 @@ def get_data(self): """Wrapper for getting data from input dict. :return: Data (observable value) given through Cortex""" - return self.get_param('data', None, 'Missing data field') + return self.get_param("data", None, "Missing data field") def report(self, full_report, ensure_ascii=False): """Returns a json dict via stdout. @@ -30,12 +28,11 @@ def report(self, full_report, ensure_ascii=False): try: operation_list = self.operations(full_report) except Exception: - pass - super(Responder, self).report({ - 'success': True, - 'full': full_report, - 'operations': operation_list - }, ensure_ascii) + pass # nosec B110 + super(Responder, self).report( + {"success": True, "full": full_report, "operations": operation_list}, + ensure_ascii, + ) def run(self): """Overwritten by responders""" diff --git a/cortexutils/worker.py b/cortexutils/worker.py index c5aab6a..0ff9827 100644 --- a/cortexutils/worker.py +++ b/cortexutils/worker.py @@ -4,11 +4,11 @@ import codecs import json import os -import select import sys DEFAULT_SECRET_PHRASES = ("key", "password", "secret") + class Worker(object): READ_TIMEOUT = 3 # seconds @@ -17,7 +17,7 @@ def __init__(self, job_directory, secret_phrases): if len(sys.argv) > 1: job_directory = sys.argv[1] else: - job_directory = '/job' + job_directory = "/job" self.job_directory = job_directory if secret_phrases is None: self.secret_phrases = DEFAULT_SECRET_PHRASES @@ -25,73 +25,77 @@ def __init__(self, job_directory, secret_phrases): self.secret_phrases = secret_phrases # Load input self._input = {} - if os.path.isfile('%s/input/input.json' % self.job_directory): - with open('%s/input/input.json' % self.job_directory) as f_input: + if os.path.isfile("%s/input/input.json" % self.job_directory): + with open("%s/input/input.json" % self.job_directory) as f_input: self._input = json.load(f_input) - else: # If input file doesn't exist, fallback to old behavior and read input from stdin + else: + # If input file doesn't exist, + # fallback to old behavior and read input from stdin self.job_directory = None self.__set_encoding() if not sys.stdin.isatty(): self._input = json.load(sys.stdin) else: - self.error('Input file doesn''t exist') + self.error("Input file doesn" "t exist") # Set parameters - self.data_type = self.get_param('dataType', None, 'Missing dataType field') - self.tlp = self.get_param('tlp', 2) - self.pap = self.get_param('pap', 2) + self.data_type = self.get_param("dataType", None, "Missing dataType field") + self.tlp = self.get_param("tlp", 2) + self.pap = self.get_param("pap", 2) - self.enable_check_tlp = self.get_param('config.check_tlp', False) - self.max_tlp = self.get_param('config.max_tlp', 2) + self.enable_check_tlp = self.get_param("config.check_tlp", False) + self.max_tlp = self.get_param("config.max_tlp", 2) - self.enable_check_pap = self.get_param('config.check_pap', False) - self.max_pap = self.get_param('config.max_pap', 2) + self.enable_check_pap = self.get_param("config.check_pap", False) + self.max_pap = self.get_param("config.max_pap", 2) # Set proxy configuration if available - self.http_proxy = self.get_param('config.proxy.http') - self.https_proxy = self.get_param('config.proxy.https') + self.http_proxy = self.get_param("config.proxy.http") + self.https_proxy = self.get_param("config.proxy.https") self.__set_proxies() # Finally run check tlp if not (self.__check_tlp()): - self.error('TLP is higher than allowed.') + self.error("TLP is higher than allowed.") if not (self.__check_pap()): - self.error('PAP is higher than allowed.') + self.error("PAP is higher than allowed.") def __set_proxies(self): if self.http_proxy is not None: - os.environ['http_proxy'] = self.http_proxy + os.environ["http_proxy"] = self.http_proxy if self.https_proxy is not None: - os.environ['https_proxy'] = self.https_proxy + os.environ["https_proxy"] = self.https_proxy @staticmethod def __set_encoding(): try: - if sys.stdout.encoding != 'UTF-8': + if sys.stdout.encoding != "UTF-8": if sys.version_info[0] == 3: - sys.stdout = codecs.getwriter('utf-8')(sys.stdout.buffer, 'strict') + sys.stdout = codecs.getwriter("utf-8")(sys.stdout.buffer, "strict") else: - sys.stdout = codecs.getwriter('utf-8')(sys.stdout, 'strict') - if sys.stderr.encoding != 'UTF-8': + sys.stdout = codecs.getwriter("utf-8")(sys.stdout, "strict") + if sys.stderr.encoding != "UTF-8": if sys.version_info[0] == 3: - sys.stderr = codecs.getwriter('utf-8')(sys.stderr.buffer, 'strict') + sys.stderr = codecs.getwriter("utf-8")(sys.stderr.buffer, "strict") else: - sys.stderr = codecs.getwriter('utf-8')(sys.stderr, 'strict') + sys.stderr = codecs.getwriter("utf-8")(sys.stderr, "strict") except Exception: - pass + pass # nosec B110 def __get_param(self, source, name, default=None, message=None): """Extract a specific parameter from given source. :param source: Python dict to search through - :param name: Name of the parameter to get. JSON-like syntax, e.g. `config.username` at first, but in recursive - calls a list + :param name: Name of the parameter to get. JSON-like syntax, + e.g. `config.username` at first, but in recursive calls a list :param default: Default value, if not found. Default: None - :param message: Error message. If given and name not found, exit with error. Default: None""" + :param message: Error message. If given and name not found, exit with error. + Default: None + """ if isinstance(name, str): - name = name.split('.') + name = name.split(".") if len(name) == 0: # The name is empty, return the source content @@ -120,17 +124,19 @@ def __write_output(self, data, ensure_ascii=False): json.dump(data, sys.stdout, ensure_ascii=ensure_ascii) else: try: - os.makedirs('%s/output' % self.job_directory) - except: - pass - with open('%s/output/output.json' % self.job_directory, mode='w') as f_output: + os.makedirs("%s/output" % self.job_directory) + except Exception: + pass # nosec B110 + with open( + "%s/output/output.json" % self.job_directory, mode="w" + ) as f_output: json.dump(data, f_output, ensure_ascii=ensure_ascii) def get_data(self): """Wrapper for getting data from input dict. :return: Data (observable value) given through Cortex""" - return self.get_param('data', None, 'Missing data field') + return self.get_param("data", None, "Missing data field") @staticmethod def build_operation(op_type, **parameters): @@ -139,9 +145,7 @@ def build_operation(op_type, **parameters): :param parameters: a dict including the operation's params :return: dict """ - operation = { - 'type': op_type - } + operation = {"type": op_type} operation.update(parameters) return operation @@ -154,15 +158,22 @@ def operations(self, raw): def get_param(self, name, default=None, message=None): """Just a wrapper for Analyzer.__get_param. - :param name: Name of the parameter to get. JSON-like syntax, e.g. `config.username` + :param name: Name of the parameter to get. + JSON-like syntax, e.g. `config.username` :param default: Default value, if not found. Default: None - :param message: Error message. If given and name not found, exit with error. Default: None""" + :param message: Error message. If given and name not found, exit with error. + Default: None + """ return self.__get_param(self._input, name, default, message) def error(self, message, ensure_ascii=False): - """Stop analyzer with an error message. Changing ensure_ascii can be helpful when stucking - with ascii <-> utf-8 issues. Additionally, the input as returned, too. Maybe helpful when dealing with errors. + """Stop analyzer with an error message. + + Changing ensure_ascii can be helpful when stucking with ascii <-> utf-8 issues. + Additionally, the input as returned, too. + Maybe helpful when dealing with errors. + :param message: Error message :param ensure_ascii: Force ascii output. Default: False""" @@ -170,23 +181,25 @@ def error(self, message, ensure_ascii=False): analyzer_input = self._input # Loop over all the sensitive config names and clean them - for config_key in analyzer_input.get('config', {}).keys(): + for config_key in analyzer_input.get("config", {}).keys(): if any( - secret_phrase in config_key.lower() + secret_phrase in config_key.lower() for secret_phrase in self.secret_phrases ): - analyzer_input['config'][config_key] = 'REMOVED' + analyzer_input["config"][config_key] = "REMOVED" - self.__write_output({'success': False, - 'input': analyzer_input, - 'errorMessage': message}, - ensure_ascii=ensure_ascii) + self.__write_output( + {"success": False, "input": analyzer_input, "errorMessage": message}, + ensure_ascii=ensure_ascii, + ) # Force exit after error sys.exit(1) def summary(self, raw): - """Returns a summary, needed for 'short.html' template. Overwrite it for your needs! + """Returns a summary, needed for 'short.html' template. + + Overwrite it for your needs! :returns: by default return an empty dict""" return {} diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..8682720 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[flake8] +max-line-length=88 \ No newline at end of file diff --git a/setup.py b/setup.py index 814cf3f..e0bc00d 100644 --- a/setup.py +++ b/setup.py @@ -1,32 +1,36 @@ from setuptools import setup setup( - name='cortexutils', - version='2.2.0', - description='A Python library for including utility classes for Cortex analyzers and responders', - long_description=open('README').read(), - author='TheHive-Project', - author_email='support@thehive-project.org', - license='AGPL-V3', - url='https://github.com/TheHive-Project/Cortex-Analyzers/tree/master/contrib', + name="cortexutils", + version="2.2.0", + description=( + "A Python library for including utility classes for " + "Cortex analyzers and responders" + ), + long_description=open("README").read(), + author="TheHive-Project", + author_email="support@thehive-project.org", + license="AGPL-V3", + url="https://github.com/TheHive-Project/Cortex-Analyzers/tree/master/contrib", classifiers=[ - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'Intended Audience :: Information Technology', - 'Intended Audience :: Science/Research', - 'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)', - 'Natural Language :: English', - 'Operating System :: OS Independent', - 'Programming Language :: Python', - 'Topic :: Security', - 'Topic :: Software Development :: Libraries :: Python Modules'], + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Intended Audience :: Information Technology", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", # noqa + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Topic :: Security", + "Topic :: Software Development :: Libraries :: Python Modules", + ], py_modules=[ - 'future', - 'cortexutils.worker', - 'cortexutils.analyzer', - 'cortexutils.responder', - 'cortexutils.extractor' + "future", + "cortexutils.worker", + "cortexutils.analyzer", + "cortexutils.responder", + "cortexutils.extractor", ], install_requires=[], - test_suite='tests' + test_suite="tests", ) diff --git a/tests/test_suite_analyzer.py b/tests/test_suite_analyzer.py index 335fae5..384ab4c 100644 --- a/tests/test_suite_analyzer.py +++ b/tests/test_suite_analyzer.py @@ -18,7 +18,7 @@ def load_test_fixture(fixture_path): path = os.path.dirname(os.path.abspath(__file__)) - fixture_file = open(path + '/' + fixture_path) + fixture_file = open(path + "/" + fixture_path) input = fixture_file.read() fixture_file.close() sys.stdin = StringIO(input) @@ -28,11 +28,11 @@ def load_test_fixture(fixture_path): class TestMinimalConfig(unittest.TestCase): def setUp(self): - load_test_fixture('fixtures/test-minimal-config.json') + load_test_fixture("fixtures/test-minimal-config.json") self.analyzer = Analyzer() def test_default_config(self): - self.assertEqual(self.analyzer.data_type, 'ip') + self.assertEqual(self.analyzer.data_type, "ip") self.assertEqual(self.analyzer.tlp, 2) self.assertEqual(self.analyzer.enable_check_tlp, False) self.assertEqual(self.analyzer.max_tlp, 2) @@ -44,30 +44,30 @@ def test_artifact_data(self): self.assertEqual(self.analyzer.get_data(), "1.1.1.1") def test_params_data(self): - self.assertEqual(self.analyzer.getParam('data'), "1.1.1.1") - self.assertEqual(self.analyzer.get_param('data'), "1.1.1.1") + self.assertEqual(self.analyzer.getParam("data"), "1.1.1.1") + self.assertEqual(self.analyzer.get_param("data"), "1.1.1.1") class TestProxyConfig(unittest.TestCase): def setUp(self): - load_test_fixture('fixtures/test-proxy-config.json') + load_test_fixture("fixtures/test-proxy-config.json") self.analyzer = Analyzer() def test_proxy_config(self): - proxy_url = 'http://local.proxy:8080' + proxy_url = "http://local.proxy:8080" self.assertEqual(self.analyzer.http_proxy, proxy_url) self.assertEqual(self.analyzer.https_proxy, proxy_url) - self.assertEqual(os.environ['http_proxy'], proxy_url) - self.assertEqual(os.environ['https_proxy'], proxy_url) + self.assertEqual(os.environ["http_proxy"], proxy_url) + self.assertEqual(os.environ["https_proxy"], proxy_url) class TestTlpConfig(unittest.TestCase): def setUp(self): - load_test_fixture('fixtures/test-tlp-config.json') + load_test_fixture("fixtures/test-tlp-config.json") self.analyzer = Analyzer() def test_check_tlp_disabled(self): @@ -99,59 +99,57 @@ def test_check_tlp_ok(self): class TestErrorResponse(unittest.TestCase): def setUp(self): - load_test_fixture('fixtures/test-error-response.json') + load_test_fixture("fixtures/test-error-response.json") self.analyzer = Analyzer() def test_error_response(self): - self.assertEqual(self.analyzer.get_param('config.password'), "secret") - self.assertEqual(self.analyzer.get_param('config.key'), "secret") - self.assertEqual(self.analyzer.get_param('config.apikey'), "secret") - self.assertEqual(self.analyzer.get_param('config.api_key'), "secret") - self.assertEqual(self.analyzer.get_param('config.apiSecret'), "secret") - self.assertEqual(self.analyzer.get_param('config.api_Pass'), "secret") - self.assertEqual(self.analyzer.get_param('config.API'), "secret") - + self.assertEqual(self.analyzer.get_param("config.password"), "secret") + self.assertEqual(self.analyzer.get_param("config.key"), "secret") + self.assertEqual(self.analyzer.get_param("config.apikey"), "secret") + self.assertEqual(self.analyzer.get_param("config.api_key"), "secret") + self.assertEqual(self.analyzer.get_param("config.apiSecret"), "secret") + self.assertEqual(self.analyzer.get_param("config.api_Pass"), "secret") + self.assertEqual(self.analyzer.get_param("config.API"), "secret") # Run the error method with self.assertRaises(SystemExit): - self.analyzer.error('Error', True) + self.analyzer.error("Error", True) # Get the output output = sys.stdout.getvalue().strip() json_output = json.loads(output) - self.assertEqual(json_output['success'], False) - self.assertEqual(json_output['errorMessage'], 'Error') - self.assertEqual(json_output['input']['dataType'], 'ip') - self.assertEqual(json_output['input']['data'], '1.1.1.1') - self.assertEqual(json_output['input']['config']['password'], 'REMOVED') - self.assertEqual(json_output['input']['config']['key'], 'REMOVED') - self.assertEqual(json_output['input']['config']['apikey'], 'REMOVED') - self.assertEqual(json_output['input']['config']['api_key'], 'REMOVED') - self.assertEqual(json_output['input']['config']['apiSecret'], 'REMOVED') - self.assertEqual(json_output['input']['config']['api_Pass'], 'secret') - self.assertEqual(json_output['input']['config']['API'], 'secret') - + self.assertEqual(json_output["success"], False) + self.assertEqual(json_output["errorMessage"], "Error") + self.assertEqual(json_output["input"]["dataType"], "ip") + self.assertEqual(json_output["input"]["data"], "1.1.1.1") + self.assertEqual(json_output["input"]["config"]["password"], "REMOVED") + self.assertEqual(json_output["input"]["config"]["key"], "REMOVED") + self.assertEqual(json_output["input"]["config"]["apikey"], "REMOVED") + self.assertEqual(json_output["input"]["config"]["api_key"], "REMOVED") + self.assertEqual(json_output["input"]["config"]["apiSecret"], "REMOVED") + self.assertEqual(json_output["input"]["config"]["api_Pass"], "secret") + self.assertEqual(json_output["input"]["config"]["API"], "secret") class TestReportResponse(unittest.TestCase): def setUp(self): - load_test_fixture('fixtures/test-report-response.json') + load_test_fixture("fixtures/test-report-response.json") self.analyzer = Analyzer() def test_report_response(self): # Run the analyzer report method - self.analyzer.report({'report_id': '12345'}) + self.analyzer.report({"report_id": "12345"}) # Get the output output = sys.stdout.getvalue().strip() json_output = json.loads(output) - self.assertEqual(json_output.get('success'), True) - self.assertEqual(json_output.get('errorMessage', None), None) - self.assertEqual(json_output['full']['report_id'], '12345') + self.assertEqual(json_output.get("success"), True) + self.assertEqual(json_output.get("errorMessage", None), None) + self.assertEqual(json_output["full"]["report_id"], "12345") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_suite_extractor.py b/tests/test_suite_extractor.py index 164dae3..d24db39 100644 --- a/tests/test_suite_extractor.py +++ b/tests/test_suite_extractor.py @@ -15,149 +15,138 @@ def setUp(self): def test_single_fqdn(self): self.assertEqual( - self.extractor.check_string(value='www.google.de'), - 'fqdn', - 'FQDN single string: wrong data type.' + self.extractor.check_string(value="www.google.de"), + "fqdn", + "FQDN single string: wrong data type.", ) def test_single_fqdn_as_unicode(self): self.assertEqual( - self.extractor.check_string(value=u'www.google.de'), - 'fqdn', - 'FQDN single string: wrong data type.' + self.extractor.check_string(value="www.google.de"), + "fqdn", + "FQDN single string: wrong data type.", ) def test_single_domain(self): self.assertEqual( - self.extractor.check_string(value='google.de'), - 'domain', - 'domain single string: wrong data type.' + self.extractor.check_string(value="google.de"), + "domain", + "domain single string: wrong data type.", ) def test_single_url(self): self.assertEqual( - self.extractor.check_string(value='https://google.de'), - 'url', - 'url single string: wrong data type.' + self.extractor.check_string(value="https://google.de"), + "url", + "url single string: wrong data type.", ) def test_single_ipv4(self): self.assertEqual( - self.extractor.check_string(value='10.0.0.1'), - 'ip', - 'ipv4 single string: wrong data type.' + self.extractor.check_string(value="10.0.0.1"), + "ip", + "ipv4 single string: wrong data type.", ) def test_single_ipv6(self): self.assertEqual( - self.extractor.check_string(value='2001:0db8:85a3:08d3:1319:8a2e:0370:7344'), - 'ip', - 'ipv6 single string: wrong data type.' + self.extractor.check_string( + value="2001:0db8:85a3:08d3:1319:8a2e:0370:7344" + ), + "ip", + "ipv6 single string: wrong data type.", ) def test_single_md5(self): self.assertEqual( - self.extractor.check_string(value='b373bd6b144e7846f45a1e47ced380b8'), - 'hash', - 'md5 single string: wrong data type.' + self.extractor.check_string(value="b373bd6b144e7846f45a1e47ced380b8"), + "hash", + "md5 single string: wrong data type.", ) def test_single_sha1(self): self.assertEqual( - self.extractor.check_string(value='94d4d48ba9a79304617f8291982bf69a8ce16fb0'), - 'hash', - 'sha1 single string: wrong data type.' + self.extractor.check_string( + value="94d4d48ba9a79304617f8291982bf69a8ce16fb0" + ), + "hash", + "sha1 single string: wrong data type.", ) def test_single_sha256(self): self.assertEqual( - self.extractor.check_string(value='7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4'), - 'hash', - 'sha256 single string: wrong data type.' + self.extractor.check_string( + value="7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4" + ), + "hash", + "sha256 single string: wrong data type.", ) def test_single_useragent(self): self.assertEqual( - self.extractor.check_string(value='Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101 ' - 'Firefox/52.0'), - 'user-agent', - 'user-agent single string: wrong data type.' + self.extractor.check_string( + value="Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101 " + "Firefox/52.0" + ), + "user-agent", + "user-agent single string: wrong data type.", ) def test_single_mail(self): self.assertEqual( - self.extractor.check_string(value='VeryImportant@mail.org'), - 'mail', - 'mail single string: wrong data type.' + self.extractor.check_string(value="VeryImportant@mail.org"), + "mail", + "mail single string: wrong data type.", ) def test_single_regkey(self): self.assertEqual( - self.extractor.check_string(value='HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows\\CurrentVersion\\Run'), - 'registry', - 'registry single string: wrong data type.' + self.extractor.check_string( + value=( + "HKEY_LOCAL_MACHINE\\Software\\Microsoft\\" + "Windows\\CurrentVersion\\Run" + ) + ), + "registry", + "registry single string: wrong data type.", ) def test_iterable(self): - l_real = self.extractor.check_iterable({ - 'results': [ - { - 'This is an totally unimportant key': '127.0.0.1' - }, - { - 'Totally nested!': ['https://nestedurl.verynested.com'] - } - ], - 'some_more': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4', - 'another_list': ['google.de', 'bing.com', 'www.fqdn.de'] - }) - l_expected = [ - { - 'dataType': 'hash', - 'data': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4' - }, - { - 'dataType': 'ip', - 'data': '127.0.0.1' - }, - { - 'dataType': 'url', - 'data': 'https://nestedurl.verynested.com' - }, - { - 'dataType': 'domain', - 'data': 'google.de' - }, - { - 'dataType': 'domain', - 'data': 'bing.com' - }, + l_real = self.extractor.check_iterable( { - 'dataType': 'fqdn', - 'data': 'www.fqdn.de' + "results": [ + {"This is an totally unimportant key": "127.0.0.1"}, + {"Totally nested!": ["https://nestedurl.verynested.com"]}, + ], + "some_more": "94d4d48ba9a79304617f8291982bf69a8ce16fb0", + "another_list": ["google.de", "bing.com", "www.fqdn.de"], } + ) + l_expected = [ + {"dataType": "hash", "data": "94d4d48ba9a79304617f8291982bf69a8ce16fb0"}, + {"dataType": "ip", "data": "127.0.0.1"}, + {"dataType": "url", "data": "https://nestedurl.verynested.com"}, + {"dataType": "domain", "data": "google.de"}, + {"dataType": "domain", "data": "bing.com"}, + {"dataType": "fqdn", "data": "www.fqdn.de"}, ] # Sorting the lists - l_real = sorted(l_real, key=lambda k: k['data']) - l_expected = sorted(l_expected, key=lambda k: k['data']) + l_real = sorted(l_real, key=lambda k: k["data"]) + l_expected = sorted(l_expected, key=lambda k: k["data"]) + + self.assertEqual(l_real, l_expected, "Check_iterable: wrong list returned.") - self.assertEqual( - l_real, - l_expected, - 'Check_iterable: wrong list returned.' - ) - def test_float_domain(self): self.assertEqual( - self.extractor.check_string(value='0.001234'), - '', - 'Check_float: float was recognized as domain, but should not.' + self.extractor.check_string(value="0.001234"), + "", + "Check_float: float was recognized as domain, but should not.", ) def test_float_fqdn(self): self.assertEqual( - self.extractor.check_string(value='0.1234.5678'), - '', - 'Check_float_fqdn: float was recognized as fqdn but should not.' + self.extractor.check_string(value="0.1234.5678"), + "", + "Check_float_fqdn: float was recognized as fqdn but should not.", ) diff --git a/tests/test_suite_integration.py b/tests/test_suite_integration.py index e9fe0cd..585af2b 100644 --- a/tests/test_suite_integration.py +++ b/tests/test_suite_integration.py @@ -15,16 +15,13 @@ class AnalyzerExtractorOutputTest(unittest.TestCase): def setUp(self): - sys.stdin = StringIO(json.dumps({ - "data": "8.8.8.8", - "dataType": "ip" - })) + sys.stdin = StringIO(json.dumps({"data": "8.8.8.8", "dataType": "ip"})) sys.stdout = StringIO() self.analyzer = Analyzer() def test_output(self): # Run the report method - self.analyzer.report({'result': '1.2.3.4'}) + self.analyzer.report({"result": "1.2.3.4"}) # Grab the output output = sys.stdout.getvalue().strip() @@ -32,27 +29,23 @@ def test_output(self): # Checks self.assertNotIn(self.analyzer.get_data(), output) - self.assertEqual(json_output['artifacts'][0]['data'], '1.2.3.4') - self.assertEqual(json_output['artifacts'][0]['dataType'], 'ip') + self.assertEqual(json_output["artifacts"][0]["data"], "1.2.3.4") + self.assertEqual(json_output["artifacts"][0]["dataType"], "ip") + class AnalyzerExtractorNoResultTest(unittest.TestCase): def setUp(self): - sys.stdin = StringIO(json.dumps({ - "data": "8.8.8.8", - "dataType": "ip" - })) + sys.stdin = StringIO(json.dumps({"data": "8.8.8.8", "dataType": "ip"})) sys.stdout = StringIO() self.analyzer = Analyzer() def test_output(self): # Run report method - self.analyzer.report({ - 'message': '8.8.8.8 was not found in database.' - }) + self.analyzer.report({"message": "8.8.8.8 was not found in database."}) # Grab the output output = sys.stdout.getvalue().strip() json_output = json.loads(output) # Check for empty artifact list - self.assertEqual(json_output['artifacts'], [], 'Artifact list should be empty.') + self.assertEqual(json_output["artifacts"], [], "Artifact list should be empty.")