diff --git a/docs/README.md b/docs/README.md
index af9011092..f3da2cb41 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -55,10 +55,11 @@ usage: sherlock [-h] [--version] [--verbose] [--folderoutput FOLDEROUTPUT]
                 [--output OUTPUT] [--tor] [--unique-tor] [--csv] [--xlsx]
                 [--site SITE_NAME] [--proxy PROXY_URL] [--json JSON_FILE]
                 [--timeout TIMEOUT] [--print-all] [--print-found] [--no-color]
-                [--browse] [--local] [--nsfw]
+                [--browse] [--local] [--nsfw] [--skip-cache] [--ignore-cache]
+                [--cache-duration CACHE_DURATION]
                 USERNAMES [USERNAMES ...]
 
-Sherlock: Find Usernames Across Social Networks (Version 0.14.3)
+Sherlock: Find Usernames Across Social Networks (Version 0.16.0)
 
 positional arguments:
   USERNAMES             One or more usernames to check with social networks.
@@ -96,6 +97,10 @@ optional arguments:
   --browse, -b          Browse to all results on default browser.
   --local, -l           Force the use of the local data.json file.
   --nsfw                Include checking of NSFW sites from default list.
+  --skip-cache          Disable result caching (cache will not be read or written).
+  --ignore-cache        Ignore cached results and force fresh checks (cache will still be updated).
+  --cache-duration CACHE_DURATION
+                        Cache duration in seconds (default: 86400 = 24 hours)
 ```
 
 ## Apify Actor Usage [![Sherlock Actor](https://apify.com/actor-badge?actor=netmilk/sherlock)](https://apify.com/netmilk/sherlock?fpr=sherlock)
diff --git a/pyproject.toml b/pyproject.toml
index 45dc683d6..665c45b85 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -49,6 +49,7 @@ stem = "^1.8.0"
 pandas = "^2.2.1"
 openpyxl = "^3.0.10"
 tomli = "^2.2.1"
+platformdirs = "^4.4.0"
 
 [tool.poetry.group.dev.dependencies]
 jsonschema = "^4.0.0"
@@ -62,3 +63,4 @@ defusedxml = "^0.7.1"
 
 [tool.poetry.scripts]
 sherlock = 'sherlock_project.sherlock:main'
+sherlock-cache = 'sherlock_project.cache_cli:main'
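The `platformdirs` dependency and the `sherlock-cache` console script above both support the cache module introduced in the next file. As an illustrative sketch only (it assumes the patched package is importable and uses a throwaway directory instead of the platform default), the `SherlockCache` API added below is used roughly like this:

```python
# Illustrative only: exercises the SherlockCache API added by this patch.
import tempfile

from sherlock_project.cache import SherlockCache
from sherlock_project.result import QueryStatus

with tempfile.TemporaryDirectory() as tmp:
    cache = SherlockCache(cache_path=tmp, cache_duration=3600)  # 1-hour TTL
    cache.set("alice", "GitHub", QueryStatus.CLAIMED, "https://github.com/alice")

    hit = cache.get("alice", "GitHub")        # dict with status/url/timestamp
    miss = cache.get("alice", "UnknownSite")  # None: never cached
    print(hit["status"], hit["url"], miss)
```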
diff --git a/sherlock_project/cache.py b/sherlock_project/cache.py
new file mode 100644
index 000000000..5c1e4bfc6
--- /dev/null
+++ b/sherlock_project/cache.py
@@ -0,0 +1,418 @@
+"""
+Sherlock Cache Module
+
+This module handles SQLite-based caching for username lookup results.
+Uses platform-specific cache directories following XDG Base Directory spec.
+"""
+
+import os
+import sqlite3
+import time
+from pathlib import Path
+from typing import Optional
+
+from platformdirs import user_cache_dir
+
+from sherlock_project.result import QueryStatus
+
+
+# Database schema version (increment when schema changes)
+SCHEMA_VERSION = 1
+
+
+class SherlockCache:
+    """
+    Manages SQLite cache for Sherlock results.
+
+    Uses platform-specific cache directories:
+    - Linux/macOS: ~/.cache/sherlock/cache.sqlite3
+    - Windows: %LOCALAPPDATA%\\sherlock\\cache.sqlite3
+
+    Implements parameterized queries to prevent SQL injection.
+    """
+
+    def __init__(
+        self,
+        cache_path: Optional[str] = None,
+        cache_duration: int = 86400
+    ) -> None:
+        """
+        Initialize the cache.
+
+        Args:
+            cache_path: Custom path to SQLite database. If None, uses platform default.
+                Can be full path with filename or directory (will add cache.sqlite3)
+            cache_duration: Cache TTL in seconds (default: 86400 = 24 hours)
+
+        Raises:
+            ValueError: If cache_duration <= 0 or cache_path is invalid
+            RuntimeError: If database initialization fails
+        """
+        if cache_duration <= 0:
+            raise ValueError("cache_duration must be positive")
+
+        self.cache_duration = cache_duration
+
+        # Determine cache path
+        if cache_path is None:
+            # Use environment variable if set, otherwise platform default
+            cache_path = os.environ.get('SHERLOCK_CACHE_PATH')
+
+            if cache_path is None:
+                # Use platform-specific cache directory
+                cache_dir = Path(user_cache_dir("sherlock", "sherlock_project"))
+                cache_path = str(cache_dir / "cache.sqlite3")
+        else:
+            # User provided path - check if it's a directory or full path
+            cache_path_obj = Path(cache_path)
+            if cache_path_obj.is_dir() or (not cache_path_obj.suffix):
+                # It's a directory, add filename
+                cache_path = str(cache_path_obj / "cache.sqlite3")
+
+        # Validate and create directory
+        cache_path_obj = Path(cache_path).resolve()
+
+        try:
+            cache_path_obj.parent.mkdir(parents=True, exist_ok=True)
+        except (OSError, PermissionError) as e:
+            raise RuntimeError(f"Cannot create cache directory: {e}") from e
+
+        self.cache_path = str(cache_path_obj)
+        self._init_database()
+
+    def _init_database(self) -> None:
+        """
+        Initialize the SQLite database with required tables.
+        Runs migrations if needed.
+
+        Raises:
+            RuntimeError: If database initialization fails
+        """
+        try:
+            with sqlite3.connect(self.cache_path) as conn:
+                cursor = conn.cursor()
+
+                # Create results table
+                cursor.execute('''
+                    CREATE TABLE IF NOT EXISTS results (
+                        username TEXT NOT NULL,
+                        site TEXT NOT NULL,
+                        status TEXT NOT NULL,
+                        url TEXT,
+                        timestamp INTEGER NOT NULL,
+                        cache_duration INTEGER NOT NULL DEFAULT 86400,
+                        PRIMARY KEY (username, site)
+                    )
+                ''')
+
+                # Create index for faster timestamp queries
+                cursor.execute('''
+                    CREATE INDEX IF NOT EXISTS idx_timestamp
+                    ON results(timestamp)
+                ''')
+
+                conn.commit()
+
+                # Run migrations
+                self._migrate_schema(conn)
+
+        except sqlite3.Error as e:
+            raise RuntimeError(f"Failed to initialize cache database: {e}") from e
+
+    def _migrate_schema(self, conn: sqlite3.Connection) -> None:
+        """
+        Handle database schema migrations using PRAGMA user_version.
+
+        Args:
+            conn: Active database connection
+
+        Raises:
+            RuntimeError: If migration fails
+        """
+        cursor = conn.cursor()
+
+        # Get current schema version
+        cursor.execute("PRAGMA user_version")
+        current_version = cursor.fetchone()[0]
+
+        if current_version == SCHEMA_VERSION:
+            # Already up to date
+            return
+
+        if current_version == 0:
+            # Fresh database or pre-versioning database
+            # Check if cache_duration column exists (migration from v0)
+            cursor.execute("PRAGMA table_info(results)")
+            columns = [row[1] for row in cursor.fetchall()]
+
+            if 'cache_duration' not in columns:
+                # Migrate from v0: Add cache_duration column
+                try:
+                    cursor.execute('''
+                        ALTER TABLE results
+                        ADD COLUMN cache_duration INTEGER NOT NULL DEFAULT 86400
+                    ''')
+                    conn.commit()
+                except sqlite3.OperationalError:
+                    # Column already exists (shouldn't happen, but be safe)
+                    pass
+
+        # Add future migrations here as elif current_version == X:
+
+        # Update schema version
+        cursor.execute(f"PRAGMA user_version = {SCHEMA_VERSION}")
+        conn.commit()
+
+    def get(
+        self,
+        username: str,
+        site: str
+    ) -> Optional[dict[str, QueryStatus | str | int]]:
+        """
+        Retrieve cached result if not expired.
+
+        Args:
+            username: Username to lookup
+            site: Site name
+
+        Returns:
+            Dictionary with status, url, timestamp or None if expired/missing
+        """
+        # Validate inputs
+        self._validate_input(username, "username")
+        self._validate_input(site, "site")
+
+        with sqlite3.connect(self.cache_path) as conn:
+            cursor = conn.cursor()
+
+            # Parameterized query prevents SQL injection
+            cursor.execute(
+                '''
+                SELECT status, url, timestamp, cache_duration
+                FROM results
+                WHERE username = ? AND site = ?
+                ''',
+                (username, site)
+            )
+
+            result = cursor.fetchone()
+
+            if result is None:
+                return None
+
+            status_str, url, timestamp, cached_duration = result
+            current_time = int(time.time())
+
+            # Check expiration using ORIGINAL cache_duration
+            if current_time - timestamp > cached_duration:
+                return None
+
+            # Validate status enum
+            try:
+                status = QueryStatus[status_str]
+            except KeyError:
+                return None
+
+            return {
+                'status': status,
+                'url': url,
+                'timestamp': timestamp
+            }
+
+    def set(
+        self,
+        username: str,
+        site: str,
+        status: QueryStatus,
+        url: Optional[str] = None
+    ) -> None:
+        """
+        Store result in cache.
+
+        Args:
+            username: Username
+            site: Site name
+            status: Query status
+            url: Profile URL if found
+        """
+        # Validate inputs
+        self._validate_input(username, "username")
+        self._validate_input(site, "site")
+
+        if url is not None:
+            if len(url) > 2048:
+                raise ValueError("URL exceeds maximum length (2048)")
+            if '\x00' in url:
+                raise ValueError("URL contains null byte")
+
+        current_time = int(time.time())
+
+        with sqlite3.connect(self.cache_path) as conn:
+            cursor = conn.cursor()
+
+            # Parameterized query prevents SQL injection
+            cursor.execute(
+                '''
+                INSERT OR REPLACE INTO results
+                (username, site, status, url, timestamp, cache_duration)
+                VALUES (?, ?, ?, ?, ?, ?)
+                ''',
+                (username, site, status.name, url, current_time, self.cache_duration)
+            )
+
+            conn.commit()
+
+    def set_batch(
+        self,
+        results: list[tuple[str, str, QueryStatus, Optional[str]]]
+    ) -> None:
+        """
+        Store multiple results in cache (for post-run bulk insert).
+
+        Args:
+            results: List of (username, site, status, url) tuples
+        """
+        if not results:
+            return
+
+        current_time = int(time.time())
+
+        with sqlite3.connect(self.cache_path) as conn:
+            cursor = conn.cursor()
+
+            # Prepare batch data
+            batch_data = [
+                (username, site, status.name, url, current_time, self.cache_duration)
+                for username, site, status, url in results
+            ]
+
+            # Batch insert
+            cursor.executemany(
+                '''
+                INSERT OR REPLACE INTO results
+                (username, site, status, url, timestamp, cache_duration)
+                VALUES (?, ?, ?, ?, ?, ?)
+                ''',
+                batch_data
+            )
+
+            conn.commit()
+
+    def clear(
+        self,
+        username: Optional[str] = None,
+        site: Optional[str] = None
+    ) -> None:
+        """
+        Clear cache entries.
+
+        Args:
+            username: Clear specific username (None = all)
+            site: Clear specific site (None = all)
+        """
+        # Validate if provided
+        if username is not None:
+            self._validate_input(username, "username")
+        if site is not None:
+            self._validate_input(site, "site")
+
+        with sqlite3.connect(self.cache_path) as conn:
+            cursor = conn.cursor()
+
+            # Parameterized queries
+            if username and site:
+                cursor.execute(
+                    'DELETE FROM results WHERE username = ? AND site = ?',
+                    (username, site)
+                )
+            elif username:
+                cursor.execute(
+                    'DELETE FROM results WHERE username = ?',
+                    (username,)
+                )
+            elif site:
+                cursor.execute(
+                    'DELETE FROM results WHERE site = ?',
+                    (site,)
+                )
+            else:
+                cursor.execute('DELETE FROM results')
+
+            conn.commit()
+
+    def cleanup_expired(self) -> None:
+        """Remove expired entries based on their original TTL."""
+        current_time = int(time.time())
+
+        with sqlite3.connect(self.cache_path) as conn:
+            cursor = conn.cursor()
+
+            # Delete where (now - timestamp) > original cache_duration
+            cursor.execute(
+                '''
+                DELETE FROM results
+                WHERE (? - timestamp) > cache_duration
+                ''',
+                (current_time,)
+            )
+
+            conn.commit()
+
+    def get_stats(self) -> dict[str, str | int]:
+        """
+        Get cache statistics.
+
+        Returns:
+            Dictionary with total_entries, valid_entries, expired_entries, cache_path
+        """
+        with sqlite3.connect(self.cache_path) as conn:
+            cursor = conn.cursor()
+
+            cursor.execute('SELECT COUNT(*) FROM results')
+            total = cursor.fetchone()[0]
+
+            current_time = int(time.time())
+
+            # Count valid (non-expired) entries
+            cursor.execute(
+                '''
+                SELECT COUNT(*) FROM results
+                WHERE (? - timestamp) <= cache_duration
+                ''',
+                (current_time,)
+            )
+            valid = cursor.fetchone()[0]
+
+            return {
+                'total_entries': total,
+                'valid_entries': valid,
+                'expired_entries': total - valid,
+                'cache_path': self.cache_path
+            }
+
+    @staticmethod
+    def _validate_input(value: str, field_name: str) -> None:
+        """
+        Validate username/site input.
+
+        Args:
+            value: Input to validate
+            field_name: Name for error messages
+
+        Raises:
+            ValueError: If input is invalid
+        """
+        if not value:
+            raise ValueError(f"{field_name} cannot be empty")
+
+        if len(value) > 255:
+            raise ValueError(f"{field_name} exceeds maximum length (255)")
+
+        # Reject null bytes and control characters (except whitespace)
+        if '\x00' in value:
+            raise ValueError(f"{field_name} contains null byte")
+
+        # Check for other dangerous control characters
+        for char in value:
+            if ord(char) < 32 and char not in '\t\n\r':
+                raise ValueError(f"{field_name} contains invalid control characters")
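Taken together, `set_batch`, `cleanup_expired`, `get_stats`, and `clear` form the maintenance surface that the CLI in the next file wraps. A rough, illustrative sketch of that flow (not part of the patch; it assumes the patched package is importable and uses a throwaway directory):

```python
# Illustrative maintenance flow for the cache module above (not part of the patch).
import tempfile

from sherlock_project.cache import SherlockCache
from sherlock_project.result import QueryStatus

with tempfile.TemporaryDirectory() as tmp:
    cache = SherlockCache(cache_path=tmp, cache_duration=86400)

    # Bulk insert, as sherlock() does once a run finishes.
    cache.set_batch([
        ("alice", "GitHub", QueryStatus.CLAIMED, "https://github.com/alice"),
        ("alice", "Reddit", QueryStatus.AVAILABLE, None),
    ])

    cache.cleanup_expired()        # drop rows older than their stored TTL
    stats = cache.get_stats()      # total/valid/expired counts plus cache_path
    cache.clear(username="alice")  # or clear(site=...), or clear() for everything
    print(stats)
```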
+""" + +import argparse +import sys + +from colorama import Fore, Style + +from sherlock_project.cache import SherlockCache + + +def main() -> None: + """Main entry point for cache management CLI.""" + parser = argparse.ArgumentParser( + prog="sherlock-cache", + description="Manage Sherlock's result cache" + ) + + subparsers = parser.add_subparsers( + dest="command", + help="Cache management commands", + required=True + ) + + # Clear subcommand + clear_parser = subparsers.add_parser( + "clear", + help="Clear cache entries" + ) + clear_parser.add_argument( + "--username", + help="Clear cache for specific username only" + ) + clear_parser.add_argument( + "--site", + help="Clear cache for specific site only" + ) + + # Stats subcommand + subparsers.add_parser( + "stats", + help="Show cache statistics" + ) + + # Cleanup subcommand + subparsers.add_parser( + "cleanup", + help="Remove expired cache entries" + ) + + args = parser.parse_args() + + # Initialize cache + try: + cache = SherlockCache() + except (ValueError, RuntimeError) as e: + print(f"{Fore.RED}✗{Style.RESET_ALL} Cache initialization failed: {e}") + sys.exit(1) + + # Execute command + if args.command == "clear": + username = getattr(args, 'username', None) + site = getattr(args, 'site', None) + + try: + cache.clear(username=username, site=site) + + if username and site: + print( + f"{Fore.GREEN}✓{Style.RESET_ALL} " + f"Cleared cache for {username} on {site}" + ) + elif username: + print( + f"{Fore.GREEN}✓{Style.RESET_ALL} " + f"Cleared all cache for username: {username}" + ) + elif site: + print( + f"{Fore.GREEN}✓{Style.RESET_ALL} " + f"Cleared all cache for site: {site}" + ) + else: + print(f"{Fore.GREEN}✓{Style.RESET_ALL} Cleared entire cache") + except ValueError as e: + print(f"{Fore.RED}✗{Style.RESET_ALL} Error: {e}") + sys.exit(1) + + elif args.command == "stats": + stats = cache.get_stats() + print(f"\n{Style.BRIGHT}Cache Statistics:{Style.RESET_ALL}") + print(f" Cache Path: {stats['cache_path']}") + print(f" Total Entries: {stats['total_entries']}") + print( + f" Valid Entries: " + f"{Fore.GREEN}{stats['valid_entries']}{Style.RESET_ALL}" + ) + print( + f" Expired Entries: " + f"{Fore.YELLOW}{stats['expired_entries']}{Style.RESET_ALL}\n" + ) + + elif args.command == "cleanup": + cache.cleanup_expired() + print(f"{Fore.GREEN}✓{Style.RESET_ALL} Cleaned up expired cache entries") diff --git a/sherlock_project/sherlock.py b/sherlock_project/sherlock.py index 75b3e3d70..c828fc8fc 100644 --- a/sherlock_project/sherlock.py +++ b/sherlock_project/sherlock.py @@ -41,6 +41,7 @@ from sherlock_project.notify import QueryNotify from sherlock_project.notify import QueryNotifyPrint from sherlock_project.sites import SitesInformation +from sherlock_project.cache import SherlockCache from colorama import init from argparse import ArgumentTypeError @@ -174,7 +175,9 @@ def sherlock( dump_response: bool = False, proxy: Optional[str] = None, timeout: int = 60, -) -> dict[str, dict[str, str | QueryResult]]: + cache: Optional[SherlockCache] = None, + ignore_cache: bool = False, +) -> dict[str, dict[str, str] | QueryResult]: """Run Sherlock Analysis. Checks for existence of username on various social media sites. @@ -204,6 +207,9 @@ def sherlock( there was an HTTP error when checking for existence. """ + """Run Sherlock Analysis with caching support.""" + + # Notify caller that we are starting the query. 
diff --git a/sherlock_project/sherlock.py b/sherlock_project/sherlock.py
index 75b3e3d70..c828fc8fc 100644
--- a/sherlock_project/sherlock.py
+++ b/sherlock_project/sherlock.py
@@ -41,6 +41,7 @@ from sherlock_project.notify import QueryNotify
 from sherlock_project.notify import QueryNotifyPrint
 from sherlock_project.sites import SitesInformation
+from sherlock_project.cache import SherlockCache
 from colorama import init
 from argparse import ArgumentTypeError
@@ -174,7 +175,9 @@ def sherlock(
     dump_response: bool = False,
     proxy: Optional[str] = None,
     timeout: int = 60,
-) -> dict[str, dict[str, str | QueryResult]]:
+    cache: Optional[SherlockCache] = None,
+    ignore_cache: bool = False,
+) -> dict[str, dict[str, str | QueryResult]]:
     """Run Sherlock Analysis.
 
     Checks for existence of username on various social media sites.
@@ -204,6 +207,9 @@
         there was an HTTP error when checking for existence.
     """
+    """Run Sherlock Analysis with caching support."""
+
+    # Notify caller that we are starting the query.
     query_notify.start(username)
@@ -230,6 +236,31 @@
         # Results from analysis of this specific site
         results_site = {"url_main": net_info.get("urlMain")}
 
+        # Check cache first (if enabled and not ignoring cache)
+        if cache and not ignore_cache:
+            cached_result = cache.get(username, social_network)
+            if cached_result:
+                # Use cached result
+                result = QueryResult(
+                    username=username,
+                    site_name=social_network,
+                    site_url_user=cached_result.get("url"),
+                    status=cached_result["status"],
+                    query_time=0,  # Cached, no query time
+                    context="Cached result"
+                )
+                query_notify.update(result)
+
+                # Store in results_total
+                results_site["status"] = result
+                results_site["url_main"] = net_info.get("urlMain")
+                results_site["url_user"] = result.site_url_user
+                results_site["http_status"] = ""
+                results_site["response_text"] = ""
+                results_total[social_network] = results_site
+
+                continue  # Skip to next site
+
         # Record URL of main site
 
         # A user agent is needed because some sites don't return the correct
@@ -489,6 +520,7 @@
         )
         query_notify.update(result)
+
         # Save status of request
         results_site["status"] = result
@@ -499,6 +531,22 @@
         # Add this site's results into final dictionary with all of the other results.
         results_total[social_network] = results_site
 
+    # Bulk cache results after all checks complete (prevents race conditions)
+    if cache:
+        cache_results = [
+            (username, site, result.status, result.site_url_user if result.status == QueryStatus.CLAIMED else None)
+            for site, result_dict in results_total.items()
+            if "status" in result_dict
+            for result in [result_dict["status"]]
+            if result.status in (QueryStatus.CLAIMED, QueryStatus.AVAILABLE)
+        ]
+        if cache_results:
+            try:
+                cache.set_batch(cache_results)
+            except Exception as e:
+                # Don't fail the entire run if caching fails
+                query_notify.warning(f"Failed to cache results: {e}")
+
     return results_total
@@ -675,6 +723,29 @@
         help="Include checking of NSFW sites from default list.",
     )
 
+    parser.add_argument(
+        "--skip-cache",
+        action="store_true",
+        dest="skip_cache",
+        help="Disable result caching (cache will not be read or written)."
+    )
+
+    parser.add_argument(
+        "--ignore-cache",
+        action="store_true",
+        dest="ignore_cache",
+        help="Ignore cached results and force fresh checks (cache will still be updated)."
+    )
+
+    parser.add_argument(
+        "--cache-duration",
+        action="store",
+        type=int,
+        dest="cache_duration",
+        default=86400,
+        help="Cache duration in seconds (default: 86400 = 24 hours)",
+    )
+
     # TODO deprecated in favor of --txt, retained for workflow compatibility, to be removed
     # in future release
     parser.add_argument(
@@ -812,6 +883,25 @@
         result=None, verbose=args.verbose, print_all=args.print_all, browse=args.browse
     )
 
+    # Initialize cache if enabled
+    cache = None
+    if not args.skip_cache:
+        # Check environment variable for cache disable
+        cache_disabled = os.environ.get('SHERLOCK_CACHE_DISABLE', '').lower() in ('true', '1', 'yes')
+
+        if not cache_disabled:
+            # Get cache TTL from environment or args
+            cache_ttl = int(os.environ.get('SHERLOCK_CACHE_TTL', args.cache_duration))
+
+            # Get custom cache path from environment
+            cache_path = os.environ.get('SHERLOCK_CACHE_PATH', None)
+
+            try:
+                cache = SherlockCache(cache_path=cache_path, cache_duration=cache_ttl)
+                cache.cleanup_expired()  # Clean up old entries
+            except (ValueError, RuntimeError) as e:
+                query_notify.warning(f"Failed to initialize cache: {e}")
+
     # Run report on all specified users.
     all_usernames = []
     for username in args.username:
@@ -828,6 +918,8 @@
             dump_response=args.dump_response,
             proxy=args.proxy,
             timeout=args.timeout,
+            cache=cache,
+            ignore_cache=args.ignore_cache,
         )
 
         if args.output:
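The wiring above gives `sherlock()` a read-before-request, write-after-run shape: cached CLAIMED/AVAILABLE rows short-circuit the network check, and fresh results are written back in a single `set_batch` call. Reduced to its skeleton (an illustrative standalone sketch under those assumptions, not the function itself):

```python
# Skeleton of the caching flow added to sherlock() (illustrative, simplified).
from typing import Optional

from sherlock_project.cache import SherlockCache
from sherlock_project.result import QueryStatus


def check_sites(username: str, sites: list[str], cache: Optional[SherlockCache],
                ignore_cache: bool = False) -> dict[str, QueryStatus]:
    results: dict[str, QueryStatus] = {}
    to_cache = []

    for site in sites:
        cached = cache.get(username, site) if cache and not ignore_cache else None
        if cached:
            results[site] = cached["status"]  # reuse, skip the network check
            continue

        status = QueryStatus.AVAILABLE        # stand-in for the real HTTP probe
        results[site] = status
        if status in (QueryStatus.CLAIMED, QueryStatus.AVAILABLE):
            to_cache.append((username, site, status, None))

    if cache and to_cache:
        cache.set_batch(to_cache)             # one bulk write after the run
    return results
```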
diff --git a/tests/test_cache.py b/tests/test_cache.py
new file mode 100644
index 000000000..d66ec4ff6
--- /dev/null
+++ b/tests/test_cache.py
@@ -0,0 +1,386 @@
+"""Tests for cache functionality using mocks."""
+
+import time
+import unittest
+from unittest.mock import MagicMock, Mock, patch
+
+from sherlock_project.cache import SherlockCache
+from sherlock_project.result import QueryStatus
+
+
+class TestCacheInitialization(unittest.TestCase):
+    """Test cache initialization and security."""
+
+    @patch('sherlock_project.cache.Path.mkdir')
+    @patch('sherlock_project.cache.sqlite3')
+    @patch('sherlock_project.cache.user_cache_dir')
+    def test_init_creates_database(
+        self,
+        mock_cache_dir: Mock,
+        mock_sqlite: Mock,
+        mock_mkdir: Mock
+    ) -> None:
+        """Test database initialization."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache()
+
+        assert cache is not None
+
+        # Verify database operations
+        assert mock_cursor.execute.call_count >= 2
+        calls = [str(call) for call in mock_cursor.execute.call_args_list]
+        assert any('CREATE TABLE' in str(call) for call in calls)
+        assert any('CREATE INDEX' in str(call) for call in calls)
+
+    def test_init_rejects_negative_duration(self) -> None:
+        """Test cache_duration validation."""
+        with self.assertRaises(ValueError) as cm:
+            SherlockCache(cache_duration=0)
+        self.assertIn("positive", str(cm.exception))
+
+        with self.assertRaises(ValueError) as cm:
+            SherlockCache(cache_duration=-100)
+        self.assertIn("positive", str(cm.exception))
+
+    @patch('sherlock_project.cache.Path.mkdir')
+    @patch('sherlock_project.cache.sqlite3')
+    @patch('sherlock_project.cache.user_cache_dir')
+    def test_uses_platform_cache_dir(
+        self,
+        mock_cache_dir: Mock,
+        mock_sqlite: Mock,
+        mock_mkdir: Mock
+    ) -> None:
+        """Test platform-specific cache directory usage."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache()
+
+        # Verify platformdirs was called
+        mock_cache_dir.assert_called_once_with("sherlock", "sherlock_project")
+
+        # Verify cache path ends with cache.sqlite3
+        assert cache.cache_path.endswith("cache.sqlite3")
+        assert cache is not None
+
+
+@patch('sherlock_project.cache.sqlite3')
+@patch('sherlock_project.cache.Path.mkdir')
+@patch('sherlock_project.cache.user_cache_dir')
+class TestCacheOperations(unittest.TestCase):
+    """Test cache get/set operations."""
+
+    def test_set_uses_parameterized_query(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test SQL injection protection via parameterized queries."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_conn.__exit__.return_value = None
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache(cache_duration=86400)
+        cache.set("testuser", "GitHub", QueryStatus.CLAIMED, "https://github.com/testuser")
+
+        # Verify parameterized query was used (prevents SQL injection)
+        call_args = mock_cursor.execute.call_args
+        self.assertIn("INSERT OR REPLACE", call_args[0][0])
+        self.assertEqual(
+            call_args[0][1][:4],
+            ("testuser", "GitHub", "CLAIMED", "https://github.com/testuser")
+        )
+
+    def test_set_rejects_control_characters(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test rejection of control characters in username."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache(cache_duration=86400)
+
+        # Test various control characters
+        with self.assertRaises(ValueError) as cm:
+            cache.set("user\x00name", "GitHub", QueryStatus.CLAIMED, "https://example.com")
+        self.assertIn("null byte", str(cm.exception))
+
+        with self.assertRaises(ValueError) as cm:
+            cache.set("user\x01name", "GitHub", QueryStatus.CLAIMED, "https://example.com")
+        self.assertIn("control characters", str(cm.exception))
+
+    def test_set_rejects_null_bytes(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test null byte rejection."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache(cache_duration=86400)
+
+        with self.assertRaises(ValueError) as cm:
+            cache.set("user\x00injection", "GitHub", QueryStatus.CLAIMED, "https://example.com")
+        self.assertIn("null byte", str(cm.exception))
+
+    def test_set_validates_url_length(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test URL length validation."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache(cache_duration=86400)
+
+        long_url = "https://example.com/" + ("a" * 3000)
+
+        with self.assertRaises(ValueError) as cm:
+            cache.set("user", "Site", QueryStatus.CLAIMED, long_url)
+        self.assertIn("maximum length", str(cm.exception))
+
+    def test_get_uses_parameterized_query(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test SQL injection protection in get() via parameterized queries."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        current_time = int(time.time())
+        mock_cursor.fetchone.return_value = (
+            "CLAIMED",
+            "https://github.com/testuser",
+            current_time,
+            86400
+        )
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_conn.__exit__.return_value = None
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache(cache_duration=86400)
+        result = cache.get("testuser", "GitHub")
+
+        assert result is not None
+
+        # Verify parameterized query (prevents SQL injection)
+        call_args = mock_cursor.execute.call_args
+        self.assertIn("SELECT", call_args[0][0])
+        self.assertIn("WHERE username = ? AND site = ?", call_args[0][0])
+        self.assertEqual(call_args[0][1], ("testuser", "GitHub"))
+
+    def test_get_returns_none_for_expired(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test expired entries return None."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        old_timestamp = int(time.time()) - (2 * 86400)
+        mock_cursor.fetchone.return_value = (
+            "CLAIMED",
+            "https://github.com/testuser",
+            old_timestamp,
+            86400
+        )
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_conn.__exit__.return_value = None
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache(cache_duration=86400)
+        result = cache.get("testuser", "GitHub")
+
+        self.assertIsNone(result)
+
+    def test_get_returns_valid_entry(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test valid entry is returned correctly."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        current_time = int(time.time())
+        mock_cursor.fetchone.return_value = (
+            "CLAIMED",
+            "https://github.com/testuser",
+            current_time - 1000,
+            86400
+        )
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_conn.__exit__.return_value = None
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache(cache_duration=86400)
+        result = cache.get("testuser", "GitHub")
+
+        self.assertIsNotNone(result)
+        self.assertEqual(result['status'], QueryStatus.CLAIMED)
+        self.assertEqual(result['url'], "https://github.com/testuser")
+
+
+@patch('sherlock_project.cache.sqlite3')
+@patch('sherlock_project.cache.Path.mkdir')
+@patch('sherlock_project.cache.user_cache_dir')
+class TestCacheClear(unittest.TestCase):
+    """Test cache clearing functionality."""
+
+    def test_clear_all(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test clearing entire cache."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_conn.__exit__.return_value = None
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache()
+        cache.clear()
+
+        call_args = mock_cursor.execute.call_args
+        self.assertEqual(call_args[0][0], 'DELETE FROM results')
+
+    def test_clear_by_username(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test clearing by username."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_conn.__exit__.return_value = None
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache()
+        cache.clear(username="testuser")
+
+        call_args = mock_cursor.execute.call_args
+        self.assertIn("WHERE username = ?", call_args[0][0])
+        self.assertEqual(call_args[0][1], ("testuser",))
+
+    def test_clear_validates_input(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test input validation in clear()."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        mock_conn = MagicMock()
+        mock_cursor = MagicMock()
+        mock_conn.cursor.return_value = mock_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache()
+
+        with self.assertRaises(ValueError):
+            cache.clear(username="user\x00injection")
+
+
+@patch('sherlock_project.cache.sqlite3')
+@patch('sherlock_project.cache.Path.mkdir')
+@patch('sherlock_project.cache.user_cache_dir')
+class TestCacheStats(unittest.TestCase):
+    """Test cache statistics."""
+
+    def test_stats_calculation(
+        self,
+        mock_cache_dir: Mock,
+        mock_mkdir: Mock,
+        mock_sqlite: Mock
+    ) -> None:
+        """Test statistics calculation."""
+        mock_cache_dir.return_value = "/home/user/.cache/sherlock"
+
+        # Create separate cursors for init and stats
+        init_cursor = MagicMock()
+        stats_cursor = MagicMock()
+
+        # Stats cursor should return values for the two SELECT COUNT queries
+        stats_cursor.fetchone.side_effect = [(10,), (7,)]
+
+        mock_conn = MagicMock()
+        # Return different cursor for stats call
+        mock_conn.cursor.return_value = init_cursor
+        mock_conn.__enter__.return_value = mock_conn
+        mock_conn.__exit__.return_value = None
+        mock_sqlite.connect.return_value = mock_conn
+
+        cache = SherlockCache()
+
+        # Now set up for the stats call
+        mock_conn.cursor.return_value = stats_cursor
+        stats = cache.get_stats()
+
+        self.assertEqual(stats['total_entries'], 10)
+        self.assertEqual(stats['valid_entries'], 7)
+        self.assertEqual(stats['expired_entries'], 3)
+        self.assertIn('cache_path', stats)
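Beyond the new flags, `main()` also consults three environment variables added by this patch (`SHERLOCK_CACHE_PATH`, `SHERLOCK_CACHE_TTL`, `SHERLOCK_CACHE_DISABLE`). A hedged sketch of how they interact with the defaults, for anyone trying the patch locally (illustrative only; it mirrors what `main()` does rather than being part of the patch):

```python
# Illustrative: environment-driven configuration recognized by this patch.
import os

os.environ["SHERLOCK_CACHE_PATH"] = "/tmp/sherlock-review"  # custom DB directory
os.environ["SHERLOCK_CACHE_TTL"] = "3600"                   # overrides --cache-duration
# os.environ["SHERLOCK_CACHE_DISABLE"] = "true"             # main() skips caching entirely

from sherlock_project.cache import SherlockCache

# Mirror what main() does: read the overrides and hand them to SherlockCache.
cache = SherlockCache(
    cache_path=os.environ.get("SHERLOCK_CACHE_PATH"),
    cache_duration=int(os.environ.get("SHERLOCK_CACHE_TTL", "86400")),
)
print(cache.cache_path)  # a cache.sqlite3 under the directory given above
```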