diff --git a/analyzers/urlDNA.io/UrlDNA_New_Scan.json b/analyzers/urlDNA.io/UrlDNA_New_Scan.json new file mode 100644 index 000000000..29987db33 --- /dev/null +++ b/analyzers/urlDNA.io/UrlDNA_New_Scan.json @@ -0,0 +1,77 @@ +{ + "name": "UrlDNA_New_Scan", + "author": "urlDNA.io (@urldna); Fabien Bloume, StrangeBee", + "license": "MIT", + "url": "https://github.com/TheHive-Project/Cortex-Analyzers", + "version": "0.1.0", + "description": "Perform a new scan on urlDNA.io", + "dataTypeList": ["url"], + "command": "urlDNA.io/urldna_analyzer.py", + "baseConfig": "UrlDNA", + "config": { + "service":"new_scan" + }, + "configurationItems": [ + { + "name": "key", + "description": "UrlDNA API Key", + "type": "string", + "multi": false, + "required": true + }, + { + "name": "private_scan", + "description": "The visibility setting for the new scan: False will make it publicly visible and searchable", + "type": "boolean", + "multi": false, + "required": true, + "defaultValue": true + }, + { + "name": "device", + "description": "The device type used for scraping, either DESKTOP or MOBILE. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence", + "type": "string", + "multi": false, + "required": false + }, + { + "name": "user_agent", + "description": "The browser User Agent used for the scan. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence", + "type": "string", + "multi": false, + "required": false + }, + { + "name": "viewport_width", + "description": "The screen width of the viewport used for the scan. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence", + "type": "number", + "multi": false, + "required": false + }, + { + "name": "viewport_height", + "description": "The screen height of the viewport used for the scan. Ensure that the selected Device, User Agent, and Viewport settings are consistent to maintain coherence", + "type": "number", + "multi": false, + "required": false + }, + { + "name": "scanned_from", + "description": "Specifies the country from which the new scan is performed. This feature is available exclusively with the API Premium plan.", + "type": "string", + "multi": false, + "required": false + }, + { + "name": "waiting_time", + "description": "The waiting time for the page to load during the scan. It can be a number between 5 and 15 seconds", + "type": "number", + "multi": false, + "required": false + } + ], + "registration_required": true, + "subscription_required": false, + "free_subscription": true, + "service_homepage": "https://urldna.io" +} diff --git a/analyzers/urlDNA.io/UrlDNA_Search.json b/analyzers/urlDNA.io/UrlDNA_Search.json new file mode 100644 index 000000000..2e6d10b60 --- /dev/null +++ b/analyzers/urlDNA.io/UrlDNA_Search.json @@ -0,0 +1,27 @@ +{ + "name": "UrlDNA_Search", + "author": "urlDNA.io (@urldna); Fabien Bloume, StrangeBee", + "license": "MIT", + "url": "https://github.com/TheHive-Project/Cortex-Analyzers", + "version": "0.1.0", + "description": "Perform a search on urlDNA.io for IPs, domains or URLs", + "dataTypeList": ["ip", "domain", "url"], + "command": "urlDNA.io/urldna_analyzer.py", + "baseConfig": "UrlDNA", + "config": { + "service":"search" + }, + "configurationItems": [ + { + "name": "key", + "description": "UrlDNA API Key", + "type": "string", + "multi": false, + "required": true + } + ], + "registration_required": true, + "subscription_required": false, + "free_subscription": true, + "service_homepage": "https://urldna.io" +} \ No newline at end of file diff --git a/analyzers/urlDNA.io/requirements.txt b/analyzers/urlDNA.io/requirements.txt new file mode 100644 index 000000000..6aabc3cfa --- /dev/null +++ b/analyzers/urlDNA.io/requirements.txt @@ -0,0 +1,2 @@ +cortexutils +requests diff --git a/analyzers/urlDNA.io/urldna.py b/analyzers/urlDNA.io/urldna.py new file mode 100644 index 000000000..426cb27df --- /dev/null +++ b/analyzers/urlDNA.io/urldna.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +import requests +import time +import base64 + + +class UrlDNAException(Exception): + """Custom exception for errors related to UrlDNA operations.""" + pass + + +class UrlDNA: + """A client for interacting with the UrlDNA API.""" + + BASE_URL = "https://api.urldna.io" + + def __init__(self, query, data_type="url"): + """ + Initializes the UrlDNA instance with a query and data type. + + :param query: The query to be processed (e.g., URL, domain, or IP). + :param data_type: Type of the query ('url', 'domain', or 'ip'). + :raises ValueError: If the query is empty or the data type is unsupported. + """ + if not query: + raise ValueError("Query must be defined.") + if data_type not in ["url", "domain", "ip"]: + raise ValueError(f"Unsupported data type: {data_type}") + + self.query = query + self.data_type = data_type + self.session = None + + def search(self, api_key): + """ + Performs a search query on the UrlDNA API. + + :param api_key: API key for authentication. + :return: A dictionary containing the search results. + """ + self._init_session(api_key) + uri = "/search" + data = {"query": self._build_query()} + response = self.session.post(f"{self.BASE_URL}{uri}", json=data) + response.raise_for_status() + return response.json() + + def new_scan(self, api_key, device=None, user_agent=None, viewport_width=None, viewport_height=None, + waiting_time=None, private_scan=False, scanned_from="DEFAULT"): + """ + Initiates a new scan and polls for results until completion. + + :param api_key: API key for authentication. + :param device: The device type ('MOBILE' or 'DESKTOP'). Defaults to 'DESKTOP'. + :param user_agent: The user agent string for the scan. Defaults to a common desktop user agent. + :param viewport_width: Width of the viewport. Defaults to 1920. + :param viewport_height: Height of the viewport. Defaults to 1080. + :param waiting_time: Time to wait before starting the scan. Defaults to 5 seconds. + :param private_scan: Whether the scan is private. Defaults to False. + :param scanned_from: The origin of the scan. Defaults to 'DEFAULT'. + :return: A dictionary containing the scan results. + :raises UrlDNAException: If the scan fails or polling times out. + """ + self._init_session(api_key) + try: + scan_id = self._initiate_scan(device, user_agent, viewport_width, viewport_height, + waiting_time, private_scan, scanned_from) + return self._poll_for_result(scan_id) + except requests.RequestException as exc: + raise UrlDNAException(f"HTTP error during scan: {exc}") + except Exception as exc: + raise UrlDNAException(f"Error during scan: {exc}") + + def _build_query(self): + """ + Builds the query string based on the data type. + + :return: A formatted query string. + """ + if self.data_type == "url": + return f"submitted_url = {self.query}" + if self.data_type == "domain": + return f"domain = {self.query}" + if self.data_type == "ip": + return f"ip = {self.query}" + return self.query + + def _initiate_scan(self, device, user_agent, viewport_width, viewport_height, waiting_time, + private_scan, scanned_from): + """ + Sends a request to initiate a new scan. + + :param device: The device type for the scan. + :param user_agent: The user agent string for the scan. + :param viewport_width: The viewport width for the scan. + :param viewport_height: The viewport height for the scan. + :param waiting_time: Time to wait before starting the scan. + :param private_scan: Whether the scan is private. + :param scanned_from: The origin of the scan. + :return: The scan ID for the initiated scan. + :raises UrlDNAException: If the scan ID is not returned. + """ + data = { + "submitted_url": self.query, + "device": device or "DESKTOP", + "user_agent": user_agent or ( + "Mozilla/5.0 (Windows NT 10.0;Win64;x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"), + "width": viewport_width or 1920, + "height": viewport_height or 1080, + "waiting_time": waiting_time or 5, + "private_scan": private_scan if private_scan is not None else False, + } + + # Only include scanned_from if explicitly set (requires Premium API plan) + if scanned_from: + data["scanned_from"] = scanned_from + + response = self.session.post(f"{self.BASE_URL}/scan", json=data) + response.raise_for_status() + scan_id = response.json().get("id") + if not scan_id: + raise UrlDNAException("Scan ID not returned.") + return scan_id + + def _poll_for_result(self, scan_id): + """ + Polls the API for the scan results until they are available. + + :param scan_id: The scan ID to poll. + :return: A dictionary containing the scan results. + :raises UrlDNAException: If the polling times out. + """ + uri = f"/scan/{scan_id}" + max_attempts = 10 + poll_interval = 10 + + for attempt in range(max_attempts): + if attempt > 0: + time.sleep(poll_interval) + response = self.session.get(f"{self.BASE_URL}{uri}") + response.raise_for_status() + result = response.json() + + status = result.get("scan", {}).get("status") + if status not in ["RUNNING", "PENDING"]: + # Convert screenshot and favicon to base64 + self._convert_images_to_base64(result) + return result + + raise UrlDNAException("Polling timed out before the scan completed.") + + def _convert_images_to_base64(self, result): + """ + Downloads images from blob_uri and converts them to base64. + + :param result: The scan result dictionary to modify in-place. + """ + # Convert screenshot to base64 + screenshot = result.get("screenshot") + if screenshot and screenshot.get("blob_uri"): + try: + img_data = self._download_image(screenshot["blob_uri"]) + screenshot["base64_data"] = img_data + except Exception as e: + # If download fails, just log and continue without base64 data + print(f"Failed to download screenshot: {e}") + + # Convert favicon to base64 + favicon = result.get("favicon") + if favicon and favicon.get("blob_uri"): + try: + img_data = self._download_image(favicon["blob_uri"]) + favicon["base64_data"] = img_data + except Exception as e: + # If download fails, just log and continue without base64 data + print(f"Failed to download favicon: {e}") + + def _download_image(self, url): + """ + Downloads an image from a URL and returns it as base64. + + :param url: The URL of the image to download. + :return: Base64-encoded image data. + """ + response = requests.get(url, timeout=30) + response.raise_for_status() + return base64.b64encode(response.content).decode('utf-8') + + def _init_session(self, api_key): + """ + Initializes an HTTP session with the API key for authentication. + + :param api_key: The API key for authentication. + """ + if not self.session: + self.session = requests.Session() + self.session.headers.update({ + "Content-Type": "application/json", + "User-Agent": "strangebee-thehive", + "Authorization": api_key + }) diff --git a/analyzers/urlDNA.io/urldna_analyzer.py b/analyzers/urlDNA.io/urldna_analyzer.py new file mode 100644 index 000000000..a064bb941 --- /dev/null +++ b/analyzers/urlDNA.io/urldna_analyzer.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 +from cortexutils.analyzer import Analyzer +from urldna import UrlDNA, UrlDNAException + + +class UrlDNAAnalyzer(Analyzer): + """Analyzer for performing UrlDNA operations.""" + + def __init__(self): + """ + Initializes the analyzer with configuration parameters. + """ + Analyzer.__init__(self) + self.service = self.get_param('config.service', None, 'Service parameter is missing') + self.api_key = self.get_param('config.key', None, 'Missing UrlDNA API key') + self.device = self.get_param('config.device') + self.user_agent = self.get_param('config.user_agent') + self.viewport_width = self.get_param('config.viewport_width') + self.viewport_height = self.get_param('config.viewport_height') + self.waiting_time = self.get_param('config.waiting_time') + self.private_scan = self.get_param('config.private_scan') + self.scanned_from = self.get_param('config.scanned_from') + + def new_scan(self, indicator): + """ + Scans a website or resource for indicators. + + :param indicator: The URL to scan. + :return: A dictionary containing the scan results. + """ + try: + urldna = UrlDNA(indicator) + return urldna.new_scan(self.api_key, self.device, self.user_agent, self.viewport_width, + self.viewport_height, self.waiting_time, self.private_scan, self.scanned_from) + except UrlDNAException as exc: + self.error(f"Error during urlDNA scan: {exc}") + except Exception as exc: + self.error(f"Unexpected error: {exc}") + + def search(self, query): + """ + Performs a search query on the UrlDNA API. + + :param query: The query string. + :return: A dictionary containing the search results. + """ + try: + urldna = UrlDNA(query, self.data_type) + return urldna.search(self.api_key) + except UrlDNAException as exc: + self.error(f"Error during search: {exc}") + except Exception as exc: + self.error(f"Unexpected error: {exc}") + + def run(self): + """ + Executes the analyzer logic based on the configured service and data type. + """ + if not self.service or not self.data_type: + self.error('Service or data_type is missing.') + raise ValueError('Invalid configuration.') + + if self.service == 'new_scan' and self.data_type == 'url': + indicator = self.get_data() + try: + result = self.new_scan(indicator) + self.report({ + 'type': self.data_type, + 'query': indicator, + 'service': self.service, + 'indicator': result + }) + except Exception as exc: + self.error(f"Run failed: {exc}") + elif self.service == 'search': + query = self.get_data() + try: + result = self.search(query) + self.report({ + 'type': self.data_type, + 'query': query, + 'service': self.service, + 'indicator': result + }) + except Exception as exc: + self.error(f"Run failed: {exc}") + else: + self.error('Invalid service or unsupported data type.') + raise ValueError('Unsupported service or data type.') + + def summary(self, raw): + """ + Generates a summary based on the scan results. + + :param raw: The raw scan data. + :return: A dictionary containing summary taxonomies. + """ + taxonomies = [] + level = "info" + namespace = "urlDNA.io" + predicate = "Scan" if raw["service"] == 'new_scan' else "Search" + + indicator = raw.get("indicator", {}) + if predicate == "Search": + total = len(indicator) + value = f"{total} result{'s' if total != 1 else ''}" if total > 0 else "No results found" + else: + malicious = indicator.get("malicious", {}) + is_malicious = malicious.get("malicious", False) + threat_type = malicious.get("threat", "Unknown") + level = 'malicious' if is_malicious else 'info' + value = f"Malicious: {is_malicious}, Threat Type: {threat_type}" + + taxonomies.append(self.build_taxonomy(level, namespace, predicate, value)) + return {"taxonomies": taxonomies} + + +if __name__ == '__main__': + UrlDNAAnalyzer().run() diff --git a/thehive-templates/UrlDNA_New_Scan_0_1_0/long.html b/thehive-templates/UrlDNA_New_Scan_0_1_0/long.html new file mode 100644 index 000000000..ad19ddd0c --- /dev/null +++ b/thehive-templates/UrlDNA_New_Scan_0_1_0/long.html @@ -0,0 +1,515 @@ +
No scan results were found for the queried URL.
++ This may indicate that the URL has not been previously scanned or is not in the urlDNA.io database. +
+| Method | +URL | +Status | +Content Type | +
|---|---|---|---|
| {{tx.method}} | + ++ + {{tx.status_code}} + + | +{{tx.content_type}} | +
| Name | +Domain | +Path | +Secure | +HTTP Only | +Expiry | +
|---|---|---|---|---|---|
| {{cookie.name}} | +{{cookie.domain}} | +{{cookie.path}} | ++ + {{cookie.secure ? 'Yes' : 'No'}} + + | ++ + {{cookie.http_only ? 'Yes' : 'No'}} + + | +{{cookie.expiry}} | +
{{msg.text}}
+ No search results were found for the queried URL.
++ This URL has not been previously scanned in the urlDNA.io database. +
+| Scan Date | +Domain | +Status | +Device | +Protocol | +Origin | +Scan ID | +Actions | +
|---|---|---|---|---|---|---|---|
| {{scan.submitted_date | date:'yyyy-MM-dd HH:mm:ss'}} | +{{scan.domain}} | ++ + {{scan.status}} + + | ++ {{scan.device}} + | +{{scan.protocol}} | +{{scan.origin}} | ++ + {{scan.id}} + + | ++ + View Full Scan + + | +