-
Notifications
You must be signed in to change notification settings - Fork 152
New Scanner
This guide explains how to add your own custom scanner (analyzer) to the LitterBox malware analysis framework.
LitterBox uses a 3-level inheritance structure for analyzers:
- BaseAnalyzer (abstract in manager.py)
- StaticAnalyzer/DynamicAnalyzer (in respective base.py files)
- Specific analyzer implementations (your custom analyzer)
Determine whether you need a static analyzer (for files) or dynamic analyzer (for processes):
- Static Analyzers: Analyze file content without execution
- Dynamic Analyzers: Monitor running processes by PID
Create a new Python file in the appropriate directory:
- Static:
app/analyzers/static/your_analyzer_name.py - Dynamic:
app/analyzers/dynamic/your_analyzer_name.py
# app/analyzers/static/your_analyzer_name.py
import subprocess
import os
from .base import StaticAnalyzer
class YourAnalyzer(StaticAnalyzer):
def __init__(self, config):
super().__init__(config)
# Optional: set up logging
# self.logger = logging.getLogger("LitterBox")
def analyze(self, file_path):
"""
Analyze a file and store results in self.results
"""
try:
# Get tool configuration
tool_config = self.config['analysis']['static']['your_analyzer_name']
# Build and execute command
command = tool_config['command'].format(
tool_path=os.path.abspath(tool_config['tool_path']),
file_path=os.path.abspath(file_path)
)
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
stdout, stderr = process.communicate(timeout=tool_config.get('timeout', 300))
# Parse output
findings = self._parse_output(stdout)
# Store results
self.results = {
'status': 'completed' if process.returncode == 0 else 'failed',
'scan_info': {
'target': file_path,
'tool': 'YourAnalyzer'
},
'findings': findings,
'errors': stderr if stderr else None
}
except Exception as e:
self.results = {
'status': 'error',
'error': str(e)
}
def _parse_output(self, output):
"""
Parse your tool's output into a structured format.
The exact structure will depend on your tool's output format.
"""
findings = {} # Create appropriate structure based on your tool
# Example parsing logic:
# for line in output.splitlines():
# if "Finding:" in line:
# # Extract information
return findings
def cleanup(self):
"""Most static analyzers don't need cleanup"""
pass# app/analyzers/dynamic/your_analyzer_name.py
import subprocess
import os
from .base import DynamicAnalyzer
class YourDynamicAnalyzer(DynamicAnalyzer):
def __init__(self, config):
super().__init__(config)
# self.pid will be set in analyze()
def analyze(self, pid):
"""
Analyze a running process by PID
"""
self.pid = pid
try:
# Get configuration
tool_config = self.config['analysis']['dynamic']['your_analyzer_name']
# Build command
command = tool_config['command'].format(
tool_path=tool_config['tool_path'],
pid=pid
)
# Execute command
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
stdout, stderr = process.communicate(timeout=tool_config.get('timeout', 300))
# Parse output
findings = self._parse_output(stdout)
# Store results
self.results = {
'status': 'completed' if process.returncode == 0 else 'failed',
'findings': findings,
'errors': stderr if stderr else None
}
except Exception as e:
self.results = {
'status': 'error',
'error': str(e)
}
def _parse_output(self, output):
"""
Parse your tool's output into a structured format.
"""
findings = {}
# Your parsing logic here
return findings
def cleanup(self):
"""Clean up any resources if needed"""
passAdd your analyzer configuration to Config/config.yaml:
analysis:
static:
your_analyzer_name:
enabled: true
tool_path: ".\\Scanners\\YourTool\\your_tool.exe"
command: "{tool_path} -options {file_path}"
timeout: 120analysis:
dynamic:
your_analyzer_name:
enabled: true
tool_path: ".\\Scanners\\YourTool\\your_tool.exe"
command: "{tool_path} -options {pid}"
timeout: 120Import and register your analyzer in app/analyzers/manager.py:
# Import at top of file
from .static.your_analyzer_name import YourAnalyzer
# OR
from .dynamic.your_analyzer_name import YourDynamicAnalyzer
class AnalysisManager:
# Add to appropriate dictionary
STATIC_ANALYZERS = {
'yara': YaraStaticAnalyzer,
'checkplz': CheckPlzAnalyzer,
'stringnalyzer': StringsAnalyzer,
'your_analyzer_name': YourAnalyzer # Add here - key must match config name
}
DYNAMIC_ANALYZERS = {
'yara': YaraDynamicAnalyzer,
'pe_sieve': PESieveAnalyzer,
'moneta': MonetaAnalyzer,
'patriot': PatriotAnalyzer,
'hsb': HSBAnalyzer,
'rededr': RedEdrAnalyzer,
'your_analyzer_name': YourDynamicAnalyzer # Add here - key must match config name
}The standard pattern uses Python string formatting for commands:
-
{tool_path}- Path to your tool executable -
{file_path}- For static analyzers, path to the file -
{pid}- For dynamic analyzers, process ID to analyze
Always wrap analysis in try-except blocks and provide informative error messages:
try:
# Analysis code
except Exception as e:
self.results = {
'status': 'error',
'error': str(e)
}Follow this pattern for consistency with other analyzers:
self.results = {
'status': 'completed', # or 'failed' or 'error'
'findings': { # Structure depends on your analyzer
# Tool-specific findings
},
'errors': stderr if stderr else None
}Always use os.path.abspath() for file paths to prevent relative path issues:
tool_path = os.path.abspath(tool_config['tool_path'])
file_path = os.path.abspath(file_path)The _parse_output() method structure varies by analyzer but should convert raw tool output to a structured format:
- For line-based output: See
MonetaAnalyzer._parse_output() - For JSON output: See
HollowsHunterAnalyzer.analyze() - For complex parsing: See
HSBAnalyzer._parse_output()
Here's a complete example of a static analyzer for calculating file hashes:
# app/analyzers/static/hash_analyzer.py
import hashlib
import os
from .base import StaticAnalyzer
class HashAnalyzer(StaticAnalyzer):
def analyze(self, file_path):
"""Calculate various hashes for the input file."""
try:
tool_config = self.config['analysis']['static'].get('hash_analyzer', {})
block_size = tool_config.get('block_size', 65536)
hashes = self._calculate_hashes(file_path, block_size)
self.results = {
'status': 'completed',
'scan_info': {
'target': file_path,
'tool': 'HashAnalyzer'
},
'findings': {
'hashes': hashes,
'file_size': os.path.getsize(file_path)
},
'errors': None
}
except Exception as e:
self.results = {
'status': 'error',
'error': str(e)
}
def _calculate_hashes(self, file_path, block_size):
"""Calculate MD5, SHA1, and SHA256 hashes of a file."""
md5 = hashlib.md5()
sha1 = hashlib.sha1()
sha256 = hashlib.sha256()
with open(file_path, 'rb') as f:
for block in iter(lambda: f.read(block_size), b''):
md5.update(block)
sha1.update(block)
sha256.update(block)
return {
'md5': md5.hexdigest(),
'sha1': sha1.hexdigest(),
'sha256': sha256.hexdigest()
}
def cleanup(self):
passConfiguration in config.yaml:
analysis:
static:
hash_analyzer:
enabled: true
block_size: 65536