Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 153 additions & 17 deletions api_app/analyzers_manager/file_analyzers/yara_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import logging
import math
import os
import shutil
import tempfile
import zipfile
from pathlib import PosixPath
from pathlib import Path, PosixPath
from typing import Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse

Expand Down Expand Up @@ -86,8 +88,10 @@ def directory(self) -> PosixPath:

def update(self):
    """Refresh the local rule set from the configured source URL.

    Dispatches on the URL type: Unprotect.it detection-rules API
    ingestion, zip-archive download, or git pull/clone.
    """
    logger.info(f"Starting update of {self.url}")

    if self.is_unprotect_api():
        self._update_unprotect_api()
    elif self.is_zip():
        # NOTE(review): private URLs appear unsupported for zip sources — TODO confirm
        self._update_zip()
    else:
        self._update_git()
Expand Down Expand Up @@ -149,9 +153,111 @@ def _update_git(self):
if settings.GIT_KEY_PATH.exists():
os.remove(settings.GIT_KEY_PATH)

@staticmethod
def _write_rule_to_temp(rule, temp_dir):
    """Persist one Unprotect rule dict as a ``.yar`` file in *temp_dir*.

    Returns True when a file was written, False when the rule payload
    is missing/invalid or the write failed.
    """
    try:
        # The API has exposed the rule body under several field names.
        content = rule.get("yara_rule") or rule.get("yara_rules") or rule.get("rule")
        if not content or not isinstance(content, str):
            return False

        # Build a filesystem-safe name: alphanumerics only, with a fallback
        # when sanitization leaves nothing.
        raw_name = str(rule.get("name") or "unprotect_rule")
        safe_name = "".join(ch for ch in raw_name if ch.isalnum()) or "unprotect_rule"
        identifier = rule.get("id", "unknown")

        target = Path(temp_dir) / f"{safe_name}_{identifier}.yar"
        with open(target, "w", encoding="utf-8") as handle:
            handle.write(content)
        return True
    except Exception as e:
        logger.error(f"Error writing rule {rule.get('id')}: {e}")
        return False

def _update_unprotect_api(self):
    """Ingest YARA rules from the paginated Unprotect.it API.

    Rules are staged in a temporary directory, validated with
    ``yara.compile``, and promoted to the final directory only when at
    least one rule is valid; otherwise existing rules are kept.
    """
    logger.info(f"Fetching Unprotect rules from {self.url}")

    # Stage in a sibling temp dir so a failed run cannot clobber the
    # currently-installed rules.
    os.makedirs(self.directory.parent, exist_ok=True)
    staging_root = tempfile.mkdtemp(prefix="unprotect_tmp_", dir=str(self.directory.parent))
    staging = Path(staging_root)

    page_url = self.url
    valid_rules = 0

    try:
        # Follow the API's "next" links until exhausted or a page fails.
        while page_url:
            try:
                response = requests.get(page_url, timeout=30)
                response.raise_for_status()
                payload = response.json()
            except (requests.RequestException, ValueError) as e:
                logger.error(f"Failed fetching/decoding Unprotect page {page_url}: {e}")
                break

            page_results = payload.get("results", [])
            if not page_results:
                break

            for entry in page_results:
                # Static helper: call through the class rather than self.
                YaraRepo._write_rule_to_temp(entry, staging)

            page_url = payload.get("next")

        # Validate every staged rule; discard the ones yara rejects.
        for candidate in staging.glob("*.yar"):
            try:
                yara.compile(filepath=str(candidate))
            except (yara.SyntaxError, yara.Error) as e:
                logger.error(
                    "Invalid YARA rule in temp file %s: %s. Discarding.",
                    candidate,
                    e,
                )
                candidate.unlink(missing_ok=True)
            else:
                valid_rules += 1

        if valid_rules > 0:
            self._finalize_rules(staging, valid_rules)
        else:
            logger.warning("No valid YARA rules were fetched; keeping existing rules.")
    except Exception:
        logger.exception("Unexpected error during Unprotect rules ingestion")
    finally:
        # Always remove the staging area, whether or not promotion happened.
        shutil.rmtree(staging_root, ignore_errors=True)

def _finalize_rules(self, temp_dir, count):
    """Promote validated rules from *temp_dir* into the final directory.

    New files are moved into place first; stale ``.yar`` files that are
    not part of the new set are removed only afterwards.
    """
    os.makedirs(self.directory, exist_ok=True)

    # Move every new rule in, remembering its name.
    incoming = set()
    for staged in temp_dir.glob("*.yar"):
        incoming.add(staged.name)
        shutil.move(str(staged), str(self.directory / staged.name))

    # Drop leftovers from previous runs that were not re-fetched.
    for existing in self.directory.glob("*.yar"):
        if existing.name not in incoming:
            existing.unlink(missing_ok=True)

    logger.info(
        "Successfully updated Unprotect repository with %d rules in %s",
        count,
        self.directory,
    )

def delete_lock_file(self):
    """Remove a stale git ``index.lock``, tolerating its absence.

    An absent lock file is the normal, healthy case, so the unlink
    must not raise when the file does not exist.
    """
    lock_file_path = self.directory / ".git" / "index.lock"
    lock_file_path.unlink(missing_ok=True)

@property
def compiled_file_name(self):
Expand All @@ -173,6 +279,18 @@ def compiled_paths(self) -> List[PosixPath]:
def is_zip(self):
    """Return True when the configured URL points at a ``.zip`` archive."""
    # Equivalent to str.endswith(".zip") for all string lengths.
    return self.url[-4:] == ".zip"

def is_unprotect_api(self):
    """
    Return True if the configured URL points to the Unprotect.it
    detection rules API endpoint.

    The path must match the endpoint exactly or extend it on a path
    segment boundary, so e.g. ``api/detection_rules_old`` does NOT match.
    """
    parsed = urlparse(self.url)
    netloc = parsed.netloc.lower()

    # Normalize path to handle optional leading/trailing slash
    path = parsed.path.strip("/")
    return netloc == "unprotect.it" and (
        path == "api/detection_rules" or path.startswith("api/detection_rules/")
    )
Copy link

Copilot AI Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_unprotect_api() currently matches any path starting with api/detection_rules, which can yield false positives (e.g., api/detection_rules_old). Since this branch changes update behavior significantly, it would be safer to match the endpoint exactly (after normalizing slashes) or check path segment boundaries (e.g., equality to api/detection_rules or prefix api/detection_rules/).

Suggested change
return netloc == "unprotect.it" and path.startswith("api/detection_rules")
return netloc == "unprotect.it" and (
path == "api/detection_rules" or path.startswith("api/detection_rules/")
)

Copilot uses AI. Check for mistakes.

@cached_property
def head_branch(self) -> str:
    """Name of the branch currently checked out in the local git repo."""
    repo = git.Repo(self.directory)
    return repo.head.ref.name
def compile(self) -> List[yara.Rules]:
    """Compile all valid YARA rules in this repo into saved rule sets.

    Scans the repo directory (non-recursively) and each first-level
    subdirectory (recursively), skipping index files, validating each
    rule individually, then compiling and saving one rule set per
    directory. Returns the list of compiled rule sets.
    """
    logger.info(f"Starting compile for {self}")
    compiled_rules = []

    # We check the specific repo directory and any first-level subdirectories
    for directory in self.first_level_directories + [self.directory]:
        if directory != self.directory:
            rules = directory.rglob("*")  # recursive for subfolders
        else:
            rules = directory.glob("*")  # non-recursive for main folder

        valid_rules_path = []
        for rule in rules:
            # Skip index files regardless of case.
            if rule.stem.lower().startswith("index") or rule.stem.lower().endswith("index"):
                continue
            if rule.suffix in [".yara", ".yar", ".rule"]:
                # Validate each rule on its own so one bad file does not
                # poison the whole directory's compilation.
                try:
                    yara.compile(filepath=str(rule))
                    valid_rules_path.append(rule)
                except yara.SyntaxError as e:
                    logger.warning(f"Syntax error in YARA rule: {rule}: {e}")
                    continue
                except yara.Error as e:
                    logger.warning(f"Error compiling YARA rule: {rule}: {e}")
                    continue

        if not valid_rules_path:
            continue

        logger.info(f"Compiling {len(valid_rules_path)} rules for {self} at {directory}")
        try:
            compiled_rule = yara.compile(filepaths={str(path): str(path) for path in valid_rules_path})

            # Ensure the path exists before saving to avoid "could not open file" error
            directory.mkdir(parents=True, exist_ok=True)

            save_path = str(directory / self.compiled_file_name)
            compiled_rule.save(save_path)

            compiled_rules.append(compiled_rule)
            logger.info(f"Rules for {self} compiled and saved to {save_path}")
        except Exception:
            logger.exception("Failed to compile or save YARA rules in %s", directory)

    return compiled_rules

def analyze(self, file_path: str, filename: str) -> List[Dict]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from unittest.mock import patch
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

from api_app.analyzers_manager.file_analyzers.yara_scan import YaraScan
from api_app.analyzers_manager.file_analyzers.yara_scan import YaraRepo, YaraScan

from .base_test_class import BaseFileAnalyzerTest

Expand Down Expand Up @@ -32,3 +34,69 @@ def get_mocked_response(self):
],
)
]

def setUp(self):
    """Create a YaraScan analyzer with an empty repository config."""
    super().setUp()

    empty_config = {
        "repositories": [],
        "local_rules": "",
        "_private_repositories": {},
    }
    self.analyzer = YaraScan(empty_config)

@patch("api_app.analyzers_manager.file_analyzers.yara_scan.requests.get")
def test_unprotect_update_downloads_yara_rules(self, mock_get):
    """
    Ensure Unprotect API repository downloads YARA rules with a non-empty
    yara_rule field and creates .yar files correctly.
    """

    # Mock a single API page: one usable rule, one with no rule body.
    mock_response = MagicMock()
    mock_response.raise_for_status.return_value = None
    mock_response.json.return_value = {
        "results": [
            {
                "id": 1,
                "name": "Test Rule",
                "yara_rule": "rule test_rule { condition: true }",
            },
            {
                "id": 2,
                "name": "CAPA Rule",
                "yara_rule": None,
            },
        ],
        "next": None,
    }

    mock_get.return_value = mock_response

    with tempfile.TemporaryDirectory() as tmpdir:
        base_dir = Path(tmpdir)

        unprotect_repo = YaraRepo(url="https://unprotect.it/api/detection_rules/", directory=base_dir)

        # Call update on the correct repo
        unprotect_repo.update()

        # Verify HTTP request was made correctly
        mock_get.assert_called()
        called_url = mock_get.call_args[0][0]
        self.assertIn("unprotect.it/api/detection_rules/", called_url)

        # Only the rule with a real body should have produced a file.
        created_files = list(base_dir.iterdir())
        self.assertEqual(len(created_files), 1)

        created_file = created_files[0]
        self.assertEqual(created_file.suffix, ".yar")
        self.assertEqual(created_file.name, "TestRule_1.yar")

        # Verify file content
        with open(created_file, "r", encoding="utf-8") as f:
            content = f.read()

        self.assertEqual(content, "rule test_rule { condition: true }")
Loading