- These spreadsheets are built from the STIX dataset and provide a more human-accessible view into the
- knowledge base while also supporting rudimentary querying/filtering capabilities.
+ These spreadsheets are built from the STIX dataset and provide a more human-accessible view into the
+ knowledge base while also supporting rudimentary querying/filtering capabilities.
- The Excel representation of the ATT&CK dataset below includes both master spreadsheets, containing all
- object types, and individual spreadsheets for each object type. The individual type spreadsheets break
- out relationships (e.g procedure examples connecting groups to techniques) into separate sheets by
+ The Excel representation of the ATT&CK dataset below includes both master spreadsheets, containing all
+ object types, and individual spreadsheets for each object type. The individual type spreadsheets break
+ out relationships (e.g., procedure examples connecting groups to techniques) into separate sheets by
relationship type, while the master spreadsheet includes all relationship types in a single sheet.
- A citations sheet can be used to look up the in-text citations which appear in some fields. For domains
- that include multiple matrices, such as Mobile ATT&CK, each matrix gets its own named sheet. Unlike the
- STIX dataset, objects that have been revoked or deprecated are not included in the spreadsheets.
+ A citations sheet can be used to look up the in-text citations which appear in some fields. For domains
+ that include multiple matrices, such as Mobile ATT&CK, each matrix gets its own named sheet. Unlike the
+ STIX dataset, objects that have been revoked or deprecated are not included in the spreadsheets.
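Since the relationship sheets are plain tables, they lend themselves to quick filtering with pandas. A minimal sketch, assuming a downloaded master spreadsheet (the filename and column names here are illustrative, following the typical attackToExcel layout):

```python
import pandas as pd

# Load the relationships sheet from a master spreadsheet (filename illustrative)
relationships = pd.read_excel("enterprise-attack.xlsx", sheet_name="relationships")

# Keep only procedure examples: group/software "uses" technique mappings
procedures = relationships[relationships["mapping type"] == "uses"]
print(procedures.head())
```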
@@ -314,12 +314,96 @@
ATT&CK in Excel
-
- Excel Spreadsheets representing the ATT&CK dataset:
-
-
- {{filetree.filetree(parsed.excel_files)}}
+ {# Tabbed section for ATT&CK Excel files by version #}
+ {% if parsed.excel_files_by_version and parsed.excel_files_by_version|length > 0 %}
+
+
ATT&CK Excel Files by Version
+
+ {% set versions = parsed.excel_files_by_version.keys()|list %}
+ {% set visible_count = 3 %}
+ {% if versions|length <= visible_count %}
+ {% for version in versions %}
+ {% if loop.first %}
+ -
+
+ {{ version }}
+
+
+ {% else %}
+ -
+
+ {{ version }}
+
+
+ {% endif %}
+ {% endfor %}
+ {% else %}
+ {% for version in versions[:visible_count] %}
+ {% if loop.first %}
+ -
+
+ {{ version }}
+
+
+ {% else %}
+ -
+
+ {{ version }}
+
+
+ {% endif %}
+ {% endfor %}
+ -
+
+
+
+
+ Older versions
+
+
+
+ {% endif %}
+
+
+ {% for version in versions %}
+ {% set domains = parsed.excel_files_by_version[version] %}
+
+
Version {{ version[1:] }}
+
+ {{ filetree.filetree(domains) }}
+
+
+ {% endfor %}
+
+ {% endif %}
@@ -329,8 +413,8 @@
ATT&CK in Excel
- This folder within the pip module contains an additional module for converting ATT&CK STIX data
- to Excel spreadsheets. It also provides a means to access ATT&CK data as Pandas DataFrames for data analysis.
+ This folder within the pip module contains an additional module for converting ATT&CK STIX data
+ to Excel spreadsheets. It also provides a means to access ATT&CK data as Pandas DataFrames for data analysis.
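For readers who want the DataFrame path mentioned above, the documented attackToExcel API looks roughly like this (a sketch; domain names match the STIX bundle names):

```python
from mitreattack.attackToExcel import attackToExcel, stixToDf

# Fetch the enterprise STIX data and convert techniques into DataFrames
attack_data = attackToExcel.get_stix_data("enterprise-attack")
techniques = stixToDf.techniquesToDf(attack_data, "enterprise-attack")

# One DataFrame per output sheet, keyed by sheet name
print(techniques["techniques"].head())
```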
@@ -342,5 +426,32 @@
ATT&CK in Excel
{% block scripts %}
{{ super() }}
+
+
+
+
{% endblock %}
\ No newline at end of file
diff --git a/modules/search/search.py b/modules/search/search.py
index 7829b1ea920..777fa0d7dae 100644
--- a/modules/search/search.py
+++ b/modules/search/search.py
@@ -159,8 +159,7 @@ def clean(filepath):
def preserve_current_version():
- """Preserve current version"""
-
+ """Preserve current version."""
# Check for intermodule dependency
if [key["module_name"] for key in modules.run_ptr if key["module_name"] == "versions"]:
versions.versions.deploy_current_version()
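The intermodule dependency check above builds a throwaway list just to test membership; an equivalent `any()` form (a sketch, not part of this diff) reads more directly:

```python
# Equivalent membership test for the "versions" module in the running pool
if any(key["module_name"] == "versions" for key in modules.run_ptr):
    versions.versions.deploy_current_version()
```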
diff --git a/modules/site_config.py b/modules/site_config.py
index 69167a5df16..c60f62f01ee 100644
--- a/modules/site_config.py
+++ b/modules/site_config.py
@@ -40,6 +40,10 @@
{"name": "pre-attack", "location": STIX_LOCATION_PRE, "alias": "PRE-ATT&CK", "deprecated": True},
]
+# Directory for attack version archives
+default_archive_dir = "attack-version-archives"
+ATTACK_VERSION_ARCHIVES = os.getenv("ATTACK_VERSION_ARCHIVES", default_archive_dir)
+
# banner for the website
default_banner_message = "This is a custom instance of the MITRE ATT&CK Website. The official website can be found at
attack.mitre.org."
BANNER_ENABLED = os.getenv("BANNER_ENABLED", True)
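The archive directory therefore resolves in this order: the `--version-archive-dir` CLI flag (see `deploy_previous_version` below), then the `ATTACK_VERSION_ARCHIVES` environment variable, then the default. A standalone sketch of that precedence:

```python
import os

def resolve_archive_dir(cli_value=None):
    """CLI flag wins, then the environment variable, then the default."""
    if cli_value:
        return cli_value
    return os.getenv("ATTACK_VERSION_ARCHIVES", "attack-version-archives")
```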
diff --git a/modules/software/__init__.py b/modules/software/__init__.py
index e65709aeede..eb38ecb3e52 100644
--- a/modules/software/__init__.py
+++ b/modules/software/__init__.py
@@ -1,18 +1,11 @@
-from . import software
-from . import software_config
import json
+from . import software, software_config
+
def get_priority():
return software_config.priority
-# TODO commented out to resolve infinite redirect loop when run locally. Needs further testing before code removal.
-# def get_redirections():
-# with open(software_config.software_redirection_location , "r", encoding="utf8") as json_redirections:
-# return json.load(json_redirections)
-# return []
-
-
def run_module():
return software.generate_software(), software_config.module_name
diff --git a/modules/software/software.py b/modules/software/software.py
index 3d05d55320b..e2e5f5bfd60 100644
--- a/modules/software/software.py
+++ b/modules/software/software.py
@@ -1,13 +1,13 @@
-from collections.abc import Iterable
import json
import os
+from collections.abc import Iterable
from loguru import logger
from modules import util
-from . import software_config
from .. import site_config
+from . import software_config
def generate_software():
@@ -22,12 +22,6 @@ def generate_software():
if not os.path.isdir(software_config.software_markdown_path):
os.mkdir(software_config.software_markdown_path)
- # TODO resolve infinite redirect loop when run locally. Needs further testing before code removal.
- # Generate redirections
- util.buildhelpers.generate_redirections(
- redirections_filename=software_config.software_redirection_location, redirect_md=site_config.redirect_md
- )
-
# Generates the markdown files to be used for page generation and verifies if a software was generated
software_generated = generate_markdown_files()
if not software_generated:
diff --git a/modules/software/software_redirections.json b/modules/software/software_redirections.json
deleted file mode 100644
index f177ca67dcd..00000000000
--- a/modules/software/software_redirections.json
+++ /dev/null
@@ -1,7 +0,0 @@
-[
- {
- "title": "software-redirect",
- "from": "software.html",
- "to": "/software/"
- }
-]
\ No newline at end of file
diff --git a/modules/tactics/__init__.py b/modules/tactics/__init__.py
index 015f14636a8..4e8d7cab9e0 100644
--- a/modules/tactics/__init__.py
+++ b/modules/tactics/__init__.py
@@ -1,7 +1,7 @@
-from . import tactics
-from . import tactics_config
import json
+from . import tactics, tactics_config
+
def get_priority():
return tactics_config.priority
@@ -22,12 +22,5 @@ def get_menu():
}
-# TODO resolve infinite redirect loop when run locally. Needs further testing before code removal.
-def get_redirections():
- with open(tactics_config.tactics_redirection_location, "r", encoding="utf8") as json_redirections:
- return json.load(json_redirections)
- return []
-
-
def run_module():
return tactics.generate_tactics(), tactics_config.module_name
diff --git a/modules/tactics/tactics.py b/modules/tactics/tactics.py
index 2037c9bfc6d..34f4d29cab8 100644
--- a/modules/tactics/tactics.py
+++ b/modules/tactics/tactics.py
@@ -9,9 +9,7 @@
def generate_tactics():
- """Responsible for verifying tactic directory and generating tactic
- index markdown
- """
+ """Responsible for verifying tactic directory and generating tactic index markdown."""
# Create content pages directory if does not already exist
util.buildhelpers.create_content_pages_dir()
@@ -22,12 +20,6 @@ def generate_tactics():
if not os.path.isdir(tactics_config.tactics_markdown_path):
os.mkdir(tactics_config.tactics_markdown_path)
- # TODO resolve infinite redirect loop when run locally. Needs further testing before code removal.
- # Generate redirections
- util.buildhelpers.generate_redirections(
- redirections_filename=tactics_config.tactics_redirection_location, redirect_md=site_config.redirect_md
- )
-
# To verify if a technique was generated
tactic_generated = False
@@ -62,9 +54,7 @@ def generate_tactics():
def generate_domain_markdown(domain, techniques, tactics, side_nav_data, notes, deprecated=None):
- """Generate tactic index markdown for each domain and generates
- shared data for tactics
- """
+ """Generate tactic index markdown for each domain and generates shared data for tactics."""
if tactics[domain]:
# Write out the markdown file for overview of domain
data = {"domain": domain.split("-")[0], "tactics_list_len": str(len(tactics[domain]))}
@@ -97,7 +87,7 @@ def generate_domain_markdown(domain, techniques, tactics, side_nav_data, notes,
def generate_tactic_md(tactic, domain, tactic_list, techniques, side_nav_data, notes):
- """Generate markdown for given tactic"""
+ """Generate markdown for given tactic."""
attack_id = util.buildhelpers.get_attack_id(tactic)
# Add if attack id is found
@@ -152,9 +142,7 @@ def generate_tactic_md(tactic, domain, tactic_list, techniques, side_nav_data, n
def get_domain_table_data(tactic_list):
- """Given a tactic list, returns an array of jsons with tactic name, id
- and their description
- """
+ """Given a tactic list, returns an array of jsons with tactic name, id and their description."""
tactic_table = []
# Set up the tactics table for a domain
@@ -173,9 +161,7 @@ def get_domain_table_data(tactic_list):
def get_techniques_of_tactic(tactic, techniques):
- """Given a tactic and a full list of techniques, return techniques that
- appear inside of tactic
- """
+ """Given a tactic and a full list of techniques, return techniques that appear inside of tactic."""
techniques_list = []
for technique in techniques:
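The membership test that `get_techniques_of_tactic` performs boils down to checking the technique's kill-chain phases; a standalone sketch (field names per the ATT&CK STIX representation):

```python
def technique_in_tactic(technique: dict, tactic_shortname: str) -> bool:
    # ATT&CK techniques carry their tactics as kill_chain_phases in STIX
    phases = technique.get("kill_chain_phases", [])
    return any(phase["phase_name"] == tactic_shortname for phase in phases)
```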
diff --git a/modules/tactics/tactics_redirections.json b/modules/tactics/tactics_redirections.json
deleted file mode 100644
index ad5b7755d0b..00000000000
--- a/modules/tactics/tactics_redirections.json
+++ /dev/null
@@ -1,7 +0,0 @@
-[
- {
- "title": "tactics-redirect",
- "from": "tactics.html",
- "to": "/tactics/"
- }
-]
\ No newline at end of file
diff --git a/modules/techniques/__init__.py b/modules/techniques/__init__.py
index 1cd380f62de..6ab3867e6b0 100644
--- a/modules/techniques/__init__.py
+++ b/modules/techniques/__init__.py
@@ -1,7 +1,7 @@
-from . import techniques
-from . import techniques_config
import json
+from . import techniques, techniques_config
+
def get_priority():
return techniques_config.priority
@@ -22,12 +22,5 @@ def get_menu():
}
-# TODO resolve infinite redirect loop when run locally. Needs further testing before code removal.
-def get_redirections():
- with open(techniques_config.techniques_redirection_location, "r", encoding="utf8") as json_redirections:
- return json.load(json_redirections)
- return []
-
-
def run_module():
return techniques.generate_techniques(), techniques_config.module_name
diff --git a/modules/techniques/techniques.py b/modules/techniques/techniques.py
index 3f3fb7918be..e254d1a2cb1 100644
--- a/modules/techniques/techniques.py
+++ b/modules/techniques/techniques.py
@@ -21,12 +21,6 @@ def generate_techniques():
if not os.path.isdir(techniques_config.techniques_markdown_path):
os.mkdir(techniques_config.techniques_markdown_path)
- # TODO resolve infinite redirect loop when run locally. Needs further testing before code removal.
- # Generate redirections
- util.buildhelpers.generate_redirections(
- redirections_filename=techniques_config.techniques_redirection_location, redirect_md=site_config.redirect_md
- )
-
# Write the technique index.html page
with open(os.path.join(techniques_config.techniques_markdown_path, "overview.md"), "w", encoding="utf8") as md_file:
md_file.write(techniques_config.technique_overview_md)
diff --git a/modules/techniques/techniques_redirections.json b/modules/techniques/techniques_redirections.json
deleted file mode 100644
index 4424bcaa696..00000000000
--- a/modules/techniques/techniques_redirections.json
+++ /dev/null
@@ -1,7 +0,0 @@
-[
- {
- "title": "techniques-redirect",
- "from": "techniques.html",
- "to": "/techniques/"
- }
-]
\ No newline at end of file
diff --git a/modules/tests/__init__.py b/modules/tests/__init__.py
index e403602f5c6..26dbffafc53 100644
--- a/modules/tests/__init__.py
+++ b/modules/tests/__init__.py
@@ -1,5 +1,4 @@
-from . import tests
-from . import tests_config
+from . import tests, tests_config
def get_priority():
diff --git a/modules/tests/linkchecker.py b/modules/tests/linkchecker.py
index 9e2a7f1f8fb..95ed1efde39 100644
--- a/modules/tests/linkchecker.py
+++ b/modules/tests/linkchecker.py
@@ -1,7 +1,10 @@
import os
import re
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
+from loguru import logger
import modules
from modules import site_config
@@ -10,9 +13,25 @@
# STATIC PROPERTIES
-allowed_in_link_with_external_links = r"-?\w\$\.!\*'()/:"
+IGNORED_LINKS = ["full-coverage.html", "macro-technique-refinement.html"]
-allowed_in_link = r"-?\w\$\.!\*'()/"
+ALLOWED_IN_LINK_INTERNAL = r"-?\w\$\.!\*'()/"
+ALLOWED_IN_LINK_EXTERNAL = r"-?\w\$\.!\*'()/:"
+LINK_POSTFIX = r']+)["\']'
+
+# Pre-compiled regex patterns
+LINK_REGEXES = {
+ "href_internal": re.compile(r'href\s?=\s?["\']([' + ALLOWED_IN_LINK_INTERNAL + LINK_POSTFIX),
+ "href_external": re.compile(r'href\s?=\s?["\']([' + ALLOWED_IN_LINK_EXTERNAL + LINK_POSTFIX),
+ "src_internal": re.compile(r'src\s?=\s?["\']([' + ALLOWED_IN_LINK_INTERNAL + LINK_POSTFIX),
+ "src_external": re.compile(r'src\s?=\s?["\']([' + ALLOWED_IN_LINK_EXTERNAL + LINK_POSTFIX),
+ "cache_bust": re.compile(r"(css|js)\?\w+"),
+}
+
+# Caches
+file_exists_cache = {}
+get_correct_link_cache = {}
+relative_link_cache = {}
links_list = {}
in_use_links = {}
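A quick way to convince yourself the precompiled patterns behave like the old inline regexes (sample input made up for illustration):

```python
sample = '<a href="/techniques/T1059/">Command and Scripting Interpreter</a>'
print(LINK_REGEXES["href_internal"].findall(sample))  # ['/techniques/T1059/']
print(LINK_REGEXES["cache_bust"].search("style.min.css?f8be4c06"))  # match object
```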
@@ -44,8 +63,13 @@ def remove_extra_from_path(filepath):
def get_correct_link(path):
"""Given a path, return the correct path by adding index.html or removing cache-disabling query string suffix."""
+ # Use cache to avoid redundant work
+ if path in get_correct_link_cache:
+ return get_correct_link_cache[path]
+ original_path = path  # keep the unmodified input as the cache key; `path` is rewritten below
+
# Ignore if it starts with http
if path.startswith("http"):
+ get_correct_link_cache[path] = path
return path
# All paths need to start with /
@@ -71,7 +95,7 @@ def get_correct_link(path):
"docx",
"rtf",
]:
- if re.search(r"(css|js)\?\w+", sort_of_extension):
+ if LINK_REGEXES["cache_bust"].search(sort_of_extension):
# CSS & JavaScript: check for cache-disabling query string suffix, e.g style.min.css?f8be4c06
path = path.split("?")[0] # remove suffix
else:
@@ -80,10 +104,10 @@ def get_correct_link(path):
# serving a file add index.html to replicate webserver
# functionality
if not path.endswith("/"):
- # logger.debug(f"does this even happen? even once? {path}")
path += "/"
path += "index.html"
+ get_correct_link_cache[original_path] = path
return path
@@ -109,7 +133,7 @@ def internal_link_test(link):
web_dir = site_config.web_directory.split(site_config.subdirectory)[0]
else:
web_dir = site_config.web_directory
-
+
# Get correct link path
path = web_dir + get_correct_link(link)
@@ -125,18 +149,26 @@ def internal_link_test(link):
from_index_path = path.split("/index.html")
from_index_path = from_index_path[0] + ".html"
- if os.path.exists(path) or os.path.exists(to_index_path) or os.path.exists(from_index_path):
- return False
- else:
- return True
+ # Use cache for file existence checks
+ for test_path in (path, to_index_path, from_index_path):
+ if test_path in file_exists_cache:
+ if file_exists_cache[test_path]:
+ return False
+ else:
+ exists = os.path.exists(test_path)
+ file_exists_cache[test_path] = exists
+ if exists:
+ return False
+ return True
def check_if_relative_link(link):
"""Given a link, return true if it is a relative path."""
- if not link.startswith("http"):
- if not link.startswith("/"):
- return True
- return False
+ if link in relative_link_cache:
+ return relative_link_cache[link]
+ result = not link.startswith("http") and not link.startswith("/")
+ relative_link_cache[link] = result
+ return result
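Because these helpers now run from worker threads, the hand-rolled dict caches lean on CPython's GIL for safety. A `functools.lru_cache` sketch would give the same memoization with locking built in (an alternative, not what this diff does):

```python
import functools

@functools.lru_cache(maxsize=None)
def check_if_relative_link(link: str) -> bool:
    """Memoized equivalent: relative means no http scheme and no leading slash."""
    return not link.startswith(("http", "/"))
```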
def internal_external_link_checker(filepath, html_str):
@@ -154,9 +186,9 @@ def internal_external_link_checker(filepath, html_str):
if (
"/versions/" in filepath
): # don't check links with data-test-ignore attribute, or live version link name, when on previous versions
- linkregex = rf'{prefix}\s?=\s?["\']([{allowed_in_link_with_external_links}]+)["\'](?! ?data-test-ignore="true")(?!>live version)'
+ linkregex = rf'{prefix}\s?=\s?["\']([{ALLOWED_IN_LINK_EXTERNAL}]+)["\'](?! ?data-test-ignore="true")(?!>live version)'
else:
- linkregex = rf"{prefix}\s?=\s?[\"']([{allowed_in_link_with_external_links}]+)[\"']"
+ linkregex = rf"{prefix}\s?=\s?[\"']([{ALLOWED_IN_LINK_EXTERNAL}]+)[\"']"
links = re.findall(linkregex, html_str)
# check if link has a dest
@@ -215,7 +247,7 @@ def internal_link_checker(filepath, html_str):
# find all links
for prefix in ["href", "src"]:
- links = re.findall(rf"{prefix}\s?=\s?[\"']([{allowed_in_link}]+)[\"']", html_str)
+ links = re.findall(rf"{prefix}\s?=\s?[\"']([{ALLOWED_IN_LINK_INTERNAL}]+)[\"']", html_str)
# check if link has a dest
for link in links:
# Check if link is relative path
@@ -229,6 +261,11 @@ def internal_link_checker(filepath, html_str):
# Get correct path
link = get_correct_link(link)
+ for ignored_link in IGNORED_LINKS:
+ if ignored_link in link:
+ logger.debug(f"Ignoring link: {link}")
+ links_list[link] = None
+
# Check if link is in use
check_if_link_in_use(filepath, link)
@@ -350,34 +387,39 @@ def check_links(external_links=False):
filenames = []
internal_problem = False
+ html_filepaths = []
for directory, _, files in os.walk(site_config.web_directory):
for filename in filter(lambda f: f.endswith(".html"), files):
filepath = os.path.join(directory, filename)
-
filenames.append(filepath)
-
- # Do not check previous dir with external links
- if external_links and "previous" not in directory and "versions" not in directory:
- report = check_links_on_page(filepath, True)
+ html_filepaths.append(filepath)
+
+ # Parallelize link checking
+ max_workers = min(32, os.cpu_count() or 4)
+ lock = threading.Lock()
+ with ThreadPoolExecutor(max_workers=max_workers) as executor:
+ future_to_filepath = {}
+ for filepath in html_filepaths:
+ if external_links and "previous" not in filepath and "versions" not in filepath:
+ future = executor.submit(check_links_on_page, filepath, True)
else:
- report = check_links_on_page(filepath)
-
- # Set internal problem flag to true if internal
- # problem is found. We want to exit out on error if an internal
- # link is broken
- if not internal_problem:
- if report.get("internal_problem"):
+ future = executor.submit(check_links_on_page, filepath)
+ future_to_filepath[future] = filepath
+
+ for future in as_completed(future_to_filepath):
+ report = future.result()
+ with lock:
+ # Set internal problem flag to true if internal link is broken
+ if not internal_problem and report.get("internal_problem"):
internal_problem = True
-
- if report.get("problems"):
- broken_pages.append(report)
-
- if report.get("relative_links"):
- relative_links_report = {
- "path": report["path"],
- "relative_links": report["relative_links"],
- }
- relative_links.append(relative_links_report)
+ if report.get("problems"):
+ broken_pages.append(report)
+ if report.get("relative_links"):
+ relative_links_report = {
+ "path": report["path"],
+ "relative_links": report["relative_links"],
+ }
+ relative_links.append(relative_links_report)
# Get unlinked pages list
unlinked_pages = check_unlinked_pages(filenames)
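The executor pattern above in isolation, for reference (`check()` is a stand-in for `check_links_on_page`):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def check(page):
    return {"path": page, "problems": []}  # stand-in for check_links_on_page

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(check, p): p for p in ["a.html", "b.html"]}
    for future in as_completed(futures):
        report = future.result()  # re-raises any exception from the worker thread
        print(report["path"])
```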
diff --git a/modules/tests/tests.py b/modules/tests/tests.py
index 05c9595064e..3d656dfa956 100644
--- a/modules/tests/tests.py
+++ b/modules/tests/tests.py
@@ -12,7 +12,7 @@
def run_tests():
- """Run tests"""
+ """Run tests."""
error_list = []
tests = 0
@@ -118,7 +118,7 @@ def display_error_report(report_file, error_count, error_type):
def check_links(external_links):
- """Wrapper to check internal and/or external links"""
+ """Check internal and/or external links."""
# Link test
TEST = "Links"
@@ -171,7 +171,7 @@ def check_links(external_links):
def check_citations():
- """Wrapper to check for broken citations"""
+ """Check for broken citations."""
TEST = "Broken Citations"
logger.info(f"RUNNING {TEST}")
@@ -190,7 +190,7 @@ def check_citations():
def check_size():
- """Wrapper to check output size for Github's limit"""
+ """Check output size for Github's limit."""
TEST = "Output Folder Size"
logger.info(f"RUNNING {TEST}")
diff --git a/modules/util/__init__.py b/modules/util/__init__.py
index 5994d03cd71..6d2cca402b3 100644
--- a/modules/util/__init__.py
+++ b/modules/util/__init__.py
@@ -1,4 +1,8 @@
-from . import relationshipgetters
-from . import relationshiphelpers
-from . import buildhelpers
-from . import stixhelpers
+from . import buildhelpers, relationshipgetters, relationshiphelpers, stixhelpers
+
+__all__ = [
+ "relationshipgetters",
+ "relationshiphelpers",
+ "buildhelpers",
+ "stixhelpers",
+]
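The new `__all__` only affects star-imports; a small illustration of what it pins down:

```python
# With __all__ defined, a star-import exposes exactly the four submodules
from modules.util import *  # brings in buildhelpers, relationshipgetters, ...

print(buildhelpers.timestamp())  # e.g. "14:03:52"
```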
diff --git a/modules/util/buildhelpers.py b/modules/util/buildhelpers.py
index 94b952c059b..97371a7d4d0 100644
--- a/modules/util/buildhelpers.py
+++ b/modules/util/buildhelpers.py
@@ -1,12 +1,9 @@
import datetime
import json
-import math
import os
import re
import shutil
-import string
import sys
-import uuid
import bleach
from loguru import logger
@@ -18,7 +15,7 @@
def timestamp():
- """This method is here to return a timestamp."""
+ """Return a timestamp."""
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
return timestamp
@@ -56,8 +53,10 @@ def get_first_last_seen_dates(obj):
def get_first_last_seen_citations(obj):
- """Given an object, return the first seen and last seen citations. This function generates
- the descriptions/citations for the first/last seen fields."""
+ """Return the first seen and last seen citations of an object.
+
+ This function generates the descriptions/citations for the first/last seen fields.
+ """
data = {}
if obj.get("x_mitre_first_seen_citation"):
data["first_seen_citation"] = obj.get("x_mitre_first_seen_citation")
@@ -75,7 +74,7 @@ def format_date_as_month_year(date):
def find_index_id(ext_ref):
- """This method will search for the index of the external_id in the external reference list."""
+ """Search for the index of the external_id in the external reference list."""
count = 0
flag = True
while count < len(ext_ref) and flag:
@@ -132,8 +131,7 @@ def update_reference_list(reference_list, obj):
def get_reference_set(reflist):
- """This function retrieves the unique set of references in the given list of descriptions and
- returns them in string format to be displayed as citations."""
+ """Retrieve the unique set of references in the given list of descriptions and return them in string format to be displayed as citations."""
p = re.compile(r"\(Citation: (.*?)\)")
citations = {}
for c in reflist:
@@ -146,7 +144,7 @@ def get_reference_set(reflist):
def get_alias_data(alias_list, ext_refs):
- """This function generates the Alias Description section for the pages."""
+ """Generate the Alias Description section for the pages."""
if not alias_list:
return []
@@ -228,7 +226,7 @@ def get_technique_table_data(tactic, techniques_list):
sub_data["name"] = subtechnique["object"]["name"]
sub_attack_id = get_attack_id(subtechnique["object"])
if sub_attack_id:
- if not "." in sub_attack_id:
+ if "." not in sub_attack_id:
raise Exception(f"{attack_id} subtechnique's attackID '{sub_attack_id}' is malformed")
sub_data["id"] = sub_attack_id.split(".")[1]
sub_data["descr"] = subtechnique["object"]["description"]
@@ -296,7 +294,6 @@ def get_element_data(element):
def get_side_menu_data(side_nav_title, path_prefix, elements_list, domain=None):
"""Responsible for generating the links that are located on the left side of pages for desktop clients."""
-
elements_data = []
for element in elements_list:
@@ -426,7 +423,7 @@ def technique_used_helper(technique_list, technique, reference_list, inherited=F
# Check if parent ID was added by sub-technique
# parent technique will be marked as not used
- elif technique_list[attack_id]["technique_used"] == False:
+ elif not technique_list[attack_id]["technique_used"]:
# Include as a technique used
technique_list[attack_id]["technique_used"] = True
@@ -505,7 +502,6 @@ def replace_html_chars(to_be_replaced):
def get_navigator_layers(name, attack_id, obj_type, rel_type, version, techniques_used, inheritance=False):
"""Generate the Enterprise, Mobile, and ICS Navigator JSON layers for the given object."""
-
# Generate Enterprise base layer
enterprise_layer = build_base_layer("enterprise-attack", name, obj_type, rel_type, attack_id, version, inheritance)
@@ -679,31 +675,6 @@ def add_platform_path(platforms):
return platforms
-def print_start(name):
- """Given a name and a time, display current progress."""
- number_of_hyphens = 40
- name_space = 22
-
- hyphens = "-" * number_of_hyphens
-
- sys.stdout.write(f"\r{name: <{name_space}} : {hyphens} Running...")
-
- sys.stdout.flush()
-
-
-def print_end(name, start_time, end_time):
- """Given a name and a time, display current progress."""
- number_of_hyphens = 40
- name_space = 22
-
- hyphens = "-" * number_of_hyphens
-
- # spaces here because we need to overwrite the word "running"
- sys.stdout.write(f"\r{name: <{name_space}} : {hyphens} {end_time - start_time:.2f}s \n")
-
- sys.stdout.flush()
-
-
def filter_techniques_by_platform(tech_list, platforms):
"""Given a technique list and a platforms list, filter out techniques that are not part of the platforms."""
if not platforms:
diff --git a/modules/util/stixhelpers.py b/modules/util/stixhelpers.py
index 7437fd4e0c0..540e7bb37cf 100644
--- a/modules/util/stixhelpers.py
+++ b/modules/util/stixhelpers.py
@@ -5,7 +5,6 @@
import requests
import stix2
-import urllib3
from loguru import logger
from requests.adapters import HTTPAdapter, Retry
@@ -392,7 +391,6 @@ def get_domain_resources(types):
def get_stix_memory_stores():
"""Read the json files for each domain and create a dict that contains the memory stores for each domain."""
-
ms = {}
srcs = []
@@ -401,6 +399,7 @@ def get_stix_memory_stores():
for domain in site_config.domains:
stix_filename = None
+ logger.info(f"Loading {domain['name']} domain STIX from: {domain['location']}")
# Download json from http or https
stix_filename = f"{stix_output_dir}/{domain['name']}.json"
@@ -410,7 +409,6 @@ def get_stix_memory_stores():
shutil.copy(domain["location"], str(stix_filename))
if os.path.exists(stix_filename):
- logger.info(f"Loading STIX file from: {stix_filename}")
ms[domain["name"]] = stix2.MemoryStore()
ms[domain["name"]].load_from_file(stix_filename)
else:
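Each domain bundle ends up in an in-memory STIX store; querying one afterwards looks roughly like this (path illustrative, API per the stix2 library):

```python
import stix2

ms = stix2.MemoryStore()
ms.load_from_file("output/stix/enterprise-attack.json")

# All technique objects in the bundle
techniques = ms.query([stix2.Filter("type", "=", "attack-pattern")])
print(len(techniques))
```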
diff --git a/modules/util/util_config.py b/modules/util/util_config.py
index 50ca1aa43cb..10bf894164c 100644
--- a/modules/util/util_config.py
+++ b/modules/util/util_config.py
@@ -1,7 +1,5 @@
import shutil
-import colorama
-
# Not found constant
NOT_FOUND = -1
diff --git a/modules/versions/templates/versions.html b/modules/versions/templates/versions.html
index 27884dade2f..9c1747eca97 100644
--- a/modules/versions/templates/versions.html
+++ b/modules/versions/templates/versions.html
@@ -1,4 +1,4 @@
-{% extends "general/two-column.html" %}
+{% extends "general/two-column.html" %}
{% set active_page = "resources" -%}
{% set title = "Version History | MITRE ATT&CK®" -%}
{% set version_history = page.data | from_json %}
@@ -21,8 +21,8 @@
Version History
- The overall ATT&CK catalog is versioned using a major.minor version schema.
- The bi-annual content releases listed on the updates pages increment the major version number.
+ The overall ATT&CK catalog is versioned using a major.minor version schema.
+ The ATT&CK releases listed on the updates pages increment the major version number.
The minor version number increments for our other small releases, which include typo and data corrections but not typically new content.
diff --git a/modules/versions/versions.py b/modules/versions/versions.py
index a2baea51ff4..df685b4ba1b 100644
--- a/modules/versions/versions.py
+++ b/modules/versions/versions.py
@@ -1,126 +1,87 @@
+import concurrent.futures as cf
+import functools
+import io
import json
import os
import re
import shutil
-import stat
+import subprocess
+import tarfile
+import tempfile
from datetime import datetime
+from pathlib import Path
-from git import Repo
+import requests
from loguru import logger
from modules import site_config, util
from . import versions_config
+ALLOWED = r"-?\w\$\.!\*'()/"
+SRC_RE = re.compile(rf'src=["\'](?!/versions/)([{ALLOWED}]+)["\']')
+HREF_RE = re.compile(r'href=(["\'])((?!/(versions|resources)/)/[^"\']*)\1')
+META_REDIR_RE = re.compile(r'content="0; url=(["\']?)(?!/(versions|resources)/|https?://)(/[^"\'>]*)\1')
+LIVE_BTN_RE = re.compile(rf'href=["\']/versions/v[\w-]+/([{ALLOWED}]+)["\'](.*)>[Ll]ive [Vv]ersion')
+
def generate_versions():
"""Responsible for generating the versions pages."""
# Move templates to templates directory
util.buildhelpers.move_templates(versions_config.module_name, versions_config.versions_templates_path)
- # Verify if resources directory exists
- if not os.path.isdir(site_config.resources_markdown_path):
- os.mkdir(site_config.resources_markdown_path)
-
- # Verify if resources directory exists
- if not os.path.isdir(versions_config.versions_markdown_path):
- os.mkdir(versions_config.versions_markdown_path)
-
- deploy()
-
-
-# Error handler for windows by:
-# https://stackoverflow.com/questions/2656322/shutil-rmtree-fails-on-windows-with-access-is-denied
-def onerror(func, path, exc_info):
- """
- Error handler for ``shutil.rmtree``.
-
- If the error is due to an access error (read only file)
- it attempts to add write permission and then retries.
-
- If the error is for another reason it re-raises the error.
-
- Usage : ``shutil.rmtree(path, onerror=onerror)``
- """
- try:
- if not os.access(path, os.W_OK):
- # Is the error an access error ?
- os.chmod(path, stat.S_IWUSR)
- func(path)
- except:
- raise
-
-
-# allowed characters inside of hyperlinks
-allowed_in_link = r"-?\w\$\.!\*'()/"
-
-def versionPath(version):
- """Get the path of a given version."""
- if "path" in version:
- return version["path"]
- else:
- return version["name"].split(".")[0] # parse path from name if not given explicitly
-
+ # Create resources and versions directories
+ os.makedirs(site_config.resources_markdown_path, exist_ok=True)
+ os.makedirs(versions_config.versions_markdown_path, exist_ok=True)
-def deploy():
- """Deploy previous versions to website directory."""
+ # Ensure directories exist
versions_config.prev_versions_deploy_folder = os.path.join(
site_config.web_directory, versions_config.prev_versions_path
)
-
- # TODO we probably don't need to re-clone the website here, just a git pull should be sufficient
- # delete previous copy of attack-versions
- if os.path.exists(versions_config.versions_directory):
- shutil.rmtree(versions_config.versions_directory, onerror=onerror)
- # download new version of attack-website for use in versioning
- logger.info(f"git cloning {versions_config.versions_repo} >>> {versions_config.versions_directory}")
- versions_repo = Repo.clone_from(versions_config.versions_repo, versions_config.versions_directory)
-
- # remove previously deployed previous versions
- if os.path.exists(versions_config.prev_versions_deploy_folder):
- for child in os.listdir(versions_config.prev_versions_deploy_folder):
- if os.path.isdir(os.path.join(versions_config.prev_versions_deploy_folder, child)):
- shutil.rmtree(versions_config.prev_versions_deploy_folder)
+ os.makedirs(versions_config.prev_versions_deploy_folder, exist_ok=True)
with open("data/versions.json", "r") as f:
versions = json.load(f)
- # build previous versions
- for version in versions["previous"]:
- deploy_previous_version(version, versions_repo)
-
- # build the versions page
- build_markdown(versions)
+ logger.info("Deploying preserved versions …")
+ with cf.ThreadPoolExecutor() as executor:
+ list(executor.map(deploy_previous_version, versions["previous"]))
- # Create directory if it does not exist
- if not os.path.isdir(site_config.web_directory):
- os.makedirs(site_config.web_directory)
+ build_markdown(versions=versions)
# write robots.txt to disallow crawlers
+ os.makedirs(site_config.web_directory, exist_ok=True)
with open(os.path.join(site_config.web_directory, "robots.txt"), "w", encoding="utf8") as robots:
robots.write(
- f"User-agent: *\nDisallow: {site_config.subdirectory}/previous/\nDisallow: {site_config.subdirectory}/{versions_config.prev_versions_path}/"
+ f"User-agent: *\n"
+ f"Disallow: {site_config.subdirectory}/previous/\n"
+ f"Disallow: {site_config.subdirectory}/{versions_config.prev_versions_path}/"
)
def deploy_current_version():
- """Build a permalink of the current version."""
+ """Build a permalink of the current version.
+
+ This is only called by the search module's preserve_current_version().
+ """
versions_config.prev_versions_deploy_folder = os.path.join(
site_config.web_directory, versions_config.prev_versions_path
)
with open("data/versions.json", "r") as f:
- version = json.load(f)["current"]
+ version_data = json.load(f)["current"]
- if not os.path.exists(os.path.join(versions_config.prev_versions_deploy_folder, versionPath(version))):
- os.mkdir(os.path.join(versions_config.prev_versions_deploy_folder, versionPath(version)))
+ version_path = version_data["name"].split(".")[0]
+ version_full_path = os.path.join(versions_config.prev_versions_deploy_folder, version_path)
+
+ os.makedirs(version_full_path, exist_ok=True)
for item in os.listdir(site_config.web_directory):
- # skip previous and versions directories when copying
- if item == "previous" or item == "versions":
+ # skip versions directories when copying
+ if item == "versions":
continue
# copy the current version into a preserved version
src = os.path.join(site_config.web_directory, item)
- dest = os.path.join(versions_config.prev_versions_deploy_folder, versionPath(version), item)
+ dest = os.path.join(version_full_path, item)
# copy depending on file type
if os.path.exists(dest):
print(f"error copying {src}: path {dest} already exists | {item}")
@@ -130,215 +91,352 @@ def deploy_current_version():
shutil.copy(src, dest)
# run archival scripts
- archive(version, is_current=True)
+ archive(
+ version_data=version_data,
+ version_path=version_full_path,
+ is_current=True,
+ )
-def deploy_previous_version(version, repo):
- """Build a version of the site to /prev_versions_path. version is a version from versions.json, repo is a reference to the attack-website Repo object."""
- logger.info(f"Building website for ATT&CK {version}")
- # check out the commit for that version
- repo.git.checkout(version["commit"])
- # copy over files
- ignored_stuff = shutil.ignore_patterns(
- ".git", "beta", "CNAME", "robots.txt", "previous", "previous-versions", "versions"
- )
- shutil.copytree(
- os.path.join(versions_config.versions_directory),
- os.path.join(versions_config.prev_versions_deploy_folder, versionPath(version)),
- ignore=ignored_stuff,
+def create_tar_gz(source_dir, output_path):
+ """Create a tar.gz archive from source_dir."""
+ logger.info(f"Creating tar.gz archive: {output_path}")
+ with tarfile.open(output_path, "w:gz") as tar:
+ tar.add(source_dir, arcname=".")
+ logger.info(f"Archive created: {output_path}")
+
+
+def extract_tar_gz(archive_path, dest_path):
+ """Extract a .tar.gz archive to dest_path."""
+ os.makedirs(dest_path, exist_ok=True)
+ with tarfile.open(archive_path, "r:gz") as tar:
+ tar.extractall(path=dest_path)
+
+
+def download_archive(url, local_path):
+ """Download a website archive file from GitHub. Returns True if download succeeded."""
+ try:
+ logger.info(f"Downloading website archive from {url} to {local_path}")
+ r = requests.get(url, timeout=15)
+ r.raise_for_status()
+ with open(local_path, "wb") as f:
+ f.write(r.content)
+ logger.info(f"Successfully downloaded archive: {local_path}")
+ return True
+ except Exception as ex:
+ logger.warning(f"Failed to download archive from {url}: {ex}")
+ return False
+
+
+def export_commit(commit: str, dest_path: str) -> None:
+ """Materialise `commit` into `dest_path` using `git archive`.
+
+ Equivalent to: git archive | tar -x -C
+ """
+ os.makedirs(dest_path, exist_ok=True)
+
+ # Create a tar stream of the commit
+ proc = subprocess.run(
+ ["git", "archive", "--format=tar", commit],
+ stdout=subprocess.PIPE,
+ check=True,
)
- # run archival scripts on version
- archive(version)
- # build alias for version
- for alias in version["aliases"]:
- build_alias(versionPath(version), alias)
+ # Extract the tar stream in-memory
+ with tarfile.open(fileobj=io.BytesIO(proc.stdout)) as tar:
+ tar.extractall(path=dest_path)
+
+
+def export_git_archive_to_file(commit: str, output_path: Path) -> None:
+ """Create a git archive file for the given commit."""
+ logger.info(f"Creating git archive for commit {commit} > {output_path}")
+ with open(output_path, "wb") as out:
+ subprocess.run(
+ ["git", "archive", "--format=tar.gz", commit],
+ stdout=out,
+ check=True,
+ )
+ logger.info(f"Git archive created: {output_path}")
-def archive(version_data, is_current=False):
- """Perform archival operations on a version in /prev_versions_path.
+def deploy_previous_version(version_data):
+ """
+ Build a version of the site to /prev_versions_path.
- - remove unnecessary files (.git, CNAME, preserved versions for that version)
- - replace links on all pages
- - add archived version banner to all pages
+ Attempts to deploy from a local archive, then from a remote URL,
+ falling back to `git archive` if neither is available.
"""
- version = versionPath(version_data)
-
- version_path = os.path.join(
- versions_config.prev_versions_deploy_folder, version
- ) # root of the filesystem containing the version
- version_url_path = os.path.join(
- versions_config.prev_versions_path, version
- ) # root of the URL of the version, for prefixing URLs
-
- def saferemove(path, type):
- if not os.path.exists(path):
- return
- if type == "file":
- os.remove(path)
- elif type == "directory":
- shutil.rmtree(path, onerror=onerror)
-
- # remove .git
- saferemove(os.path.join(version_path, ".git"), "directory")
- # remove beta directory
- saferemove(os.path.join(version_path, "beta"), "directory")
- # remove CNAME
- saferemove(os.path.join(version_path, "CNAME"), "file")
- # remove robots
- saferemove(os.path.join(version_path, "robots.txt"), "file")
-
- # remove previous versions from this previous version
- for prev_directory in map(
- lambda d: os.path.join(version_path, d),
- [
- "previous",
- versions_config.prev_versions_path,
- os.path.join("resources", "previous-versions"),
- os.path.join("resources", "versions"),
- ],
- ):
- if os.path.exists(prev_directory):
- shutil.rmtree(prev_directory, onerror=onerror)
-
- # remove updates page
- updates_dir = os.path.join(version_path, "resources", "updates")
- if os.path.exists(updates_dir):
- shutil.rmtree(updates_dir, onerror=onerror)
-
- # walk version HTML files
- for directory, _, files in os.walk(version_path):
- for filename in filter(lambda f: f.endswith(".html"), files):
- # replace links in the file
-
- # open the file
- filepath = os.path.join(directory, filename)
- with open(filepath, mode="r", encoding="utf8") as html:
- html_str = html.read()
-
- # helper function to substitute links so that they point to /versions/
- dest_link_format = rf"/{version_url_path}\g<1>"
-
- def substitute(prefix, html_str):
- fromstr = rf"{prefix}=[\"'](?!\/versions\/)([{allowed_in_link}]+)[\"']"
- tostr = f'{prefix}="{dest_link_format}"'
- return re.sub(fromstr, tostr, html_str)
-
- # ditto, but for redirections
- def substitute_redirection(prefix, html_str):
- from_str = f"{prefix}=([{allowed_in_link}]+)[\"']"
- to_str = f'{prefix}={dest_link_format}"'
- return re.sub(from_str, to_str, html_str)
-
- # replace links so that they properly point to where the version is stored
- html_str = substitute("src", html_str)
- html_str = substitute("href", html_str)
- html_str = substitute_redirection('content="0; url', html_str)
- # update links to previous-versions to point to the main site instead of an archived page
- for previous_page in ["previous-versions", "versions"]: # backwards compatability
- html_str = html_str.replace(f"/{version_url_path}/resources/{previous_page}/", "/resources/versions/")
- # update links to updates to point to main site instead of archied page
- html_str = html_str.replace(f"/{version_url_path}/resources/updates/", "/resources/updates/")
-
- # update versioning button to show the permalink site version, aka "back to main site"
- html_str = html_str.replace("version-button live", "version-button permalink")
- # update live version links on the versioning button
- from_str = rf"href=[\"']\/versions\/v[\w-]+\/([{allowed_in_link}]+)[\"'](.*)>[Ll]ive [Vv]ersion<\/a>"
- to_str = r'href="/\g<1>"\g<2>>Live Version'
- html_str = re.sub(from_str, to_str, html_str)
-
- # remove banner message if it is present
- for banner_class in ["banner-message", "under-development"]: # backwards compatability
- html_str = html_str.replace(banner_class, "d-none") # hide the banner
-
- # format banner depending on if this is the current version or a previous version
- if is_current:
- version_marking = f'Currently viewing ATT&CK {version_data["name"]} which is the current version of ATT&CK.'
- else:
- version_marking = f'Currently viewing ATT&CK {version_data["name"]} which was live between {version_data["date_start"]} and {version_data["date_end"]}.'
-
- # add versions banner
- for banner_tag in [
- "",
- "",
- ]: # backwards compatability
- html_str = html_str.replace(
- banner_tag,
- (
- ''
- ),
- )
+ version_name = version_data["name"] # e.g. v16.1
+ logger.info(f"Building website for ATT&CK {version_name}")
+
+ vpath = version_name.split(".")[0]
+ dest_path = os.path.join(versions_config.prev_versions_deploy_folder, vpath)
+
+ # Find the archive directory to use based on environment variable or command line argument
+ archive_dir = site_config.ATTACK_VERSION_ARCHIVES
+ if site_config.args.version_archive_dir:
+ archive_dir = site_config.args.version_archive_dir
+
+ os.makedirs(archive_dir, exist_ok=True)
+ archive_filename = f"website-{version_name}.tar.gz"
+ archive_path = os.path.join(archive_dir, archive_filename)
+ archive_url = (
+ f"https://github.com/mitre-attack/attack-website/releases/download/archived-website-files/{archive_filename}"
+ )
- # overwrite with updated html
- with open(filepath, mode="w", encoding="utf8") as updated_html:
- updated_html.write(html_str)
+ if os.path.exists(archive_path):
+ logger.info(f"{version_name}: extracting from local archive {archive_filename}")
+ extract_tar_gz(archive_path=archive_path, dest_path=dest_path)
+ return
- # update settings js file
- settings_path = os.path.join(version_path, "theme", "scripts", "settings.js")
- if os.path.exists(settings_path):
- with open(settings_path, mode="r", encoding="utf8") as settings_file:
- settings_contents = settings_file.read()
+ if download_archive(url=archive_url, local_path=archive_path):
+ logger.info(f"{version_name}: extracting downloaded archive {archive_filename}")
+ extract_tar_gz(archive_path=archive_path, dest_path=dest_path)
+ return
+
+ # this saves the cleaned-up version archive for future builds
+ logger.warning(f"{version_name}: download failed, falling back to git archive")
+ create_version_archive(version_data=version_data, output_dir=archive_dir)
+ extract_tar_gz(archive_path=archive_path, dest_path=dest_path)
+
+
+def process_html_file(path: str, version_url_path: str, is_current: bool, version_data: dict):
+ """Rewrite one HTML file in place."""
+ try:
+ with open(path, "r", encoding="utf8") as f:
+ html = f.read()
+
+ ### regex replacements
+ # replace links so that they properly point to where the version is stored
+ html = SRC_RE.sub(lambda m: f'src="/{version_url_path}{m.group(1)}"', html)
+ html = HREF_RE.sub(lambda m: f"href={m.group(1)}/{version_url_path}/{m.group(2)[1:]}{m.group(1)}", html)
+ html = META_REDIR_RE.sub(
+ lambda m: f'content="0; url=/{version_url_path}{"/" if m.group(3) == "/" else m.group(3)}'
+ + (m.group(1) if m.group(1) else ""),
+ html,
+ )
+ # update live version links on the versioning button
+ html = LIVE_BTN_RE.sub(lambda m: f'href="/{m.group(1)}"{m.group(2)}>Live Version', html)
+
+ ### simple string replacements
+ # update links to previous-versions to point to the main site instead of an archived page
+ html = html.replace(f"/{version_url_path}/resources/previous-versions/", "/resources/versions/")
+ html = html.replace(f"/{version_url_path}/resources/versions/", "/resources/versions/")
+ # update links to updates to point to main site instead of archived page
+ html = html.replace(f"/{version_url_path}/resources/updates/", "/resources/updates/")
+
+ # update links to docs
+ html = html.replace(f"/{version_url_path}/docs/", "/docs/")
+
+ # update versioning button to show the permalink site version: "back to main site"
+ html = html.replace("version-button live", "version-button permalink")
+ # remove banner message if it is present
+ html = html.replace("banner-message", "d-none")
+ html = html.replace("under-development", "d-none")
+
+ # banner injection
+ if is_current:
+ marking = (
+ "Currently viewing "
+ f''
+ f"ATT&CK {version_data['name']}"
+ " "
+ "which is the current version of ATT&CK."
+ )
+ else:
+ marking = (
+ "Currently viewing "
+ f''
+ f"ATT&CK {version_data['name']}"
+ " "
+ f"which was live between {version_data['date_start']} and {version_data['date_end']}."
+ )
+ banner_html = (
+ ''
+ )
+ html = html.replace("", banner_html)
+ html = html.replace("", banner_html)
+
+ # Special case!
+ if version_url_path == "versions/v13" and "techniques/T1037/004/index.html" in path:
+ logger.info("REMOVING BROKEN LINK in /versions/v13/techniques/T1037/004/index.html")
+ # Replace the hyperlinked "Launchd" text with plain "Launchd"
+ # ATT&CK v13 somehow missed T1053.004 in the STIX and it was later brought back and deprecated
+ html = re.sub(
+ r']*>(.*?)',
+ r"\1",
+ html,
+ flags=re.IGNORECASE | re.DOTALL,
+ )
+
+ # overwrite with updated html
+ with open(path, "w", encoding="utf8") as f:
+ f.write(html)
+
+ except Exception as e:
+ logger.warning(f"Failed to process {path}: {e}")
+
+
+def process_html_files(version_data: dict, version_path: str, is_current: bool = False):
+ """Process all HTML files in a version directory."""
+ version = version_data["name"].split(".")[0]
+ version_url_path = f"versions/{version}"
+
+ logger.info(f"{version}: rewriting html")
+
+ worker = functools.partial(
+ process_html_file, version_url_path=version_url_path, is_current=is_current, version_data=version_data
+ )
+ html_files = []
+ for root, _, files in os.walk(version_path):
+ html_files.extend(os.path.join(root, f) for f in files if f.endswith(".html"))
+
+ with cf.ThreadPoolExecutor() as pool:
+ pool.map(worker, html_files)
- settings_contents = re.sub('base_url ?= ?"(.*)"', rf'base_url = "/{version_url_path}\1"', settings_contents)
- settings_contents = re.sub("tour_steps ?= .*;", "tour_steps = {};", settings_contents)
- with open(settings_path, mode="w", encoding="utf8") as settings_file:
- settings_file.write(settings_contents)
+def process_search_files(version_data: dict, version_path: str):
+ """Process search-related JavaScript files in a version directory."""
+ version = version_data["name"].split(".")[0]
+ version_url_path = f"versions/{version}"
+
+ logger.info(f"{version}: rewriting search files")
+
+ # tweak settings.js / for search capability. works for ATT&CK v7 onwards
+ settings_path = os.path.join(version_path, "theme", "scripts", "settings.js")
+ if os.path.exists(settings_path):
+ with open(settings_path, "r", encoding="utf8") as sf:
+ contents = sf.read()
+ contents = re.sub(r'base_url ?= ?"(.+?)"', rf'base_url = "/{version_url_path}\1"', contents)
+ contents = re.sub(r"tour_steps ?= .*?;", "tour_steps = {};", contents)
+ with open(settings_path, "w", encoding="utf8") as sf:
+ sf.write(contents)
else:
- # update search page for old versions of the site
- for search_file_name in ["search_bundle.js"]:
+ # legacy search path for ATT&CK version 6 and prior
+ # NOTE: search.js and search_babelized.js are in v7-v12, but they don't need to be updated since settings.js is updated above
+ for search_file_name in ["search.js", "search_babelized.js"]:
search_file_path = os.path.join(version_path, "theme", "scripts", search_file_name)
if os.path.exists(search_file_path):
- with open(search_file_path, mode="r", encoding="utf8") as search_file:
- search_contents = search_file.read()
-
+ with open(search_file_path, "r", encoding="utf8") as f:
+ search_contents = f.read()
search_contents = re.sub(
- 'site_base_url ?= ?""', f'site_base_url = "/{version_url_path}"', search_contents
+ r'site_base_url ?= ?""', f'site_base_url = "/{version_url_path}"', search_contents
)
+ with open(search_file_path, "w", encoding="utf8") as f:
+ f.write(search_contents)
+
+
+def saferemove(p):
+ """Safely remove a file or directory."""
+ if not os.path.exists(p):
+ return
+ if os.path.isfile(p):
+ os.remove(p)
+ elif os.path.isdir(p):
+ shutil.rmtree(p)
+
+
+def remove_unwanted_files(extract_dir):
+ """Remove unnecessary files and directories from previous website version."""
+ logger.info("Cleaning extracted archive (removing unnecessary files/folders)…")
+ targets = [
+ ".git",
+ ".well-known",
+ "beta",
+ "docs",
+ "mobile",
+ "previous",
+ "versions",
+ "w",
+ "wiki",
+ "resources",
+ "CNAME",
+ "robots.txt",
+ "assets.html",
+ "campaigns.html",
+ "groups.html",
+ "software.html",
+ "tactics.html",
+ "techniques.html",
+ "full-coverage.html",
+ "macro-technique-refinement.html",
+ ]
+ for rel_path in targets:
+ target_path = os.path.join(extract_dir, rel_path)
+ saferemove(target_path)
+ logger.info("Cleaning complete.")
+
+
+def archive(version_data: dict, version_path: str, is_current: bool = False):
+ """Post-process an exported version folder.
+
+ - remove unnecessary files,
+ - rewrite all HTML files (in parallel),
+ - tweak settings.js and the legacy search scripts.
+ """
+ remove_unwanted_files(extract_dir=version_path)
+ process_html_files(version_data, version_path, is_current)
+ process_search_files(version_data, version_path)
+ fix_permissions(version_path)
- with open(search_file_path, mode="w", encoding="utf8") as search_file:
- search_file.write(search_contents)
+def fix_permissions(root_dir):
+ """Set directories to 755 and files to 644 recursively."""
+ for dirpath, dirnames, filenames in os.walk(root_dir):
+ os.chmod(dirpath, 0o755)
+ for filename in filenames:
+ os.chmod(os.path.join(dirpath, filename), 0o644)
-def build_alias(version, alias):
- """Build redirects from alias to version.
- version is the path of the version, e.g "v5"
- alias is the alias to build, e.g "october2018"
+def create_version_archive(version_data: dict, output_dir: str = "attack-version-archives"):
+ """Create a cleaned archive for a specific version.
+
+ This function is designed to be called from archive-website.py
+ for batch processing versions into archives.
"""
- for root, folder, files in os.walk(os.path.join(versions_config.prev_versions_deploy_folder, version)):
- for thefile in files:
- # where the file should go
- newRoot = root.replace(version, alias).replace(versions_config.prev_versions_path, "previous")
- # file to build
- redirectFrom = os.path.join(newRoot, thefile)
-
- # where this file should point to
- if thefile == "index.html":
- redirectTo = root # index.html is implicit
- else:
- redirectTo = "/".join([root, thefile]) # file is not index.html so it needs to be specified explicitly
- redirectTo = redirectTo.split(site_config.parent_web_directory)[1] # remove parent web folder from path
-
- # write the redirect file
- if not os.path.isdir(newRoot):
- os.makedirs(newRoot, exist_ok=True) # make parents as well
- with open(redirectFrom, "w") as f:
- f.write(f'')
+ version_label = version_data["name"]
+ commit_to_use = version_data["commit"]
+
+ logger.info(f"--- Processing version {version_label} ---")
+
+ cleaned_dir = Path(output_dir)
+ cleaned_dir.mkdir(parents=True, exist_ok=True)
+
+ cleaned_archive_name = f"website-{version_label}.tar.gz"
+ cleaned_archive_path = cleaned_dir / cleaned_archive_name
+
+ # main temp dir for extracted/cleaned contents
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # dedicated temp dir for git archive tarball
+ with tempfile.TemporaryDirectory() as archive_tmpdir:
+ archive_tmp_path = Path(archive_tmpdir)
+ git_archive_name = f"website-{version_label}-git-archive.tar.gz"
+ git_archive_path = archive_tmp_path / git_archive_name
+
+ export_git_archive_to_file(commit_to_use, git_archive_path)
+ extract_tar_gz(git_archive_path, tmpdir)
+
+ archive(version_data=version_data, version_path=tmpdir, is_current=False)
+
+ create_tar_gz(tmpdir, cleaned_archive_path)
+
+ logger.info(f"--- Finished version {version_label} ---\n")
def build_markdown(versions):
"""Build markdown for the versions list page."""
# build urls
- versions["current"]["url"] = versionPath(versions["current"])
+ versions["current"]["url"] = versions["current"]["name"].split(".")[0]
versions["current"]["changelog_label"] = " ".join(versions["current"]["changelog"].split("-")[1:]).title()
for versionGroup in ["previous", "older"]: # apply transforms to both previous and older
- for version in versions[versionGroup]:
- version["url"] = versionPath(version)
- version["changelog_label"] = " ".join(version["changelog"].split("-")[1:]).title()
+ for version_data in versions[versionGroup]:
+ version_data["url"] = version_data["name"].split(".")[0]
+ version_data["changelog_label"] = " ".join(version_data["changelog"].split("-")[1:]).title()
# sort previous versions by date
versions_data = {
diff --git a/modules/versions/versions_config.py b/modules/versions/versions_config.py
index 8bc15a181f7..50249a52047 100644
--- a/modules/versions/versions_config.py
+++ b/modules/versions/versions_config.py
@@ -1,7 +1,3 @@
-import os
-
-from modules import site_config
-
module_name = "Versions"
priority = 8.1
@@ -14,6 +10,5 @@
prev_versions_path = "versions"
prev_versions_deploy_folder = ""
-versions_repo = "https://github.com/mitre-attack/attack-website.git"
versions_directory = "attack-versions"
versions_md = "Title: Version History\nTemplate: versions/versions\nsave_as: resources/versions/index.html\ndata: "
diff --git a/modules/website_build/website_build.py b/modules/website_build/website_build.py
index 28ab9aeb8e8..586b7131d46 100644
--- a/modules/website_build/website_build.py
+++ b/modules/website_build/website_build.py
@@ -1,7 +1,6 @@
import hashlib
import json
import os
-import shutil
import subprocess
from string import Template
@@ -42,7 +41,6 @@ def generate_website():
pelican_content()
# this is nice to have if you want to run pelican manually later
# remove_pelican_settings()
- remove_unwanted_output()
def generate_uuid_from_seeds(content_version, website_version):
@@ -53,7 +51,8 @@ def generate_uuid_from_seeds(content_version, website_version):
- content_version (str): Semantic version of the content without a leading 'v'.
- website_version (str): Semantic version of the website with a leading 'v'.
- Returns:
+ Returns
+ -------
- str: A UUID generated based on the two versions.
"""
# Combine and hash the values
@@ -269,45 +268,6 @@ def remove_pelican_settings():
os.remove(pelican_settings_f)
-def remove_unwanted_output():
- """Remove unwanted files from the output directory."""
- logger.info("Removing unwanted files from the output directory")
- # Files to be deleted:
- # archives.html, authors.html, categories.html, tags.html,
- # author\blake-strom.html, category\cyber-threat-intelligence.html
-
- archives_path = os.path.join(site_config.web_directory, "archives.html")
- authors_path = os.path.join(site_config.web_directory, "authors.html")
- categories_path = os.path.join(site_config.web_directory, "categories.html")
- tags_path = os.path.join(site_config.web_directory, "tags.html")
- author_path = os.path.join(site_config.web_directory, "author")
- category_path = os.path.join(site_config.web_directory, "category")
-
- if os.path.exists(archives_path):
- logger.debug(f"Removing: {archives_path}")
- os.remove(archives_path)
-
- if os.path.exists(authors_path):
- logger.debug(f"Removing: {authors_path}")
- os.remove(authors_path)
-
- if os.path.exists(categories_path):
- logger.debug(f"Removing: {categories_path}")
- os.remove(categories_path)
-
- if os.path.exists(tags_path):
- logger.debug(f"Removing: {tags_path}")
- os.remove(tags_path)
-
- if os.path.exists(author_path):
- logger.debug(f"Removing: {author_path}")
- shutil.rmtree(author_path)
-
- if os.path.exists(category_path):
- logger.debug(f"Removing: {category_path}")
- shutil.rmtree(category_path)
-
-
def generate_static_pages():
"""Read markdown files from the static pages directory and copies them into the markdown directory."""
logger.info("Generating static pages")
diff --git a/pelicanconf.py b/pelicanconf.py
index 320118d5e71..85152bb2547 100644
--- a/pelicanconf.py
+++ b/pelicanconf.py
@@ -22,6 +22,12 @@
DEFAULT_LANG = os.environ.get("PELICAN_DEFAULT_LANG", "en")
THEME = "attack-theme"
+ARCHIVES_SAVE_AS = ""
+AUTHOR_SAVE_AS = ""
+AUTHORS_SAVE_AS = ""
+CATEGORY_SAVE_AS = ""
+CATEGORIES_SAVE_AS = ""
+TAGS_SAVE_AS = ""
# Feed generation is usually not desired when developing
FEED_ALL_ATOM = None
diff --git a/requirements.txt b/requirements.txt
index 5845c2d13fd..289ccc84aa9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,7 +4,7 @@ bleach==6.1.0
colorama==0.4.6
future==1.0.0
loguru==0.7.2
-mitreattack-python==4.0.2
+mitreattack-python==5.1.0
pelican==4.10.2
python-dotenv==1.0.1
requests==2.32.3
diff --git a/update-attack.py b/update-attack.py
index a712b787d0f..9e72f9d29b8 100644
--- a/update-attack.py
+++ b/update-attack.py
@@ -1,12 +1,11 @@
import argparse
import time
-import colorama
from dotenv import load_dotenv
from loguru import logger
import modules
-from modules import site_config, util
+from modules import site_config
load_dotenv()
@@ -131,6 +130,13 @@ def get_parsed_args():
action="store_true",
help="Forces application to exit with success status codes even if tests fail.",
)
+ parser.add_argument(
+ "--version-archive-dir",
+ type=str,
+ help=(
+ "If specified, sets the directory for the ATT&CK version archives. Defaults to attack-version-archives"
+ ),
+ )
parser.add_argument(
"--banner",
type=str,
@@ -184,7 +190,7 @@ def remove_from_build(arg_modules, arg_extras):
"""Given a list of modules from command line, remove modules that appear in module directory that are not in list."""
def remove_from_running_pool():
- """Remove modules from running pool if they are not in modules list from argument"""
+ """Remove modules from running pool if they are not in modules list from argument."""
copy_of_modules = []
for module in modules.run_ptr:
@@ -194,7 +200,7 @@ def remove_from_running_pool():
modules.run_ptr = copy_of_modules
def remove_from_menu():
- """Remove modules from menu if they are not in modules list from argument"""
+ """Remove modules from menu if they are not in modules list from argument."""
copy_of_menu = []
for module in modules.menu_ptr:
@@ -211,31 +217,41 @@ def remove_from_menu():
remove_from_menu()
-if __name__ == "__main__":
- """Beginning of ATT&CK update module"""
+def main():
+ """Entry point for the update script."""
# Get args
args = get_parsed_args()
# Remove modules from build
remove_from_build(args.modules, args.extras)
+ # Print only the modules that will be run, marking extras
+ logger.info("Building website using the following modules in this order:")
+ for m in modules.run_ptr:
+ mod_name = m["module_name"]
+ if mod_name.lower() in extras:
+ logger.info(f"{mod_name} [extra]")
+ else:
+ logger.info(f"{mod_name}")
+
# Arguments used for pelican
site_config.send_to_pelican("no_stix_link_replacement", args.no_stix_link_replacement)
# Start time of update
update_start = time.time()
- # Init colorama for output
- colorama.init()
-
# Get running modules and priorities
for ptr in modules.run_ptr:
- util.buildhelpers.print_start(ptr["module_name"])
+ logger.info(f"RUNNING MODULE: {ptr['module_name']}")
start_time = time.time()
ptr["run_module"]()
end_time = time.time()
- util.buildhelpers.print_end(ptr["module_name"], start_time, end_time)
+ logger.info(f"FINISHED MODULE: {ptr['module_name']} in {end_time - start_time:.2f} seconds")
# Print end of module
update_end = time.time()
- util.buildhelpers.print_end("TOTAL Update Time", update_start, update_end)
+ logger.info(f"TOTAL Update Time: {update_end - update_start:.2f} seconds")
+
+
+if __name__ == "__main__":
+ main()