Skip to content

Malware Analysis

xwings edited this page Jul 6, 2025 · 2 revisions

Malware Analysis

A comprehensive guide to using the Qiling Framework for professional malware analysis, including advanced evasion detection, behavioral analysis, and automated threat assessment.

Overview

Qiling Framework provides a sophisticated platform for malware analysis that combines the power of dynamic analysis with the safety of emulation. This guide covers enterprise-grade malware analysis techniques, from basic sample triage to advanced persistent threat (APT) analysis.

Malware Analysis Workflow

┌─────────────────────────────────────────────────────────────┐
│                Malware Analysis Pipeline                     │
├─────────────────────────────────────────────────────────────┤
│  Sample Acquisition & Triage                               │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │
│  │   Sample    │ │  Metadata   │ │  Initial    │           │
│  │ Collection  │ │ Extraction  │ │  Assessment │           │
│  └─────────────┘ └─────────────┘ └─────────────┘           │
├─────────────────────────────────────────────────────────────┤
│  Static Analysis                                            │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │
│  │ File Format │ │  String     │ │ Signature   │           │
│  │  Analysis   │ │ Extraction  │ │  Matching   │           │
│  └─────────────┘ └─────────────┘ └─────────────┘           │
├─────────────────────────────────────────────────────────────┤
│  Dynamic Analysis (Qiling)                                 │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │
│  │ Behavioral  │ │  Evasion    │ │ Capability  │           │
│  │  Analysis   │ │ Detection   │ │ Extraction  │           │
│  └─────────────┘ └─────────────┘ └─────────────┘           │
├─────────────────────────────────────────────────────────────┤
│  Advanced Analysis                                          │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │
│  │  Unpacking  │ │   Memory    │ │   Network   │           │
│  │   Analysis  │ │  Forensics  │ │  Analysis   │           │
│  └─────────────┘ └─────────────┘ └─────────────┘           │
├─────────────────────────────────────────────────────────────┤
│  Threat Intelligence & Reporting                           │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │
│  │    IOC      │ │   MITRE     │ │   Report    │           │
│  │ Generation  │ │   ATT&CK    │ │ Generation  │           │
│  └─────────────┘ └─────────────┘ └─────────────┘           │
└─────────────────────────────────────────────────────────────┘

Enterprise Malware Analysis Framework

Automated Malware Analysis System

import os
import json
import hashlib
import time
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from qiling import Qiling
from qiling.const import QL_VERBOSE

@dataclass
class MalwareSample:
    """Malware sample metadata.

    Plain record describing one sample. Instances are built by
    ``EnterprisemalwareAnalyzer.create_sample_metadata`` from the raw file
    bytes plus optional caller-supplied metadata. All fields are required
    and positional, in the order declared below.
    """
    # Path the sample was read from.
    file_path: str
    # Hex digests of the complete file contents.
    md5: str
    sha1: str
    sha256: str
    # Length of the file contents in bytes.
    file_size: int
    # Results of the analyzer's detect_file_type / detect_architecture
    # helpers (value vocabulary not visible here -- TODO confirm).
    file_type: str
    architecture: str
    # time.time() value captured when the record was created.
    submission_time: float
    # Taken from the caller's metadata; 'unknown' / [] when not provided.
    source: str
    tags: List[str]

class EnterprisemalwareAnalyzer:
    """Orchestrate the multi-phase analysis of malware samples.

    :meth:`analyze_sample` hashes a sample, runs the static, dynamic
    (Qiling), advanced, threat-assessment and IOC/MITRE phases in order and
    stores the resulting report in ``analysis_results`` keyed by the
    sample's SHA-256.

    NOTE(review): the helpers ``perform_static_analysis``,
    ``perform_advanced_analysis``, ``generate_threat_assessment``,
    ``extract_iocs``, ``map_mitre_techniques``, ``detect_file_type``,
    ``detect_architecture`` and ``select_rootfs`` are called below but are
    not defined in the visible part of this file; they must be provided
    elsewhere for the pipeline to complete.
    """

    def __init__(self, config_path: str):
        """Load the configuration and create empty result containers.

        Args:
            config_path: Path to a JSON configuration file. A missing file
                is not an error; the built-in defaults are used instead.
        """
        self.config = self.load_config(config_path)
        self.analysis_results = {}
        self.threat_indicators = []
        self.behavioral_patterns = {}

    def load_config(self, config_path: str) -> Dict:
        """Load analysis configuration.

        Starts from the built-in defaults and overlays the JSON object found
        at ``config_path`` (if the file exists). Nested mappings such as
        ``rootfs_paths`` are merged key by key, so overriding a single rootfs
        keeps the remaining default entries instead of discarding them.

        Args:
            config_path: Path to the JSON configuration file.

        Returns:
            The effective configuration dictionary.

        Raises:
            TypeError: If the file does not contain a JSON object.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        default_config = {
            'analysis_timeout': 300,  # 5 minutes
            'rootfs_paths': {
                'windows_x86': 'rootfs/x86_windows',
                'windows_x64': 'rootfs/x8664_windows',
                'linux_x86': 'rootfs/x86_linux',
                'linux_x64': 'rootfs/x8664_linux'
            },
            'evasion_detection': True,
            'network_simulation': True,
            'unpacking_analysis': True,
            'memory_forensics': True,
            'ioc_extraction': True,
            'mitre_mapping': True
        }

        if os.path.exists(config_path):
            with open(config_path, 'r', encoding='utf-8') as f:
                user_config = json.load(f)

            if not isinstance(user_config, dict):
                raise TypeError(
                    f"Configuration in {config_path} must be a JSON object"
                )

            for key, value in user_config.items():
                # Merge one level deep so a partial override of a nested
                # mapping does not wipe the defaults it did not mention.
                if isinstance(value, dict) and isinstance(default_config.get(key), dict):
                    default_config[key].update(value)
                else:
                    default_config[key] = value

        return default_config

    def analyze_sample(self, sample_path: str, metadata: Optional[Dict] = None) -> Dict:
        """Comprehensive malware sample analysis.

        Runs all analysis phases in order. Any exception raised by a phase
        stops the remaining phases; its message is stored under ``'error'``
        and the partially filled report is still returned and stored.

        Args:
            sample_path: Path of the sample file to analyze.
            metadata: Optional dict; its ``'source'`` and ``'tags'`` entries
                are copied into the sample record.

        Returns:
            The report dictionary, including timing information.

        Raises:
            OSError: If the sample file cannot be read (raised before the
                report is created, so it is not captured under ``'error'``).
        """

        # Create sample metadata
        sample = self.create_sample_metadata(sample_path, metadata)

        # Initialize analysis result. Every phase key is present up front so
        # the report has the same shape whether or not a phase fails.
        analysis_result = {
            'sample': sample.__dict__,
            'analysis_start': time.time(),
            'static_analysis': {},
            'dynamic_analysis': {},
            'advanced_analysis': {},
            'threat_assessment': {},
            'iocs': [],
            'mitre_techniques': [],
            'recommendations': []
        }

        try:
            # Phase 1: Static Analysis
            print(f"[+] Starting static analysis of {sample.file_path}")
            analysis_result['static_analysis'] = self.perform_static_analysis(sample)

            # Phase 2: Dynamic Analysis with Qiling
            print(f"[+] Starting dynamic analysis of {sample.file_path}")
            analysis_result['dynamic_analysis'] = self.perform_dynamic_analysis(sample)

            # Phase 3: Advanced Analysis
            print(f"[+] Performing advanced analysis of {sample.file_path}")
            analysis_result['advanced_analysis'] = self.perform_advanced_analysis(sample, analysis_result)

            # Phase 4: Threat Assessment
            print(f"[+] Generating threat assessment for {sample.file_path}")
            analysis_result['threat_assessment'] = self.generate_threat_assessment(analysis_result)

            # Phase 5: IOC and MITRE ATT&CK Mapping
            print("[+] Extracting IOCs and mapping to MITRE ATT&CK")
            analysis_result['iocs'] = self.extract_iocs(analysis_result)
            analysis_result['mitre_techniques'] = self.map_mitre_techniques(analysis_result)

        except Exception as e:
            # Top-level boundary: one failing phase must not lose the
            # results gathered so far.
            analysis_result['error'] = str(e)
            print(f"[-] Analysis failed: {e}")

        analysis_result['analysis_end'] = time.time()
        analysis_result['analysis_duration'] = analysis_result['analysis_end'] - analysis_result['analysis_start']

        # Store results
        self.analysis_results[sample.sha256] = analysis_result

        return analysis_result

    def create_sample_metadata(self, file_path: str, metadata: Optional[Dict] = None) -> "MalwareSample":
        """Create comprehensive sample metadata.

        Reads the whole file into memory, computes its MD5/SHA-1/SHA-256
        digests and size, and combines them with the detected file type and
        architecture and the caller-supplied ``source`` / ``tags``.

        Args:
            file_path: Path of the sample file.
            metadata: Optional dict providing ``'source'`` and ``'tags'``.

        Returns:
            A populated ``MalwareSample``.
        """

        with open(file_path, 'rb') as f:
            file_data = f.read()

        metadata = metadata or {}

        return MalwareSample(
            file_path=file_path,
            md5=hashlib.md5(file_data).hexdigest(),
            sha1=hashlib.sha1(file_data).hexdigest(),
            sha256=hashlib.sha256(file_data).hexdigest(),
            file_size=len(file_data),
            file_type=self.detect_file_type(file_data),
            architecture=self.detect_architecture(file_data),
            submission_time=time.time(),
            source=metadata.get('source', 'unknown'),
            tags=metadata.get('tags', [])
        )

    def perform_dynamic_analysis(self, sample: "MalwareSample") -> Dict:
        """Perform comprehensive dynamic analysis using Qiling.

        Emulates the sample with every analyzer attached and returns their
        combined results. An exception raised during emulation is printed
        and treated as the end of execution; whatever the analyzers observed
        up to that point is still returned.

        NOTE(review): ``MalwareMemoryAnalyzer`` and ``CapabilityExtractor``
        are not defined in the visible part of this file.

        Args:
            sample: The sample record to emulate.

        Returns:
            Dict with one entry per analyzer plus an ``execution_summary``.
        """

        # Determine appropriate rootfs
        rootfs = self.select_rootfs(sample.architecture, sample.file_type)

        # Initialize Qiling with comprehensive monitoring
        ql = Qiling([sample.file_path], rootfs,
                   verbose=QL_VERBOSE.DEBUG,
                   libcache=True,
                   multithread=True)

        # Set up comprehensive analysis modules
        behavior_analyzer = MalwareBehaviorAnalyzer(ql)
        evasion_detector = AntiAnalysisDetector(ql)
        network_analyzer = NetworkBehaviorAnalyzer(ql)
        memory_analyzer = MalwareMemoryAnalyzer(ql)
        capability_extractor = CapabilityExtractor(ql)

        # Initialize all analyzers
        behavior_analyzer.setup_monitoring()
        evasion_detector.setup_detection()
        network_analyzer.setup_monitoring()
        memory_analyzer.setup_monitoring()
        capability_extractor.setup_extraction()

        # Execute sample with timeout
        try:
            ql.run(timeout=self.config['analysis_timeout'] * 1000000)  # Convert to microseconds
        except Exception as e:
            # Emulation errors end the run but must not discard the
            # observations collected so far.
            print(f"[!] Execution completed: {e}")

        # Collect analysis results
        return {
            'behavioral_analysis': behavior_analyzer.get_results(),
            'evasion_techniques': evasion_detector.get_results(),
            'network_behavior': network_analyzer.get_results(),
            'memory_analysis': memory_analyzer.get_results(),
            'capabilities': capability_extractor.get_results(),
            'execution_summary': {
                # Falls back to 0 when the Qiling object exposes no such attribute.
                'executed_instructions': getattr(ql, 'instruction_count', 0),
                'api_calls': len(behavior_analyzer.api_calls),
                'file_operations': len(behavior_analyzer.file_operations),
                'registry_operations': len(behavior_analyzer.registry_operations),
                'network_connections': len(network_analyzer.network_connections)
            }
        }

class MalwareBehaviorAnalyzer:
    """Comprehensive malware behavior analysis.

    Registers API hooks on a Qiling instance that record file, registry and
    process activity, and classifies each recorded operation with simple
    substring heuristics into persistence and privilege-escalation findings.

    NOTE(review): hooks are registered through ``ql.set_api`` and declared
    with Win32-style positional parameters. Confirm both against the Qiling
    version in use (its documented hijack API is ``ql.os.set_api`` with
    ``(ql, address, params)`` handlers).
    """

    # Extra setup steps that setup_monitoring() runs only when the class or
    # a subclass actually defines them; this class itself does not.
    _OPTIONAL_SETUP_STEPS = (
        'setup_network_monitoring',
        'setup_persistence_detection',
        'setup_privilege_monitoring',
    )

    def __init__(self, ql):
        """Store the Qiling instance and create empty observation lists."""
        self.ql = ql
        self.api_calls = []
        self.file_operations = []
        self.registry_operations = []
        self.process_operations = []
        self.persistence_mechanisms = []
        self.privilege_escalation = []
        self.data_exfiltration = []

    def _record(self, bucket, operation):
        """Append ``operation`` to ``bucket`` and to the global API call log."""
        bucket.append(operation)
        self.api_calls.append(operation)

    def setup_monitoring(self):
        """Set up comprehensive behavioral monitoring.

        Installs the file, registry and process hooks, then runs any of the
        optional setup steps that are defined. Missing optional steps are
        skipped instead of raising ``AttributeError``.
        """

        # File system operations
        self.setup_file_monitoring()

        # Registry operations
        self.setup_registry_monitoring()

        # Process operations
        self.setup_process_monitoring()

        # Network / persistence / privilege steps are extension points.
        for step_name in self._OPTIONAL_SETUP_STEPS:
            step = getattr(self, step_name, None)
            if callable(step):
                step()

    def setup_file_monitoring(self):
        """Monitor file system operations (CreateFileW, WriteFile, DeleteFileW)."""

        def hook_createfile(ql, lpFileName, dwDesiredAccess, dwShareMode,
                            lpSecurityAttributes, dwCreationDisposition,
                            dwFlagsAndAttributes, hTemplateFile):
            filename = ql.os.utils.read_wstring(lpFileName) if lpFileName else "Unknown"

            operation = {
                'api': 'CreateFileW',
                'filename': filename,
                'access': dwDesiredAccess,
                'disposition': dwCreationDisposition,
                'attributes': dwFlagsAndAttributes,
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }

            self._record(self.file_operations, operation)
            self.analyze_file_operation(operation)

            return None  # Call original

        def hook_writefile(ql, hFile, lpBuffer, nNumberOfBytesToWrite,
                           lpNumberOfBytesWritten, lpOverlapped):
            if not lpBuffer or nNumberOfBytesToWrite <= 0:
                return None

            try:
                # Cap the read so a huge write does not copy the whole buffer.
                data = ql.mem.read(lpBuffer, min(nNumberOfBytesToWrite, 1024))
            except Exception:
                # Best effort: an unreadable guest buffer must not abort
                # emulation, so the write is simply not recorded.
                return None

            operation = {
                'api': 'WriteFile',
                'handle': hFile,
                'size': nNumberOfBytesToWrite,
                'data_preview': data[:100].hex(),
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }

            self._record(self.file_operations, operation)

            # Content analysis is an extension point: it runs only when an
            # analyze_write_operation method is provided.
            analyze_write = getattr(self, 'analyze_write_operation', None)
            if callable(analyze_write):
                analyze_write(data)

            return None

        def hook_deletefile(ql, lpFileName):
            filename = ql.os.utils.read_wstring(lpFileName) if lpFileName else "Unknown"

            operation = {
                'api': 'DeleteFileW',
                'filename': filename,
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }

            self._record(self.file_operations, operation)

            return None

        self.ql.set_api("CreateFileW", hook_createfile)
        self.ql.set_api("WriteFile", hook_writefile)
        self.ql.set_api("DeleteFileW", hook_deletefile)

    def setup_registry_monitoring(self):
        """Monitor registry operations (RegCreateKeyW, RegSetValueW)."""

        def hook_regcreatekey(ql, hKey, lpSubKey, phkResult):
            subkey = ql.os.utils.read_wstring(lpSubKey) if lpSubKey else "Unknown"

            operation = {
                'api': 'RegCreateKeyW',
                'hkey': hKey,
                'subkey': subkey,
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }

            self._record(self.registry_operations, operation)
            self.analyze_registry_operation(operation)

            return None

        # NOTE(review): this parameter list looks like RegSetValueExW rather
        # than RegSetValueW -- confirm which API should be hooked.
        def hook_regsetvalue(ql, hKey, lpValueName, Reserved, dwType, lpData, cbData):
            value_name = ql.os.utils.read_wstring(lpValueName) if lpValueName else "Unknown"

            data = None
            if lpData and cbData > 0:
                try:
                    data = ql.mem.read(lpData, min(cbData, 256)).hex()
                except Exception:
                    # Best effort: keep the record even without a data preview.
                    pass

            operation = {
                'api': 'RegSetValueW',
                'hkey': hKey,
                'value_name': value_name,
                'type': dwType,
                'data': data,
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }

            self._record(self.registry_operations, operation)
            self.analyze_registry_operation(operation)

            return None

        self.ql.set_api("RegCreateKeyW", hook_regcreatekey)
        self.ql.set_api("RegSetValueW", hook_regsetvalue)

    def setup_process_monitoring(self):
        """Monitor process operations (CreateProcessW)."""

        def hook_createprocess(ql, lpApplicationName, lpCommandLine,
                               lpProcessAttributes, lpThreadAttributes,
                               bInheritHandles, dwCreationFlags, lpEnvironment,
                               lpCurrentDirectory, lpStartupInfo, lpProcessInformation):

            app_name = ql.os.utils.read_wstring(lpApplicationName) if lpApplicationName else "Unknown"
            cmd_line = ql.os.utils.read_wstring(lpCommandLine) if lpCommandLine else "Unknown"

            operation = {
                'api': 'CreateProcessW',
                'application': app_name,
                'command_line': cmd_line,
                'creation_flags': dwCreationFlags,
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }

            self._record(self.process_operations, operation)
            self.analyze_process_operation(operation)

            return None

        self.ql.set_api("CreateProcessW", hook_createprocess)

    def analyze_file_operation(self, operation):
        """Analyze file operations for malicious patterns.

        Adds a ``persistence_mechanisms`` entry when the lower-cased file
        name contains a system-directory or startup-related substring.

        Args:
            operation: Recorded operation dict; only ``'filename'`` is read.
        """

        filename = operation.get('filename', '').lower()

        # Check for system file modification
        system_paths = ['system32', 'syswow64', 'windows', 'program files']
        if any(path in filename for path in system_paths):
            self.persistence_mechanisms.append({
                'type': 'system_file_modification',
                'details': operation
            })

        # Check for startup folder usage
        startup_paths = ['startup', 'run', 'runonce']
        if any(path in filename for path in startup_paths):
            self.persistence_mechanisms.append({
                'type': 'startup_persistence',
                'details': operation
            })

    def analyze_registry_operation(self, operation):
        """Analyze registry operations for malicious patterns.

        Both the subkey and the value name are inspected: create-key records
        carry only ``'subkey'`` and set-value records carry only
        ``'value_name'``, so checking just one of them would silently skip
        the other kind of record.

        Args:
            operation: Recorded registry operation dict.
        """

        targets = (
            operation.get('subkey', '').lower(),
            operation.get('value_name', '').lower(),
        )

        # Check for autorun registry keys
        autorun_keys = [
            'run', 'runonce', 'runonceex', 'winlogon',
            'userinit', 'shell', 'load', 'startup'
        ]

        if any(key in target for target in targets for key in autorun_keys):
            self.persistence_mechanisms.append({
                'type': 'registry_autorun',
                'details': operation
            })

        # Check for security-related registry modifications
        security_keys = ['policies', 'uac', 'firewall', 'defender']
        if any(key in target for target in targets for key in security_keys):
            self.privilege_escalation.append({
                'type': 'security_registry_modification',
                'details': operation
            })

    def analyze_process_operation(self, operation):
        """Analyze process operations for malicious patterns.

        Args:
            operation: Recorded process operation dict; ``'command_line'``
                and ``'creation_flags'`` are read.
        """

        cmd_line = operation.get('command_line', '').lower()
        creation_flags = operation.get('creation_flags', 0)

        # Check for process hollowing indicators
        if creation_flags & 0x4:  # CREATE_SUSPENDED
            self.privilege_escalation.append({
                'type': 'process_hollowing_indicator',
                'details': operation
            })

        # Check for suspicious command line patterns
        suspicious_patterns = [
            'powershell', 'cmd.exe', 'rundll32', 'regsvr32',
            'mshta', 'wscript', 'cscript'
        ]

        if any(pattern in cmd_line for pattern in suspicious_patterns):
            self.privilege_escalation.append({
                'type': 'suspicious_process_execution',
                'details': operation
            })

    def get_results(self):
        """Get comprehensive behavioral analysis results.

        Returns:
            Dict with the recorded operations, the derived findings and a
            ``behavioral_summary`` of counts.
        """

        return {
            'api_calls': len(self.api_calls),
            'file_operations': self.file_operations,
            'registry_operations': self.registry_operations,
            'process_operations': self.process_operations,
            'persistence_mechanisms': self.persistence_mechanisms,
            'privilege_escalation': self.privilege_escalation,
            'behavioral_summary': {
                'total_file_ops': len(self.file_operations),
                'total_registry_ops': len(self.registry_operations),
                'total_process_ops': len(self.process_operations),
                'persistence_techniques': len(self.persistence_mechanisms),
                'escalation_attempts': len(self.privilege_escalation)
            }
        }

class AntiAnalysisDetector:
    """Detect anti-analysis and evasion techniques.

    Hooks common debugger-, timing- and environment-probing APIs, records
    every call, and answers with fixed values so the sample sees a stable,
    non-debugged environment.

    NOTE(review): hooks are registered through ``ql.set_api`` and declared
    with Win32-style positional parameters. Confirm both against the Qiling
    version in use.
    """

    def __init__(self, ql):
        """Store the Qiling instance and create empty observation lists."""
        self.ql = ql
        self.evasion_techniques = []
        self.timing_checks = []
        self.environment_checks = []

    @staticmethod
    def _write_wide_string(ql, buffer_ptr, size_ptr, text, count_terminator):
        """Write ``text`` as NUL-terminated UTF-16LE and report its length.

        Args:
            ql: Qiling instance whose guest memory is written.
            buffer_ptr: Guest address of the output buffer; nothing is
                written when it is 0/NULL.
            size_ptr: Guest address of the 32-bit length out-parameter, or
                0/NULL to skip the length update.
            text: String to hand to the guest.
            count_terminator: Whether the reported character count includes
                the terminating NUL.
        """
        if not buffer_ptr:
            return

        # Without the terminator the guest would read past the name.
        ql.mem.write(buffer_ptr, (text + '\x00').encode('utf-16le'))

        if size_ptr:
            char_count = len(text.encode('utf-16le')) // 2
            if count_terminator:
                char_count += 1
            ql.mem.write(size_ptr, char_count.to_bytes(4, 'little'))

    def setup_detection(self):
        """Set up anti-analysis detection hooks."""

        # Debugger detection
        def hook_isdebuggerpresent(ql):
            self.evasion_techniques.append({
                'technique': 'IsDebuggerPresent',
                'type': 'debugger_detection',
                'address': ql.arch.regs.arch_pc,
                'timestamp': time.time()
            })
            return 0  # Return FALSE

        def hook_checkremotedebuggerpresent(ql, hProcess, pbDebuggerPresent):
            self.evasion_techniques.append({
                'technique': 'CheckRemoteDebuggerPresent',
                'type': 'debugger_detection',
                'address': ql.arch.regs.arch_pc,
                'timestamp': time.time()
            })
            if pbDebuggerPresent:
                # The out-parameter is a 4-byte BOOL; clearing only one byte
                # could leave stale non-zero bytes that read as TRUE.
                ql.mem.write(pbDebuggerPresent, (0).to_bytes(4, 'little'))
            return 1

        # Timing-based detection
        def hook_gettickcount(ql):
            self.timing_checks.append({
                'api': 'GetTickCount',
                'address': ql.arch.regs.arch_pc,
                'timestamp': time.time()
            })
            return 12345678  # Fixed value

        def hook_queryperformancecounter(ql, lpPerformanceCount):
            self.timing_checks.append({
                'api': 'QueryPerformanceCounter',
                'address': ql.arch.regs.arch_pc,
                'timestamp': time.time()
            })
            if lpPerformanceCount:
                ql.mem.write(lpPerformanceCount, (87654321).to_bytes(8, 'little'))
            return 1

        # Sleep-based evasion
        def hook_sleep(ql, dwMilliseconds):
            if dwMilliseconds > 10000:  # More than 10 seconds
                self.evasion_techniques.append({
                    'technique': 'Long Sleep',
                    'type': 'timing_evasion',
                    'duration': dwMilliseconds,
                    'address': ql.arch.regs.arch_pc,
                    'timestamp': time.time()
                })
                return 0  # Skip sleep
            return None

        # Environment checks
        def hook_getcomputername(ql, lpBuffer, nSize):
            self.environment_checks.append({
                'api': 'GetComputerName',
                'type': 'environment_fingerprinting',
                'address': ql.arch.regs.arch_pc,
                'timestamp': time.time()
            })

            # Provide realistic computer name; the reported length excludes
            # the terminating NUL for this API.
            self._write_wide_string(ql, lpBuffer, nSize, "DESKTOP-ANALYSIS",
                                    count_terminator=False)
            return 1

        def hook_getusername(ql, lpBuffer, nSize):
            self.environment_checks.append({
                'api': 'GetUserName',
                'type': 'environment_fingerprinting',
                'address': ql.arch.regs.arch_pc,
                'timestamp': time.time()
            })

            # Provide realistic username; the reported length includes the
            # terminating NUL for this API.
            self._write_wide_string(ql, lpBuffer, nSize, "Administrator",
                                    count_terminator=True)
            return 1

        # Set up hooks
        self.ql.set_api("IsDebuggerPresent", hook_isdebuggerpresent)
        self.ql.set_api("CheckRemoteDebuggerPresent", hook_checkremotedebuggerpresent)
        self.ql.set_api("GetTickCount", hook_gettickcount)
        self.ql.set_api("QueryPerformanceCounter", hook_queryperformancecounter)
        self.ql.set_api("Sleep", hook_sleep)
        self.ql.set_api("GetComputerNameW", hook_getcomputername)
        self.ql.set_api("GetUserNameW", hook_getusername)

    def get_results(self):
        """Get anti-analysis detection results.

        Returns:
            Dict with the recorded evasion techniques, timing checks and
            environment checks plus an ``evasion_summary`` of counts.
        """

        def count_of(kind):
            return sum(1 for entry in self.evasion_techniques if entry['type'] == kind)

        return {
            'evasion_techniques': self.evasion_techniques,
            'timing_checks': self.timing_checks,
            'environment_checks': self.environment_checks,
            'evasion_summary': {
                'total_evasion_attempts': len(self.evasion_techniques),
                'debugger_checks': count_of('debugger_detection'),
                'timing_evasion': count_of('timing_evasion'),
                'environment_fingerprinting': len(self.environment_checks)
            }
        }

class NetworkBehaviorAnalyzer:
    """Analyze network behavior and C2 communication.

    Hooks socket and WinINet APIs on a Qiling instance, records every
    observed connection/request, and flags suspicious ports, public
    destinations, DGA-looking domains, unusual TLDs and tool-like user
    agents.

    NOTE(review): hooks are registered through ``ql.set_api`` and declared
    with positional API parameters. Confirm both against the Qiling version
    in use.
    """

    def __init__(self, ql):
        """Store the Qiling instance and create empty observation lists."""
        self.ql = ql
        self.network_connections = []
        self.dns_queries = []
        self.http_requests = []
        self.suspicious_domains = []

    def setup_monitoring(self):
        """Set up network behavior monitoring."""

        # Socket operations
        def hook_socket(ql, af, sock_type, protocol):
            connection = {
                'api': 'socket',
                'family': af,
                'type': sock_type,
                'protocol': protocol,
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }
            self.network_connections.append(connection)
            return None

        def hook_connect(ql, s, name, namelen):
            # A sockaddr_in needs at least family(2) + port(2) + address(4).
            if namelen < 8:
                return None

            try:
                # Only the first 8 bytes are parsed, so only those are read.
                sockaddr_data = ql.mem.read(name, 8)
            except Exception:
                # Best effort: an unreadable sockaddr must not abort emulation.
                return None

            family = int.from_bytes(sockaddr_data[:2], 'little')
            if family != 2:  # AF_INET
                return None

            # Port is stored in network (big-endian) byte order.
            port = int.from_bytes(sockaddr_data[2:4], 'big')
            ip_addr = '.'.join(str(octet) for octet in sockaddr_data[4:8])

            connection = {
                'api': 'connect',
                'ip': ip_addr,
                'port': port,
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }

            self.network_connections.append(connection)
            self.analyze_connection(connection)

            return None

        # HTTP operations
        def hook_internetopen(ql, lpszAgent, dwAccessType, lpszProxy, lpszProxyBypass, dwFlags):
            user_agent = ql.os.utils.read_wstring(lpszAgent) if lpszAgent else "Unknown"

            request = {
                'api': 'InternetOpen',
                'user_agent': user_agent,
                'access_type': dwAccessType,
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }

            self.http_requests.append(request)
            self.analyze_user_agent(user_agent)

            return None

        def hook_internetconnect(ql, hInternet, lpszServerName, nServerPort,
                                 lpszUserName, lpszPassword, dwService, dwFlags, dwContext):
            server = ql.os.utils.read_wstring(lpszServerName) if lpszServerName else "Unknown"

            connection = {
                'api': 'InternetConnect',
                'server': server,
                'port': nServerPort,
                'service': dwService,
                'timestamp': time.time(),
                'caller_pc': ql.arch.regs.arch_pc
            }

            self.network_connections.append(connection)
            self.analyze_domain(server)

            return None

        self.ql.set_api("socket", hook_socket)
        self.ql.set_api("connect", hook_connect)
        self.ql.set_api("InternetOpenW", hook_internetopen)
        self.ql.set_api("InternetConnectW", hook_internetconnect)

    def analyze_connection(self, connection):
        """Analyze network connection for suspicious patterns.

        Args:
            connection: Recorded connection dict; ``'ip'`` and ``'port'``
                are read.
        """

        ip = connection.get('ip', '')
        port = connection.get('port', 0)

        # Check for suspicious ports
        suspicious_ports = [4444, 5555, 6666, 8080, 9999]
        if port in suspicious_ports:
            self.suspicious_domains.append({
                'type': 'suspicious_port',
                'details': connection
            })

        # Check for connections leaving the private network (potential C2)
        if self.is_public_ip(ip):
            self.suspicious_domains.append({
                'type': 'external_connection',
                'details': connection
            })

    def analyze_domain(self, domain):
        """Analyze domain for suspicious characteristics.

        Args:
            domain: Host name as requested by the sample. It is stored
                unchanged in any finding; matching is case-insensitive.
        """

        # Check domain length and characteristics
        if len(domain) > 20 and domain.count('.') == 1:
            # Long domain with single dot - potential DGA
            self.suspicious_domains.append({
                'type': 'potential_dga',
                'domain': domain
            })

        # Check for suspicious TLDs (host names are case-insensitive)
        suspicious_tlds = ('.tk', '.ml', '.ga', '.cf', '.pw', '.top')
        if domain.lower().endswith(suspicious_tlds):
            self.suspicious_domains.append({
                'type': 'suspicious_tld',
                'domain': domain
            })

    def analyze_user_agent(self, user_agent):
        """Analyze HTTP user agent for suspicious patterns.

        Args:
            user_agent: User-agent string passed by the sample.
        """

        # Check for common malware user agents
        malware_agents = ['curl', 'wget', 'python', 'powershell']
        lowered = user_agent.lower()
        if any(agent in lowered for agent in malware_agents):
            self.suspicious_domains.append({
                'type': 'suspicious_user_agent',
                'user_agent': user_agent
            })

    def is_public_ip(self, ip):
        """Check if IP address is public (not private/loopback).

        Args:
            ip: Dotted-quad IPv4 string.

        Returns:
            ``True`` for a well-formed IPv4 address outside 10/8,
            172.16/12, 192.168/16 and 127/8; ``False`` for those ranges and
            for anything that is not a well-formed dotted quad.
        """

        try:
            octets = [int(part) for part in ip.split('.')]
        except (AttributeError, ValueError):
            return False

        # Reject malformed addresses such as "1.2" or "300.1.1.1".
        if len(octets) != 4 or any(not 0 <= octet <= 255 for octet in octets):
            return False

        first, second = octets[0], octets[1]

        # Private ranges and loopback
        if first in (10, 127):
            return False
        if first == 172 and 16 <= second <= 31:
            return False
        if first == 192 and second == 168:
            return False

        return True

    def get_results(self):
        """Get network behavior analysis results.

        Returns:
            Dict with the recorded connections, DNS queries, HTTP requests
            and suspicious indicators plus a ``network_summary`` of counts.
            Unique IP/domain counts only consider records that actually
            carry an ``'ip'`` / ``'server'`` entry.
        """

        unique_ips = {conn['ip'] for conn in self.network_connections if 'ip' in conn}
        unique_domains = {conn['server'] for conn in self.network_connections if 'server' in conn}

        return {
            'network_connections': self.network_connections,
            'dns_queries': self.dns_queries,
            'http_requests': self.http_requests,
            'suspicious_indicators': self.suspicious_domains,
            'network_summary': {
                'total_connections': len(self.network_connections),
                'unique_ips': len(unique_ips),
                'unique_domains': len(unique_domains),
                'suspicious_indicators': len(self.suspicious_domains)
            }
        }

# Usage example for enterprise malware analysis
def analyze_malware_collection(samples_directory: str, output_directory: str):
    """Analyze a collection of malware samples.

    Analyzes every regular file directly inside ``samples_directory`` (no
    recursion), writes one ``<sha256>_report.json`` per sample into
    ``output_directory`` and finally writes ``collection_summary.json``.
    A failure on one sample is printed and does not stop the batch.

    NOTE(review): ``generate_collection_summary`` is not defined in the
    visible part of this file; it must be provided by the analyzer class.

    Args:
        samples_directory: Directory containing the sample files.
        output_directory: Directory for the JSON reports; created (with
            parents) when it does not exist yet.
    """

    analyzer = EnterprisemalwareAnalyzer('config/malware_analysis.json')

    # Without this every report write fails when the directory is missing.
    output_dir = Path(output_directory)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Process all samples in directory; sorted for a reproducible order.
    for sample_path in sorted(Path(samples_directory).glob('*')):
        if not sample_path.is_file():
            continue

        print(f"[+] Analyzing {sample_path}")

        try:
            result = analyzer.analyze_sample(str(sample_path))

            # Save individual report
            output_file = output_dir / f"{result['sample']['sha256']}_report.json"
            with open(output_file, 'w', encoding='utf-8') as f:
                # default=str stringifies values json cannot encode natively.
                json.dump(result, f, indent=2, default=str)

            print(f"[+] Analysis complete: {output_file}")

        except Exception as e:
            # Batch boundary: report the failure and continue with the next sample.
            print(f"[-] Failed to analyze {sample_path}: {e}")

    # Generate summary report
    summary = analyzer.generate_collection_summary()
    summary_file = output_dir / "collection_summary.json"
    with open(summary_file, 'w', encoding='utf-8') as f:
        json.dump(summary, f, indent=2, default=str)

    print(f"[+] Collection analysis complete: {summary_file}")

if __name__ == "__main__":
    # Example usage when run as a script: analyze the files in ./samples/
    # and write the JSON reports to ./reports/ (both paths are relative to
    # the current working directory).
    analyze_malware_collection("samples/", "reports/")

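This comprehensive malware analysis framework provides enterprise-grade capabilities for automated malware triage, behavioral analysis, and threat intelligence generation. The system supports advanced evasion detection, network behavior analysis, and MITRE ATT&CK framework mapping for professional security operations. Note that the listing above is a skeleton: helper methods such as `perform_static_analysis`, `perform_advanced_analysis`, `generate_threat_assessment`, `extract_iocs`, `map_mitre_techniques`, `detect_file_type`, `detect_architecture`, `select_rootfs`, and `generate_collection_summary`, as well as the `MalwareMemoryAnalyzer` and `CapabilityExtractor` classes, are referenced but not defined on this page and must be implemented before the example runs end to end.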

Clone this wiki locally