diff --git a/.gitmodules b/.gitmodules index 973d05aba..451614982 100644 --- a/.gitmodules +++ b/.gitmodules @@ -14,3 +14,7 @@ path = feel_project url = https://github.com/stratosphereips/feel_project branch = main + +[submodule "SlipsWeb"] + path = SlipsWeb + url = https://github.com/stratosphereips/SlipsWeb.git diff --git a/SlipsWeb b/SlipsWeb new file mode 160000 index 000000000..131238cfb --- /dev/null +++ b/SlipsWeb @@ -0,0 +1 @@ +Subproject commit 131238cfbdc5c9db2b833e1a83c52ee8b3f23462 diff --git a/config/slips.yaml b/config/slips.yaml index 02adc7f1b..e714e46c6 100644 --- a/config/slips.yaml +++ b/config/slips.yaml @@ -347,7 +347,7 @@ exporting_alerts: # Configuer all the methods Slips will export data with # Available options are slack or stix - # export_to : [stix] + # export_to : [stix] (And a TAXII server) # export_to : [slack] export_to: [] @@ -373,12 +373,11 @@ exporting_alerts: # For Stix, if Slips should use TLS use_https: false - # TAXII - discovery_path: /services/discovery-a - inbox_path: /services/inbox-a + # TAXII 2 discovery endpoint (relative path or full URL) + discovery_path: /taxii2/ - # Collection on the server you want to push stix data to - collection_name: collection-a + # Collection (ID or title) on the server you want to push STIX data to + collection_name: Alerts # This value is only used when Slips is running non-stop (e.g with -i ) # push_delay is the time to wait before pushing STIX data to server @@ -390,13 +389,7 @@ exporting_alerts: # TAXII server credentials taxii_username: admin - taxii_password: admin - - # URL used to obtain JWT token. set this to '' if you don't want to use it - # is required for JWT based authentication. - # (JWT based authentication is Optional) - # It's usually /management/auth - jwt_auth_path: /management/auth + taxii_password: changeme_before_installing_a_medallion_server ############################# CESNET: diff --git a/docker/Dockerfile b/docker/Dockerfile index afb498405..56bb05961 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -64,7 +64,7 @@ RUN apt update && apt install -y --no-install-recommends \ && curl -fsSL https://download.opensuse.org/repositories/security:zeek/xUbuntu_22.04/Release.key | gpg --dearmor | tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null \ && apt update \ && apt install -y --no-install-recommends --fix-missing \ - zeek \ + zeek-8.0 \ npm \ && ln -s /opt/zeek/bin/zeek /usr/local/bin/bro \ && apt clean \ diff --git a/docker/light/Dockerfile b/docker/light/Dockerfile index 355f35a0d..a8b8085b4 100644 --- a/docker/light/Dockerfile +++ b/docker/light/Dockerfile @@ -27,7 +27,7 @@ RUN set -eux; \ | tee /etc/apt/sources.list.d/security:zeek.list \ && curl -fsSL https://download.opensuse.org/repositories/security:zeek/xUbuntu_22.04/Release.key \ | gpg --dearmor | tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null \ - && apt-get update && apt-get install -y --no-install-recommends --fix-missing zeek \ + && apt-get update && apt-get install -y --no-install-recommends --fix-missing zeek-8.0 \ && ln -s /opt/zeek/bin/zeek /usr/local/bin/bro \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* diff --git a/docker/light/excluded_libs.txt b/docker/light/excluded_libs.txt index a132b1d09..941d02078 100644 --- a/docker/light/excluded_libs.txt +++ b/docker/light/excluded_libs.txt @@ -16,7 +16,6 @@ scikit_learn slackclient matplotlib stix2 -cabby pandas setuptools numpy diff --git a/docs/exporting.md b/docs/exporting.md index 18079ef07..e93d515c9 100644 --- a/docs/exporting.md +++ b/docs/exporting.md @@ -40,46 +40,44 @@ You can do this by going to the channel, then clicking on the channel's name. Th ## STIX -If you want to export alerts to your TAXII server using STIX format, change ```export_to``` variable to export to STIX, and Slips will automatically generate a -```STIX_data.json``` containing all alerts it detects. +If you want to export alerts to your TAXII 2 server using STIX 2.1 format, +set ```export_to``` to ```stix``` and Slips will automatically generate a +```STIX_data.json``` bundle containing the indicators it detects and push it to +your collection. [ExportingAlerts] export_to = [stix] -You can add your TAXII server details in the following variables: +Configure the TAXII client by editing the following variables: -```TAXII_server```: link to your TAXII server +```TAXII_server```: host name or IP address of the TAXII server. -```port```: port to be used +```port```: TCP port (optional, defaults to 80/443). -```use_https```: use https or not. +```use_https```: set to true to connect over HTTPS (be careful that the default TAXII server in SlipsWeb, Medallion, do not support HTTPS yet) -```discovery_path``` and ```inbox_path``` should contain URIs not full urls. For example: +```discovery_path```: TAXII discovery endpoint path or full URL + (for example ```/taxii2/```). -```python -discovery_path = /services/discovery-a -inbox_path = /services/inbox-a -``` - -```collection_name```: the collection on the server you want to push your STIX data to. - -```push_delay```: the time to wait before pushing STIX data to server (in seconds). -It is used when slips is running non-stop (e.g with -i ) +```collection_name```: ID or title of the TAXII collection that should receive your indicators. Be default `Alerts`. -```taxii_username```: TAXII server user credentials +```push_delay```: time between automatic pushes (in seconds) when Slips is +running continuously. -```taxii_password```: TAXII server user password +```taxii_username``` / ```taxii_password```: credentials used for HTTP Basic authentication. -```jwt_auth_path```: auth path if JWT based authentication is used. It's usually /management/auth. this is what we -use to get a token. +**Change the default config password of the TAXII servers you are going to export to in ```config/medallion_config.yaml```** -if your TAXII server is a remote server, you can set the ```port``` to 443 or 80. +Slips stores the generated bundle for each run in the output directory of that +execution (for example `output//STIX_data.json`), so you can inspect the +exact STIX objects that were pushed. -If running on a file, Slips will export to server after analysis is done. -If running on an interface, Slips will export to server every push_delay seconds. by default it's 1h. +If running on a file, Slips will export once before shutdown. +If running on an interface, Slips will export to the server every +```push_delay``` seconds (default 1 hour). ## JSON format diff --git a/docs/index.rst b/docs/index.rst index dba182359..b65092bfd 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -63,3 +63,4 @@ This documentation gives an overview how Slips works, how to use it and how to h contributing code_documentation related_repos + visualisation diff --git a/docs/installation.md b/docs/installation.md index 1548f7793..a0e5742e6 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -203,26 +203,25 @@ You can read more about it [here](https://stratospherelinuxips.readthedocs.io/en ## Installing Slips natively -Slips is dependent on three major elements: +Slips depends on three major elements: - Python 3.10.12 - Zeek 8.0.0 - Redis database 7.0.4 -To install these elements we will use APT package manager. After that, we will install python packages required for Slips to run and its modules to work. Also, Slips' interface Kalipso depend on Node.JS and several npm packages. +To install these elements, the script will use the APT package manager. After that, it will install python packages required for Slips to run and its modules to work. Also, Slips' interface Kalipso depend on Node JS and several npm packages. **Instructions to download everything for Slips are below.**
### Install Slips using shell script -You can install it using install.sh +You can install it using [install.sh](https://github.com/stratosphereips/StratosphereLinuxIPS/blob/master/install/install.sh) sudo chmod +x install.sh sudo ./install.sh - ### Installing Slips manually #### Installing Python, Redis, NodeJs, and required python and npm libraries. @@ -314,6 +313,7 @@ You can kill this redis database by running: ``` then choosing 1. +After these steps, if you need the submodules, you will need to clone them as done in the `install.sh` script. ## Installing Slips on a Raspberry PI diff --git a/docs/web_visualization.md b/docs/web_visualization.md new file mode 100644 index 000000000..a819d539e --- /dev/null +++ b/docs/web_visualization.md @@ -0,0 +1,9 @@ +# Slips Web Visualization + +To see the alerts of Slips in a visual way, the methodology is the following + +1. Slips must be configured to export the alerts in STIX format to a TAXII server, as explained in [exporting](https://stratospherelinuxips.readthedocs.io/en/develop/exporting.html). +2. You need to install a TAXII server (available in the SlipsWeb submodule folder). See its README.md +3. Use the program `SlipsWeb` that is availbale in the StratosphereWeb submodule that reads from the TAXII server. + +All the setup does not consume many resources, so you can run this visualization even in small servers like a Raspberry Pi. However, by having many Slips exporting to the same server you can centralize the visualization of many sensors in a unique location, probably with more hardware if needed. \ No newline at end of file diff --git a/install/install.sh b/install/install.sh index 0ed8aab37..821eea8dc 100755 --- a/install/install.sh +++ b/install/install.sh @@ -94,7 +94,7 @@ ZEEK_REPO_URL="download.opensuse.org/repositories/security:/zeek/xUbuntu_${UBUNT # Add the repository to the sources list echo "deb http://${ZEEK_REPO_URL}/ /" | tee /etc/apt/sources.list.d/security:zeek.list \ && curl -fsSL "https://${ZEEK_REPO_URL}/Release.key" | gpg --dearmor | tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null \ -&& sudo apt update && sudo apt install -y --no-install-recommends zeek +&& sudo apt update && sudo apt install -y --no-install-recommends --fix-missing zeek-8.0 # create a symlink to zeek so that slips can find it ln -s /opt/zeek/bin/zeek /usr/local/bin/bro diff --git a/install/requirements.txt b/install/requirements.txt index 58f2771fc..95bd849e4 100644 --- a/install/requirements.txt +++ b/install/requirements.txt @@ -8,6 +8,7 @@ pandas==2.3.3 tzlocal==5.3.1 cabby==0.1.23 stix2==3.0.1 +taxii2-client==2.3.0 certifi==2025.10.5 tensorflow==2.16.1 Keras diff --git a/modules/exporting_alerts/exporting_alerts.py b/modules/exporting_alerts/exporting_alerts.py index 9a527ad21..d341d2bbf 100644 --- a/modules/exporting_alerts/exporting_alerts.py +++ b/modules/exporting_alerts/exporting_alerts.py @@ -24,6 +24,7 @@ def init(self): self.stix = StixExporter(self.logger, self.db) self.c1 = self.db.subscribe("export_evidence") self.channels = {"export_evidence": self.c1} + self.print("Subscribed to export_evidence channel.", 1, 0) def shutdown_gracefully(self): self.slack.shutdown_gracefully() @@ -35,18 +36,23 @@ def pre_main(self): export_to_slack = self.slack.should_export() export_to_stix = self.stix.should_export() + if not export_to_slack and not export_to_stix: + self.print( + "Exporting Alerts module disabled (no export targets configured).", + 0, + 2, + ) + return 1 + if export_to_slack: self.slack.send_init_msg() - if export_to_stix: + if export_to_stix and self.stix.is_running_non_stop: # This thread is responsible for waiting n seconds before # each push to the stix server # it starts the timer when the first alert happens self.stix.start_exporting_thread() - if not export_to_slack or export_to_stix: - return 1 - def remove_sensitive_info(self, evidence: dict) -> str: """ removes the leaked location co-ords from the evidence @@ -63,6 +69,12 @@ def main(self): # a msg is sent here for each evidence that was part of an alert if msg := self.get_msg("export_evidence"): evidence = json.loads(msg["data"]) + self.print( + f"[ExportingAlerts] Evidence {evidence.get('id')} " + f"type={evidence.get('evidence_type')} received.", + 2, + 0, + ) description = self.remove_sensitive_info(evidence) if self.slack.should_export(): srcip = evidence["profile"]["ip"] @@ -70,11 +82,7 @@ def main(self): self.slack.export(msg_to_send) if self.stix.should_export(): - msg_to_send = ( - evidence["evidence_type"], - evidence["attacker"]["value"], - ) - added_to_stix: bool = self.stix.add_to_stix_file(msg_to_send) + added_to_stix: bool = self.stix.add_to_stix_file(evidence) if added_to_stix: # now export to taxii self.stix.export() diff --git a/modules/exporting_alerts/stix_exporter.py b/modules/exporting_alerts/stix_exporter.py index 183325898..3957053f7 100644 --- a/modules/exporting_alerts/stix_exporter.py +++ b/modules/exporting_alerts/stix_exporter.py @@ -1,10 +1,16 @@ # SPDX-FileCopyrightText: 2021 Sebastian Garcia # SPDX-License-Identifier: GPL-2.0-only -from stix2 import Indicator, Bundle -from cabby import create_client -import time -import threading +import json import os +import threading +import time +from datetime import datetime, timezone +from uuid import uuid4 +from typing import Dict, List, Optional +from urllib.parse import urljoin + +from stix2 import Bundle, Indicator, parse +from taxii2client.v21 import Server from slips_files.common.abstracts.iexporter import IExporter from slips_files.common.parsers.config_parser import ConfigParser @@ -15,113 +21,244 @@ class StixExporter(IExporter): def init(self): self.port = None self.is_running_non_stop: bool = self.db.is_running_non_stop() - self.stix_filename = "STIX_data.json" + self.output_dir = self._resolve_output_dir() + self.stix_filename = os.path.join(self.output_dir, "STIX_data.json") self.configs_read: bool = self.read_configuration() + self.export_to_taxii_thread = None + self.last_exported_count = 0 if self.should_export(): self.print( - f"Exporting to Stix & TAXII very " + f"Exporting alerts to STIX 2 / TAXII every " f"{self.push_delay} seconds." ) - # This bundle should be created once and we should - # append all indicators to it - self.is_bundle_created = False - # To avoid duplicates in STIX_data.json - self.added_ips = set() - self.export_to_taxii_thread = threading.Thread( - target=self.schedule_sending_to_taxii_server, - daemon=True, - name="stix_exporter_to_taxii_thread", - ) + self.exported_evidence_ids = set() + self.bundle_objects: List[Indicator] = [] + self.last_exported_count = 0 + self._load_existing_bundle() + self._ensure_bundle_file() + if self.is_running_non_stop: + self.export_to_taxii_thread = threading.Thread( + target=self.schedule_sending_to_taxii_server, + daemon=True, + name="stix_exporter_to_taxii_thread", + ) def start_exporting_thread(self): # This thread is responsible for waiting n seconds before # each push to the stix server # it starts the timer when the first alert happens - utils.start_thread(self.export_to_taxii_thread, self.db) + if self.export_to_taxii_thread: + utils.start_thread(self.export_to_taxii_thread, self.db) @property def name(self): return "StixExporter" - def create_client(self): - client = create_client( - self.TAXII_server, - use_https=self.use_https, - port=self.port, - discovery_path=self.discovery_path, - ) + def _base_url(self) -> str: + scheme = "https" if self.use_https else "http" + default_port = 443 if self.use_https else 80 + if self.port: + try: + port = int(self.port) + except (TypeError, ValueError): + port = None + if port and port != default_port: + return f"{scheme}://{self.TAXII_server}:{port}" + return f"{scheme}://{self.TAXII_server}" - if self.jwt_auth_path != "": - client.set_auth( - username=self.taxii_username, - password=self.taxii_password, - # URL used to obtain JWT token - jwt_auth_url=self.jwt_auth_path, - ) - else: - # User didn't provide jwt_auth_path in slips.yaml - client.set_auth( - username=self.taxii_username, - password=self.taxii_password, - ) - return client + def _build_url(self, path: str) -> str: + if not path: + return self._base_url() + if path.startswith("http://") or path.startswith("https://"): + return path + # urljoin discards url path if relative path does not start with / + adjusted = path if path.startswith("/") else f"/{path}" + return urljoin(self._base_url(), adjusted) - def inbox_service_exists_in_taxii_server(self, services): + def _resolve_output_dir(self) -> str: """ - Checks if inbox service is available in the taxii server + Determines the directory where STIX_data.json should be stored. + Falls back to the current working directory if the DB does not + have an output directory set yet. """ - for service in services: - if "inbox" in service.type.lower(): - return True + output_dir = getattr(self.db, "output_dir", None) + if not output_dir: + output_dir = self.db.get_output_dir() + if isinstance(output_dir, bytes): + output_dir = output_dir.decode("utf-8") + if not output_dir: + output_dir = os.getcwd() + output_dir = os.path.abspath(output_dir) + os.makedirs(output_dir, exist_ok=True) + return output_dir + + def _load_existing_bundle(self) -> None: + """ + Loads indicators from an existing STIX_data.json file so we can resume + without creating duplicates if Slips was restarted. + """ + if not os.path.exists(self.stix_filename): + return + try: + with open(self.stix_filename, "r") as stix_file: + data = stix_file.read().strip() + except OSError as err: + self.print(f"Unable to read {self.stix_filename}: {err}", 0, 3) + return + + if not data: + return + + try: + bundle = parse(data, allow_custom=True) + except Exception as err: # stix2 raises generic Exception + self.print(f"Invalid STIX bundle, starting fresh: {err}", 0, 3) + return + + if not isinstance(bundle, Bundle): + self.print("STIX_data.json does not contain a bundle.", 0, 3) + return + + self.bundle_objects = list(bundle.objects) + self.last_exported_count = len(self.bundle_objects) + for indicator in self.bundle_objects: + evidence_id = self._extract_evidence_id(indicator) + if evidence_id: + self.exported_evidence_ids.add(evidence_id) + + def _ensure_bundle_file(self) -> None: + """ + Guarantee that STIX_data.json exists even before the first indicator + arrives so the user can inspect the file immediately. + """ + if os.path.exists(self.stix_filename): + return + bundle_stub = { + "type": "bundle", + "id": f"bundle--{uuid4()}", + "objects": [], + } + with open(self.stix_filename, "w") as stix_file: + json.dump(bundle_stub, stix_file, indent=2) + + def _extract_evidence_id(self, indicator: Indicator) -> Optional[str]: + try: + return indicator.get("x_slips_evidence_id") # type: ignore[index] + except AttributeError: + return None + + def _serialize_bundle(self) -> str: + bundle = Bundle(*self.bundle_objects, allow_custom=True) + return bundle.serialize(pretty=True) + + def _write_bundle(self) -> None: + if not self.bundle_objects: + self._ensure_bundle_file() + return + + with open(self.stix_filename, "w") as stix_file: + stix_file.write(self._serialize_bundle()) + + def create_collection(self): + if not self.collection_name: + self.print( + "collection_name is missing in slips.yaml; cannot export STIX.", + 0, + 3, + ) + return None + + discovery_url = self._build_url(self.discovery_path) + try: + server = Server( + discovery_url, + user=self.taxii_username or None, + password=self.taxii_password or None, + ) + except Exception as err: + self.print(f"Failed to connect to TAXII discovery: {err}", 0, 3) + return None + + if not server.api_roots: + self.print("TAXII server returned no API roots.", 0, 3) + return None + + for api_root in server.api_roots: + try: + for collection in api_root.collections: + if collection.id == self.collection_name: + return collection + if ( + hasattr(collection, "title") + and collection.title == self.collection_name + ): + return collection + except Exception as err: + self.print( + f"Could not list collections for API root {api_root.url}: {err}", + 0, + 3, + ) self.print( - "Server doesn't have inbox available. " - "Exporting STIX_data.json is cancelled.", + f"Collection '{self.collection_name}' was not found on the TAXII " + f"server.", 0, - 2, + 3, ) - return False + return None def read_stix_file(self) -> str: - with open(self.stix_filename) as stix_file: - stix_data = stix_file.read() - return stix_data + if not os.path.exists(self.stix_filename): + return "" + + with open(self.stix_filename, "r") as stix_file: + return stix_file.read() def export(self) -> bool: """ - Exports evidence/alerts to the TAXII server - Uses Inbox Service (TAXII Service to Support Producer-initiated - pushes of cyber threat information) to publish - our STIX_data.json file + Exports evidence/alerts to a TAXII 2.x collection by pushing the + STIX_data.json bundle as a TAXII envelope. """ - if not self.should_export: + if not self.should_export(): return False - client = self.create_client() + stix_data: str = self.read_stix_file() + if len(stix_data.strip()) == 0: + return False - # Check the available services to make sure inbox service is there - services = client.discover_services() - if not self.inbox_service_exists_in_taxii_server(services): + try: + bundle_dict = json.loads(stix_data) + except json.JSONDecodeError as err: + self.print(f"STIX_data.json is not valid JSON: {err}", 0, 3) return False - stix_data: str = self.read_stix_file() + objects = bundle_dict.get("objects") or [] + if not objects: + return False - # Make sure we don't push empty files - if len(stix_data) == 0: + new_objects = objects[self.last_exported_count :] + if not new_objects: return False - binding = "urn:stix.mitre.org:json:2.1" - # URI is the path to the inbox service we want to - # use in the taxii server - client.push( - stix_data, - binding, - collection_names=[self.collection_name], - uri=self.inbox_path, - ) + collection = self.create_collection() + if not collection: + return False + + envelope = {"objects": new_objects} + + try: + collection.add_objects(envelope) + except Exception as err: + self.print(f"Failed to push bundle to TAXII collection: {err}", 0, 3) + return False + + self.last_exported_count = len(objects) + self.print( - f"Successfully exported to TAXII server: " f"{self.TAXII_server}.", - 1, + f"Successfully exported {len(new_objects)} indicators to TAXII " + f"collection '{self.collection_name}'.", + 2, 0, ) return True @@ -134,7 +271,7 @@ def shutdown_gracefully(self): def should_export(self) -> bool: """Determines whether to export or not""" - return self.is_running_non_stop and "stix" in self.export_to + return "stix" in self.export_to def read_configuration(self) -> bool: """Reads configuration""" @@ -149,91 +286,161 @@ def read_configuration(self) -> bool: self.port = conf.taxii_port() self.use_https = conf.use_https() self.discovery_path = conf.discovery_path() - self.inbox_path = conf.inbox_path() # push_delay is only used when slips is running using -i self.push_delay = conf.push_delay() self.collection_name = conf.collection_name() self.taxii_username = conf.taxii_username() self.taxii_password = conf.taxii_password() - self.jwt_auth_path = conf.jwt_auth_path() # push delay exists -> create a thread that waits # push delay doesn't exist -> running using file not interface # -> only push to taxii server once before # stopping return True - def ip_exists_in_stix_file(self, ip): - """Searches for ip in STIX_data.json to avoid exporting duplicates""" - return ip in self.added_ips - def get_ioc_pattern(self, ioc_type: str, attacker) -> str: patterns_map = { "ip": f"[ip-addr:value = '{attacker}']", "domain": f"[domain-name:value = '{attacker}']", "url": f"[url:value = '{attacker}']", } - if ioc_type not in ioc_type: + pattern = patterns_map.get(ioc_type) + if not pattern: self.print(f"Can't set pattern for STIX. {attacker}", 0, 3) - return False - return patterns_map[ioc_type] + return "" + return pattern + + def _build_indicator_labels(self, evidence: dict) -> List[str]: + labels = [] + evidence_type = evidence.get("evidence_type") + if evidence_type: + labels.append(str(evidence_type).lower()) + threat_level = evidence.get("threat_level") + if threat_level: + labels.append(str(threat_level).lower()) + return labels + + def _build_valid_from(self, evidence: dict) -> Optional[datetime]: + timestamp = evidence.get("timestamp") + if not timestamp: + return None + try: + dt_obj = utils.convert_to_datetime(timestamp) + except Exception: + return None + if not utils.is_aware(dt_obj): + dt_obj = utils.convert_ts_to_tz_aware(dt_obj) + return dt_obj.astimezone(timezone.utc) - def add_to_stix_file(self, to_add: tuple) -> bool: + def _build_custom_properties( + self, evidence: dict, date_added: Optional[str] + ) -> Dict[str, object]: + victim = evidence.get("victim") or {} + attacker = evidence.get("attacker") or {} + timewindow = evidence.get("timewindow") or {} + profile = evidence.get("profile") or {} + + custom_properties: Dict[str, object] = { + "x_slips_evidence_id": evidence.get("id"), + "x_slips_threat_level": evidence.get("threat_level"), + "x_slips_profile_ip": profile.get("ip"), + "x_slips_timewindow": timewindow.get("number"), + "x_slips_attacker_direction": attacker.get("direction"), + "x_slips_attacker_ti": attacker.get("TI"), + "date_added": date_added, + } + + victim_value = victim.get("value") + if victim_value: + custom_properties["x_slips_victim"] = victim_value + + uids = evidence.get("uid") + if uids: + custom_properties["x_slips_flow_uids"] = uids + + dst_port = evidence.get("dst_port") + if dst_port: + custom_properties["x_slips_dst_port"] = dst_port + + src_port = evidence.get("src_port") + if src_port: + custom_properties["x_slips_src_port"] = src_port + + return { + key: value + for key, value in custom_properties.items() + if value not in (None, "", [], {}) + } + + def add_to_stix_file(self, evidence: dict) -> bool: """ Function to export evidence to a STIX_data.json file in the cwd. It keeps appending the given indicator to STIX_data.json until they're sent to the taxii server - msg_to_send is a tuple: (evidence_type,attacker) - evidence_type: e.g PortScan, ThreatIntelligence etc - attacker: ip of the attcker + evidence is a dictionary that contains the alert data """ - evidence_type, attacker = ( - to_add[0], - to_add[1], + attacker = (evidence.get("attacker") or {}).get("value") + if not attacker: + attacker = (evidence.get("profile") or {}).get("ip") + if not attacker: + attacker = (evidence.get("victim") or {}).get("value") + if not attacker: + self.print("Evidence missing attacker value; skipping.", 0, 3) + return False + + evidence_id = evidence.get("id") + if evidence_id and evidence_id in self.exported_evidence_ids: + self.print( + f"Evidence {evidence_id} already exported; skipping.", + 3, + 0, + ) + return False + + self.print( + f"Processing evidence {evidence_id or attacker} " + f"(profile={evidence.get('profile')}, attacker={evidence.get('attacker')})", + 2, + 0, ) - # Get the right description to use in stix - name = evidence_type + ioc_type = utils.detect_ioc_type(attacker) pattern: str = self.get_ioc_pattern(ioc_type, attacker) - # Required Indicator Properties: type, spec_version, id, created, - # modified , all are set automatically - # Valid_from, created and modified attribute will - # be set to the current time - # ID will be generated randomly - # ref https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html#_6khi84u7y58g + if not pattern: + self.print( + f"Unable to build STIX pattern for attacker {attacker}.", 0, 3 + ) + return False + + indicator_labels = self._build_indicator_labels(evidence) + valid_from = self._build_valid_from(evidence) + date_added = ( + valid_from.isoformat() + if isinstance(valid_from, datetime) + else datetime.utcnow().replace(tzinfo=timezone.utc).isoformat() + ) + custom_properties = self._build_custom_properties(evidence, date_added) + indicator = Indicator( - name=name, pattern=pattern, pattern_type="stix" + name=evidence.get("evidence_type", "Slips Alert"), + description=evidence.get("description"), + pattern=pattern, + pattern_type="stix", + valid_from=valid_from, + labels=indicator_labels or None, + allow_custom=True, + custom_properties=custom_properties or None, ) # the pattern language that the indicator pattern is expressed in. - # Create and Populate Bundle. - # All our indicators will be inside bundle['objects']. - bundle = Bundle() - if not self.is_bundle_created: - bundle = Bundle(indicator) - # Clear everything in the existing STIX_data.json - # if it's not empty - open(self.stix_filename, "w").close() - # Write the bundle. - with open(self.stix_filename, "w") as stix_file: - stix_file.write(str(bundle)) - self.is_bundle_created = True - elif not self.ip_exists_in_stix_file(attacker): - # Bundle is already created just append to it - # r+ to delete last 4 chars - with open(self.stix_filename, "r+") as stix_file: - # delete the last 4 characters in the file ']\n}\n' so we - # can append to the objects array and add them back later - stix_file.seek(0, os.SEEK_END) - stix_file.seek(stix_file.tell() - 4, 0) - stix_file.truncate() - - # Append mode to add the new indicator to the objects array - with open(self.stix_filename, "a") as stix_file: - # Append the indicator in the objects array - stix_file.write(f",{str(indicator)}" + "]\n}\n") - - # Set of unique ips added to stix_data.json to avoid duplicates - self.added_ips.add(attacker) - self.print("Indicator added to STIX_data.json", 2, 0) + + self.bundle_objects.append(indicator) + self._write_bundle() + + if evidence_id: + self.exported_evidence_ids.add(evidence_id) + + self.print( + f"Indicator added to STIX bundle at {self.stix_filename}", 2, 0 + ) return True def schedule_sending_to_taxii_server(self): @@ -250,9 +457,6 @@ def schedule_sending_to_taxii_server(self): # new alerts in stix_data.json yet if os.path.exists(self.stix_filename): self.export() - # Delete stix_data.json file so we don't send duplicates - os.remove(self.stix_filename) - self.is_bundle_created = False else: self.print( f"{self.push_delay} seconds passed, " diff --git a/slips_files/common/parsers/config_parser.py b/slips_files/common/parsers/config_parser.py index c1cb91a10..c44e62547 100644 --- a/slips_files/common/parsers/config_parser.py +++ b/slips_files/common/parsers/config_parser.py @@ -341,9 +341,6 @@ def discovery_path(self): "exporting_alerts", "discovery_path", False ) - def inbox_path(self): - return self.read_configuration("exporting_alerts", "inbox_path", False) - def push_delay(self): # 3600 = 1h delay = self.read_configuration("exporting_alerts", "push_delay", 3600) @@ -368,11 +365,6 @@ def taxii_password(self): "exporting_alerts", "taxii_password", False ) - def jwt_auth_path(self): - return self.read_configuration( - "exporting_alerts", "jwt_auth_path", False - ) - def long_connection_threshold(self): """ returns threshold in seconds diff --git a/slips_files/core/evidence_handler.py b/slips_files/core/evidence_handler.py index cea250214..085f6a264 100644 --- a/slips_files/core/evidence_handler.py +++ b/slips_files/core/evidence_handler.py @@ -380,8 +380,24 @@ def send_to_exporting_module(self, tw_evidence: Dict[str, Evidence]): """ for evidence in tw_evidence.values(): evidence: Evidence - evidence: dict = utils.to_dict(evidence) - self.db.publish("export_evidence", json.dumps(evidence)) + evidence_dict: dict = utils.to_dict(evidence) + self.print( + f"[EvidenceHandler] Exporting evidence {evidence_dict.get('id')} " + f"type={evidence_dict.get('evidence_type')} via export_evidence.", + 2, + 0, + ) + self.db.publish("export_evidence", json.dumps(evidence_dict)) + + def publish_single_evidence(self, evidence: Evidence): + evidence_dict: dict = utils.to_dict(evidence) + self.print( + f"[EvidenceHandler] Export streaming {evidence_dict.get('id')} " + f"type={evidence_dict.get('evidence_type')} via export_evidence.", + 2, + 0, + ) + self.db.publish("export_evidence", json.dumps(evidence_dict)) def is_blocking_modules_supported(self) -> bool: """ @@ -616,6 +632,9 @@ def main(self): accumulated_threat_level, ) + # stream every evidence toward exporting modules immediately + self.publish_single_evidence(evidence) + evidence_dict: dict = utils.to_dict(evidence) self.db.publish("report_to_peers", json.dumps(evidence_dict)) diff --git a/tests/integration_tests/fides_config.yaml b/tests/integration_tests/fides_config.yaml index 55b7a9c06..52d748df2 100644 --- a/tests/integration_tests/fides_config.yaml +++ b/tests/integration_tests/fides_config.yaml @@ -305,11 +305,11 @@ exporting_alerts: # you can set the port to 443 or 80. port : 1234 use_https : False - discovery_path : /services/discovery-a - inbox_path : /services/inbox-a + # TAXII 2 discovery endpoint (relative path or full URL) + discovery_path : /taxii2/ - # Collection on the server you want to push stix data to - collection_name : collection-a + # Collection (ID or title) on the server you want to push STIX data to + collection_name : Alerts # This value is only used when slips is running non-stop (e.g with -i ) # push_delay is the time to wait before pushing STIX data to server (in seconds) @@ -322,11 +322,6 @@ exporting_alerts: taxii_username : admin taxii_password : admin - # URL used to obtain JWT token. set this to '' if you don't want to use it - # is required for JWT based authentication. (JWT based authentication is Optional) - # It's usually /management/auth - jwt_auth_path : /management/auth - ############################# CESNET: @@ -429,4 +424,3 @@ local_p2p: # create p2p.log with additional info about peer communications? create_p2p_logfile : False use_p2p : False -