From 839ae744cf403d82c09c3dc5b91810c9ce6232d2 Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Sun, 4 Jan 2026 13:17:07 -0800 Subject: [PATCH 1/2] simplify entrypoints for orchestrator and workers --- archivebox/__init__.py | 6 + archivebox/cli/__init__.py | 2 - archivebox/cli/archivebox_add.py | 9 +- archivebox/cli/archivebox_orchestrator.py | 67 ------- archivebox/cli/archivebox_run.py | 65 ++++++- archivebox/cli/archivebox_worker.py | 50 ------ archivebox/core/models.py | 21 --- archivebox/crawls/models.py | 32 ++-- .../0010_alter_process_process_type.py | 18 ++ archivebox/machine/models.py | 81 ++++++--- archivebox/misc/process_utils.py | 123 ------------- .../screenshot/tests/test_screenshot.py | 6 + archivebox/workers/worker.py | 169 +++++++++++++----- 13 files changed, 300 insertions(+), 349 deletions(-) delete mode 100644 archivebox/cli/archivebox_orchestrator.py delete mode 100644 archivebox/cli/archivebox_worker.py create mode 100644 archivebox/machine/migrations/0010_alter_process_process_type.py delete mode 100644 archivebox/misc/process_utils.py diff --git a/archivebox/__init__.py b/archivebox/__init__.py index 8c0be7a0e7..0079c7cce4 100755 --- a/archivebox/__init__.py +++ b/archivebox/__init__.py @@ -15,6 +15,12 @@ import sys from pathlib import Path +# Force unbuffered output for real-time logs +if hasattr(sys.stdout, 'reconfigure'): + sys.stdout.reconfigure(line_buffering=True) + sys.stderr.reconfigure(line_buffering=True) +os.environ['PYTHONUNBUFFERED'] = '1' + ASCII_LOGO = """ █████╗ ██████╗ ██████╗██╗ ██╗██╗██╗ ██╗███████╗ ██████╗ ██████╗ ██╗ ██╗ ██╔══██╗██╔══██╗██╔════╝██║ ██║██║██║ ██║██╔════╝ ██╔══██╗██╔═══██╗╚██╗██╔╝ diff --git a/archivebox/cli/__init__.py b/archivebox/cli/__init__.py index 4c72028245..5f17755b6c 100644 --- a/archivebox/cli/__init__.py +++ b/archivebox/cli/__init__.py @@ -53,8 +53,6 @@ class ArchiveBoxGroup(click.Group): 'manage': 'archivebox.cli.archivebox_manage.main', # Introspection commands 'pluginmap': 'archivebox.cli.archivebox_pluginmap.main', - # Worker command - 'worker': 'archivebox.cli.archivebox_worker.main', } all_subcommands = { **meta_commands, diff --git a/archivebox/cli/archivebox_add.py b/archivebox/cli/archivebox_add.py index 25b0815b22..65a34c02d1 100644 --- a/archivebox/cli/archivebox_add.py +++ b/archivebox/cli/archivebox_add.py @@ -127,10 +127,11 @@ def add(urls: str | list[str], # Background mode: just queue work and return (orchestrator via server will pick it up) print('[yellow]\\[*] URLs queued. Orchestrator will process them (run `archivebox server` if not already running).[/yellow]') else: - # Foreground mode: run orchestrator inline until all work is done - print(f'[green]\\[*] Starting orchestrator to process crawl...[/green]') - orchestrator = Orchestrator(exit_on_idle=True, crawl_id=str(crawl.id)) - orchestrator.runloop() # Block until complete + # Foreground mode: run CrawlWorker inline until all work is done + print(f'[green]\\[*] Starting worker to process crawl...[/green]') + from archivebox.workers.worker import CrawlWorker + worker = CrawlWorker(crawl_id=str(crawl.id), worker_id=0) + worker.runloop() # Block until complete # 6. 
Return the list of Snapshots in this crawl return crawl.snapshot_set.all() diff --git a/archivebox/cli/archivebox_orchestrator.py b/archivebox/cli/archivebox_orchestrator.py deleted file mode 100644 index 4b27272736..0000000000 --- a/archivebox/cli/archivebox_orchestrator.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 - -""" -archivebox orchestrator [--daemon] - -Start the orchestrator process that manages workers. - -The orchestrator polls queues for each model type (Crawl, Snapshot, ArchiveResult) -and lazily spawns worker processes when there is work to be done. -""" - -__package__ = 'archivebox.cli' -__command__ = 'archivebox orchestrator' - -import sys - -import rich_click as click - -from archivebox.misc.util import docstring - - -def orchestrator(daemon: bool = False, watch: bool = False) -> int: - """ - Start the orchestrator process. - - The orchestrator: - 1. Polls each model queue (Crawl, Snapshot, ArchiveResult) - 2. Spawns worker processes when there is work to do - 3. Monitors worker health and restarts failed workers - 4. Exits when all queues are empty (unless --daemon) - - Args: - daemon: Run forever (don't exit when idle) - watch: Just watch the queues without spawning workers (for debugging) - - Exit codes: - 0: All work completed successfully - 1: Error occurred - """ - from archivebox.workers.orchestrator import Orchestrator - - if Orchestrator.is_running(): - print('[yellow]Orchestrator is already running[/yellow]') - return 0 - - try: - orchestrator_instance = Orchestrator(exit_on_idle=not daemon) - orchestrator_instance.runloop() - return 0 - except KeyboardInterrupt: - return 0 - except Exception as e: - print(f'[red]Orchestrator error: {type(e).__name__}: {e}[/red]', file=sys.stderr) - return 1 - - -@click.command() -@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)") -@click.option('--watch', '-w', is_flag=True, help="Watch queues without spawning workers") -@docstring(orchestrator.__doc__) -def main(daemon: bool, watch: bool): - """Start the ArchiveBox orchestrator process""" - sys.exit(orchestrator(daemon=daemon, watch=watch)) - - -if __name__ == '__main__': - main() diff --git a/archivebox/cli/archivebox_run.py b/archivebox/cli/archivebox_run.py index 9901c6844e..0beed5e295 100644 --- a/archivebox/cli/archivebox_run.py +++ b/archivebox/cli/archivebox_run.py @@ -1,16 +1,18 @@ #!/usr/bin/env python3 """ -archivebox run [--daemon] +archivebox run [--daemon] [--crawl-id=...] [--snapshot-id=...] Unified command for processing queued work. 
Modes: - With stdin JSONL: Process piped records, exit when complete - Without stdin (TTY): Run orchestrator in foreground until killed + - --crawl-id: Run orchestrator for specific crawl only + - --snapshot-id: Run worker for specific snapshot only (internal use) Examples: - # Run orchestrator in foreground (replaces `archivebox orchestrator`) + # Run orchestrator in foreground archivebox run # Run as daemon (don't exit on idle) @@ -23,6 +25,12 @@ # Mixed types work too cat mixed_records.jsonl | archivebox run + + # Run orchestrator for specific crawl (shows live progress for that crawl) + archivebox run --crawl-id=019b7e90-04d0-73ed-adec-aad9cfcd863e + + # Run worker for specific snapshot (internal use by orchestrator) + archivebox run --snapshot-id=019b7e90-5a8e-712c-9877-2c70eebe80ad """ __package__ = 'archivebox.cli' @@ -187,15 +195,62 @@ def run_orchestrator(daemon: bool = False) -> int: return 1 +def run_snapshot_worker(snapshot_id: str) -> int: + """ + Run a SnapshotWorker for a specific snapshot. + + Args: + snapshot_id: Snapshot UUID to process + + Returns exit code (0 = success, 1 = error). + """ + from archivebox.workers.worker import _run_snapshot_worker + + try: + _run_snapshot_worker(snapshot_id=snapshot_id, worker_id=0) + return 0 + except KeyboardInterrupt: + return 0 + except Exception as e: + rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr) + import traceback + traceback.print_exc() + return 1 + + @click.command() @click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)") -def main(daemon: bool): +@click.option('--crawl-id', help="Run orchestrator for specific crawl only") +@click.option('--snapshot-id', help="Run worker for specific snapshot only") +def main(daemon: bool, crawl_id: str, snapshot_id: str): """ Process queued work. - When stdin is piped: Process those specific records and exit. - When run standalone: Run orchestrator in foreground. + Modes: + - No args + stdin piped: Process piped JSONL records + - No args + TTY: Run orchestrator for all work + - --crawl-id: Run orchestrator for that crawl only + - --snapshot-id: Run worker for that snapshot only """ + # Snapshot worker mode + if snapshot_id: + sys.exit(run_snapshot_worker(snapshot_id)) + + # Crawl worker mode + if crawl_id: + from archivebox.workers.worker import CrawlWorker + try: + worker = CrawlWorker(crawl_id=crawl_id, worker_id=0) + worker.runloop() + sys.exit(0) + except KeyboardInterrupt: + sys.exit(0) + except Exception as e: + rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr) + import traceback + traceback.print_exc() + sys.exit(1) + # Check if stdin has data (non-TTY means piped input) if not sys.stdin.isatty(): sys.exit(process_stdin_records()) diff --git a/archivebox/cli/archivebox_worker.py b/archivebox/cli/archivebox_worker.py deleted file mode 100644 index 5701936060..0000000000 --- a/archivebox/cli/archivebox_worker.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 - -__package__ = 'archivebox.cli' -__command__ = 'archivebox worker' - -import sys - -import rich_click as click - -from archivebox.misc.util import docstring - - -def worker(worker_type: str, daemon: bool = False, plugin: str | None = None): - """ - Start a worker process to process items from the queue. 
- - Worker types: - - crawl: Process Crawl objects (parse seeds, create snapshots) - - snapshot: Process Snapshot objects (create archive results) - - archiveresult: Process ArchiveResult objects (run plugins) - - Workers poll the database for queued items, claim them atomically, - and spawn subprocess tasks to handle each item. - """ - from archivebox.workers.worker import get_worker_class - - WorkerClass = get_worker_class(worker_type) - - # Build kwargs - kwargs = {'daemon': daemon} - if plugin and worker_type == 'archiveresult': - kwargs['extractor'] = plugin # internal field still called extractor - - # Create and run worker - worker_instance = WorkerClass(**kwargs) - worker_instance.runloop() - - -@click.command() -@click.argument('worker_type', type=click.Choice(['crawl', 'snapshot', 'archiveresult'])) -@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)") -@click.option('--plugin', '-p', default=None, help='Filter by plugin (archiveresult only)') -@docstring(worker.__doc__) -def main(worker_type: str, daemon: bool, plugin: str | None): - """Start an ArchiveBox worker process""" - worker(worker_type, daemon=daemon, plugin=plugin) - - -if __name__ == '__main__': - main() diff --git a/archivebox/core/models.py b/archivebox/core/models.py index 1888922e87..b05ad5013c 100755 --- a/archivebox/core/models.py +++ b/archivebox/core/models.py @@ -1456,27 +1456,6 @@ def cleanup(self): empty_ars.delete() print(f'[yellow]🗑️ Deleted {deleted_count} empty ArchiveResults for {self.url}[/yellow]') - def has_running_background_hooks(self) -> bool: - """ - Check if any ArchiveResult background hooks are still running. - - Used by state machine to determine if snapshot is finished. - """ - from archivebox.misc.process_utils import validate_pid_file - - if not self.OUTPUT_DIR.exists(): - return False - - for plugin_dir in self.OUTPUT_DIR.iterdir(): - if not plugin_dir.is_dir(): - continue - pid_file = plugin_dir / 'hook.pid' - cmd_file = plugin_dir / 'cmd.sh' - if validate_pid_file(pid_file, cmd_file): - return True - - return False - def to_json(self) -> dict: """ Convert Snapshot model instance to a JSON-serializable dict. 
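Note on the `has_running_background_hooks()` deletion above: the pid-file scanning it performed is superseded by querying Process records, as the crawls/models.py and machine/models.py diffs below do. A minimal sketch of that replacement pattern, assuming only the fields and methods visible in this patch (process_type, status, parent__worker_type, kill_tree); the helper name kill_running_crawl_hooks is illustrative, not part of the codebase:

    from archivebox.machine.models import Process

    def kill_running_crawl_hooks() -> int:
        # Query the database for live hook processes instead of scanning **/*.pid files on disk
        running_hooks = Process.objects.filter(
            parent__worker_type='crawl',
            process_type=Process.TypeChoices.HOOK,
            status=Process.StatusChoices.RUNNING,
        ).distinct()
        killed = 0
        for process in running_hooks:
            # kill_tree() terminates the hook and its children; graceful_timeout mirrors the calls elsewhere in this patch
            killed += process.kill_tree(graceful_timeout=2.0)
        return killed
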
diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index d8df425c0c..86277275ea 100755 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -449,17 +449,27 @@ def is_finished(self) -> bool: def cleanup(self): """Clean up background hooks and run on_CrawlEnd hooks.""" from archivebox.hooks import run_hook, discover_hooks - from archivebox.misc.process_utils import safe_kill_process - - # Kill any background processes by scanning for all .pid files + from archivebox.machine.models import Process + + # Kill any background Crawl hooks using Process records + # Find all running hook Processes that are children of this crawl's workers + # (CrawlWorker already kills its hooks via on_shutdown, but this is backup for orphans) + running_hooks = Process.objects.filter( + parent__worker_type='crawl', + process_type=Process.TypeChoices.HOOK, + status=Process.StatusChoices.RUNNING, + ).distinct() + + for process in running_hooks: + # Use Process.kill_tree() to gracefully kill parent + children + killed_count = process.kill_tree(graceful_timeout=2.0) + if killed_count > 0: + print(f'[yellow]🔪 Killed {killed_count} orphaned crawl hook process(es)[/yellow]') + + # Clean up .pid files from output directory if self.OUTPUT_DIR.exists(): for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): - cmd_file = pid_file.parent / 'cmd.sh' - # safe_kill_process now waits for termination and escalates to SIGKILL - # Returns True only if process is confirmed dead - killed = safe_kill_process(pid_file, cmd_file) - if killed: - pid_file.unlink(missing_ok=True) + pid_file.unlink(missing_ok=True) # Run on_CrawlEnd hooks from archivebox.config.configset import get_config @@ -472,7 +482,7 @@ def cleanup(self): output_dir = self.OUTPUT_DIR / plugin_name output_dir.mkdir(parents=True, exist_ok=True) - result = run_hook( + process = run_hook( hook, output_dir=output_dir, config=config, @@ -481,7 +491,7 @@ def cleanup(self): ) # Log failures but don't block - if result and result['returncode'] != 0: + if process.exit_code != 0: print(f'[yellow]⚠️ CrawlEnd hook failed: {hook.name}[/yellow]') diff --git a/archivebox/machine/migrations/0010_alter_process_process_type.py b/archivebox/machine/migrations/0010_alter_process_process_type.py new file mode 100644 index 0000000000..ebf8129411 --- /dev/null +++ b/archivebox/machine/migrations/0010_alter_process_process_type.py @@ -0,0 +1,18 @@ +# Generated by Django 6.0 on 2026-01-03 06:58 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('machine', '0009_alter_binary_status'), + ] + + operations = [ + migrations.AlterField( + model_name='process', + name='process_type', + field=models.CharField(choices=[('supervisord', 'Supervisord'), ('orchestrator', 'Orchestrator'), ('worker', 'Worker'), ('cli', 'CLI'), ('hook', 'Hook'), ('binary', 'Binary')], db_index=True, default='cli', help_text='Type of process (cli, worker, orchestrator, binary, supervisord)', max_length=16), + ), + ] diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index 4e5de2f49f..e9777d8020 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -499,20 +499,25 @@ def cleanup(self): since installations are foreground, but included for consistency). 
""" from pathlib import Path - from archivebox.misc.process_utils import safe_kill_process - output_dir = self.OUTPUT_DIR - if not output_dir.exists(): - return + # Kill any background binary installation hooks using Process records + # (rarely used since binary installations are typically foreground) + running_hooks = Process.objects.filter( + binary=self, + process_type=Process.TypeChoices.HOOK, + status=Process.StatusChoices.RUNNING, + ) - # Kill any background hooks - for plugin_dir in output_dir.iterdir(): - if not plugin_dir.is_dir(): - continue + for process in running_hooks: + killed_count = process.kill_tree(graceful_timeout=2.0) + if killed_count > 0: + print(f'[yellow]🔪 Killed {killed_count} binary installation hook process(es)[/yellow]') - pid_file = plugin_dir / 'hook.pid' - cmd_file = plugin_dir / 'cmd.sh' - safe_kill_process(pid_file, cmd_file) + # Clean up .pid files from output directory + output_dir = self.OUTPUT_DIR + if output_dir.exists(): + for pid_file in output_dir.glob('**/*.pid'): + pid_file.unlink(missing_ok=True) def symlink_to_lib_bin(self, lib_bin_dir: str | Path) -> Path | None: """ @@ -1273,32 +1278,61 @@ def stderr_file(self) -> Path: def _write_pid_file(self) -> None: """Write PID file with mtime set to process start time.""" - from archivebox.misc.process_utils import write_pid_file_with_mtime if self.pid and self.started_at and self.pid_file: - write_pid_file_with_mtime( - self.pid_file, - self.pid, - self.started_at.timestamp() - ) + # Write PID to file + self.pid_file.write_text(str(self.pid)) + # Set mtime to process start time for validation + try: + start_time = self.started_at.timestamp() + os.utime(self.pid_file, (start_time, start_time)) + except OSError: + pass # mtime optional, validation degrades gracefully def _write_cmd_file(self) -> None: """Write cmd.sh script for debugging/validation.""" - from archivebox.misc.process_utils import write_cmd_file if self.cmd and self.cmd_file: - write_cmd_file(self.cmd_file, self.cmd) + # Escape shell arguments (quote if contains space, ", or $) + def escape(arg: str) -> str: + return f'"{arg.replace(chr(34), chr(92)+chr(34))}"' if any(c in arg for c in ' "$') else arg + + # Write executable shell script + script = '#!/bin/bash\n' + ' '.join(escape(arg) for arg in self.cmd) + '\n' + self.cmd_file.write_text(script) + try: + self.cmd_file.chmod(0o755) + except OSError: + pass def _build_env(self) -> dict: """Build environment dict for subprocess, merging stored env with system.""" + import json + env = os.environ.copy() - env.update(self.env or {}) + + # Convert all values to strings for subprocess.Popen + if self.env: + for key, value in self.env.items(): + if value is None: + continue + elif isinstance(value, str): + env[key] = value # Already a string, use as-is + elif isinstance(value, bool): + env[key] = 'True' if value else 'False' + elif isinstance(value, (int, float)): + env[key] = str(value) + else: + # Lists, dicts, etc. - serialize to JSON + env[key] = json.dumps(value, default=str) + return env - def launch(self, background: bool = False) -> 'Process': + def launch(self, background: bool = False, cwd: str | None = None) -> 'Process': """ Spawn the subprocess and update this Process record. Args: background: If True, don't wait for completion (for daemons/bg hooks) + cwd: Working directory for the subprocess (defaults to self.pwd) Returns: self (updated with pid, started_at, etc.) 
@@ -1310,6 +1344,9 @@ def launch(self, background: bool = False) -> 'Process': if not self.pwd: raise ValueError("Process.pwd must be set before calling launch()") + # Use provided cwd or default to pwd + working_dir = cwd or self.pwd + # Ensure output directory exists Path(self.pwd).mkdir(parents=True, exist_ok=True) @@ -1322,7 +1359,7 @@ def launch(self, background: bool = False) -> 'Process': with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err: proc = subprocess.Popen( self.cmd, - cwd=self.pwd, + cwd=working_dir, stdout=out, stderr=err, env=self._build_env(), diff --git a/archivebox/misc/process_utils.py b/archivebox/misc/process_utils.py deleted file mode 100644 index 15fa861ea3..0000000000 --- a/archivebox/misc/process_utils.py +++ /dev/null @@ -1,123 +0,0 @@ -""" -Process validation using psutil and filesystem mtime. - -Uses mtime as a "password": PID files are timestamped with process start time. -Since filesystem mtimes can be set arbitrarily but process start times cannot, -comparing them detects PID reuse. -""" - -__package__ = 'archivebox.misc' - -import os -import time -from pathlib import Path -from typing import Optional - -try: - import psutil - PSUTIL_AVAILABLE = True -except ImportError: - PSUTIL_AVAILABLE = False - - -def validate_pid_file(pid_file: Path, cmd_file: Optional[Path] = None, tolerance: float = 5.0) -> bool: - """Validate PID using mtime and optional cmd.sh. Returns True if process is ours.""" - if not PSUTIL_AVAILABLE or not pid_file.exists(): - return False - - try: - pid = int(pid_file.read_text().strip()) - proc = psutil.Process(pid) - - # Check mtime matches process start time - if abs(pid_file.stat().st_mtime - proc.create_time()) > tolerance: - return False # PID reused - - # Validate command if provided - if cmd_file and cmd_file.exists(): - cmd = cmd_file.read_text() - cmdline = ' '.join(proc.cmdline()) - if '--remote-debugging-port' in cmd and '--remote-debugging-port' not in cmdline: - return False - if ('chrome' in cmd.lower() or 'chromium' in cmd.lower()): - if 'chrome' not in proc.name().lower() and 'chromium' not in proc.name().lower(): - return False - - return True - except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, ValueError, OSError): - return False - - -def write_pid_file_with_mtime(pid_file: Path, pid: int, start_time: float): - """Write PID file and set mtime to process start time.""" - pid_file.write_text(str(pid)) - try: - os.utime(pid_file, (start_time, start_time)) - except OSError: - pass # mtime optional, validation degrades gracefully - - -def write_cmd_file(cmd_file: Path, cmd: list[str]): - """Write shell command script.""" - def escape(arg: str) -> str: - return f'"{arg.replace(chr(34), chr(92)+chr(34))}"' if any(c in arg for c in ' "$') else arg - - script = '#!/bin/bash\n' + ' '.join(escape(arg) for arg in cmd) + '\n' - cmd_file.write_text(script) - try: - cmd_file.chmod(0o755) - except OSError: - pass - - -def safe_kill_process(pid_file: Path, cmd_file: Optional[Path] = None, signal_num: int = 15, timeout: float = 3.0) -> bool: - """ - Kill process after validation, with graceful wait and SIGKILL escalation. - - Returns True only if process is confirmed dead (either already dead or killed successfully). 
- """ - import time - import signal - - if not validate_pid_file(pid_file, cmd_file): - pid_file.unlink(missing_ok=True) # Clean stale file - return True # Process already dead, consider it killed - - try: - pid = int(pid_file.read_text().strip()) - - # Send initial signal (SIGTERM by default) - try: - os.kill(pid, signal_num) - except ProcessLookupError: - # Process already dead - return True - - # Wait for process to terminate gracefully - start_time = time.time() - while time.time() - start_time < timeout: - try: - os.kill(pid, 0) # Check if process still exists - time.sleep(0.1) - except ProcessLookupError: - # Process terminated - return True - - # Process didn't terminate, escalate to SIGKILL - try: - os.kill(pid, signal.SIGKILL) - time.sleep(0.5) # Brief wait after SIGKILL - # Verify it's dead - try: - os.kill(pid, 0) - # Process still alive after SIGKILL - this is unusual - return False - except ProcessLookupError: - # Process finally dead - return True - except ProcessLookupError: - # Process died between timeout and SIGKILL - return True - - except (OSError, ValueError): - return False diff --git a/archivebox/plugins/screenshot/tests/test_screenshot.py b/archivebox/plugins/screenshot/tests/test_screenshot.py index a522f38b4a..b294199168 100644 --- a/archivebox/plugins/screenshot/tests/test_screenshot.py +++ b/archivebox/plugins/screenshot/tests/test_screenshot.py @@ -206,6 +206,12 @@ def test_config_save_screenshot_false_skips(): env = os.environ.copy() env['SCREENSHOT_ENABLED'] = 'False' + # DEBUG: Check if NODE_V8_COVERAGE is in env + if 'NODE_V8_COVERAGE' in env: + print(f"\n[DEBUG] NODE_V8_COVERAGE in env: {env['NODE_V8_COVERAGE']}") + else: + print("\n[DEBUG] NODE_V8_COVERAGE NOT in env") + result = subprocess.run( ['node', str(SCREENSHOT_HOOK), f'--url={TEST_URL}', '--snapshot-id=test999'], cwd=tmpdir, diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index 633bec66b4..826accdb46 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -17,7 +17,7 @@ from typing import ClassVar, Any from datetime import timedelta from pathlib import Path -from multiprocessing import Process as MPProcess, cpu_count +from multiprocessing import cpu_count from django.db.models import QuerySet from django.utils import timezone @@ -282,26 +282,80 @@ def _terminate_background_hooks( still_running.remove(hook_name) @classmethod - def start(cls, **kwargs: Any) -> int: + def start(cls, parent: Any = None, **kwargs: Any) -> int: """ - Fork a new worker as a subprocess. + Fork a new worker as a subprocess using Process.launch(). + + Args: + parent: Parent Process record (for hierarchy tracking) + **kwargs: Worker-specific args (crawl_id or snapshot_id) + Returns the PID of the new process. 
""" - from archivebox.machine.models import Process + from archivebox.machine.models import Process, Machine + from archivebox.config.configset import get_config + from pathlib import Path + from django.conf import settings + import sys + + # Build command and get config for the appropriate scope + if cls.name == 'crawl': + crawl_id = kwargs.get('crawl_id') + if not crawl_id: + raise ValueError("CrawlWorker requires crawl_id") + + from archivebox.crawls.models import Crawl + crawl = Crawl.objects.get(id=crawl_id) + + cmd = [sys.executable, '-m', 'archivebox', 'run', '--crawl-id', str(crawl_id)] + pwd = Path(crawl.OUTPUT_DIR) # Run in crawl's output directory + env = get_config(scope='crawl', crawl=crawl) + + elif cls.name == 'snapshot': + snapshot_id = kwargs.get('snapshot_id') + if not snapshot_id: + raise ValueError("SnapshotWorker requires snapshot_id") + + from archivebox.core.models import Snapshot + snapshot = Snapshot.objects.get(id=snapshot_id) - worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER) + cmd = [sys.executable, '-m', 'archivebox', 'run', '--snapshot-id', str(snapshot_id)] + pwd = Path(snapshot.output_dir) # Run in snapshot's output directory + env = get_config(scope='snapshot', snapshot=snapshot) - # Use module-level function for pickling compatibility - proc = MPProcess( - target=_run_worker, - args=(cls.name, worker_id), - kwargs=kwargs, - name=f'{cls.name}_worker_{worker_id}', + else: + raise ValueError(f"Unknown worker type: {cls.name}") + + # Ensure output directory exists + pwd.mkdir(parents=True, exist_ok=True) + + # Convert config to JSON-serializable format for storage + import json + env_serializable = { + k: json.loads(json.dumps(v, default=str)) + for k, v in env.items() + if v is not None + } + + # Create Process record with full config as environment + # pwd = where stdout/stderr/pid/cmd files are written (snapshot/crawl output dir) + # cwd (passed to launch) = where subprocess runs from (DATA_DIR) + # parent = parent Process for hierarchy tracking (CrawlWorker -> SnapshotWorker) + process = Process.objects.create( + machine=Machine.current(), + parent=parent, + process_type=Process.TypeChoices.WORKER, + worker_type=cls.name, + pwd=str(pwd), + cmd=cmd, + env=env_serializable, + timeout=3600, # 1 hour default timeout for workers ) - proc.start() - assert proc.pid is not None - return proc.pid + # Launch in background with DATA_DIR as working directory + process.launch(background=True, cwd=str(settings.DATA_DIR)) + + return process.pid @classmethod def get_running_workers(cls) -> list: @@ -377,17 +431,18 @@ def runloop(self) -> None: self.on_startup() try: - print(f'[cyan]🔄 CrawlWorker.runloop: Starting tick() for crawl {self.crawl_id}[/cyan]', file=sys.stderr) + print(f'🔄 CrawlWorker starting for crawl {self.crawl_id}', file=sys.stderr) + # Advance state machine: QUEUED → STARTED (triggers run() via @started.enter) self.crawl.sm.tick() self.crawl.refresh_from_db() - print(f'[cyan]🔄 tick() complete, crawl status={self.crawl.status}[/cyan]', file=sys.stderr) + print(f'🔄 tick() complete, crawl status={self.crawl.status}', file=sys.stderr) # Now spawn SnapshotWorkers and monitor progress while True: # Check if crawl is done if self._is_crawl_finished(): - print(f'[cyan]🔄 Crawl finished, sealing...[/cyan]', file=sys.stderr) + print(f'🔄 Crawl finished, sealing...', file=sys.stderr) self.crawl.sm.seal() break @@ -401,9 +456,12 @@ def runloop(self) -> None: def _spawn_snapshot_workers(self) -> None: """Spawn SnapshotWorkers for queued 
snapshots (up to limit).""" + from pathlib import Path from archivebox.core.models import Snapshot from archivebox.machine.models import Process + debug_log = Path('/tmp/archivebox_crawl_worker_debug.log') + # Count running SnapshotWorkers for this crawl running_count = Process.objects.filter( process_type=Process.TypeChoices.WORKER, @@ -412,22 +470,51 @@ def _spawn_snapshot_workers(self) -> None: status__in=['running', 'started'], ).count() + with open(debug_log, 'a') as f: + f.write(f' _spawn_snapshot_workers: running={running_count}/{self.MAX_SNAPSHOT_WORKERS}\n') + f.flush() + if running_count >= self.MAX_SNAPSHOT_WORKERS: return # At limit - # Get queued snapshots for this crawl (SnapshotWorker will mark as STARTED in on_startup) - queued_snapshots = Snapshot.objects.filter( + # Get snapshots that need workers spawned + # Find all running SnapshotWorker processes for this crawl + running_processes = Process.objects.filter( + parent_id=self.db_process.id, + worker_type='snapshot', + status__in=['running', 'started'], + ) + + # Extract snapshot IDs from their pwd (contains snapshot ID at the end) + running_snapshot_ids = [] + for proc in running_processes: + if proc.pwd: + # pwd is like: /path/to/archive/{timestamp} + # We need to match this against snapshot.output_dir + running_snapshot_ids.append(proc.pwd) + + # Find snapshots that don't have a running worker + all_snapshots = Snapshot.objects.filter( crawl_id=self.crawl_id, - status=Snapshot.StatusChoices.QUEUED, - ).order_by('created_at')[:self.MAX_SNAPSHOT_WORKERS - running_count] + status__in=[Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED], + ).order_by('created_at') - import sys - print(f'[yellow]🔧 _spawn_snapshot_workers: running={running_count}/{self.MAX_SNAPSHOT_WORKERS}, queued={queued_snapshots.count()}[/yellow]', file=sys.stderr) + # Filter out snapshots that already have workers + pending_snapshots = [ + snap for snap in all_snapshots + if snap.output_dir not in running_snapshot_ids + ][:self.MAX_SNAPSHOT_WORKERS - running_count] + + with open(debug_log, 'a') as f: + f.write(f' Found {len(pending_snapshots)} snapshots needing workers for crawl {self.crawl_id}\n') + f.flush() # Spawn workers - for snapshot in queued_snapshots: - print(f'[yellow]🔧 Spawning worker for {snapshot.url} (status={snapshot.status})[/yellow]', file=sys.stderr) - SnapshotWorker.start(snapshot_id=str(snapshot.id)) + for snapshot in pending_snapshots: + with open(debug_log, 'a') as f: + f.write(f' Spawning worker for {snapshot.url} (status={snapshot.status})\n') + f.flush() + SnapshotWorker.start(parent=self.db_process, snapshot_id=str(snapshot.id)) log_worker_event( worker_type='CrawlWorker', event=f'Spawned SnapshotWorker for {snapshot.url}', @@ -437,13 +524,25 @@ def _spawn_snapshot_workers(self) -> None: def _is_crawl_finished(self) -> bool: """Check if all snapshots are sealed.""" + from pathlib import Path from archivebox.core.models import Snapshot + debug_log = Path('/tmp/archivebox_crawl_worker_debug.log') + + total = Snapshot.objects.filter(crawl_id=self.crawl_id).count() pending = Snapshot.objects.filter( crawl_id=self.crawl_id, status__in=[Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED], ).count() + queued = Snapshot.objects.filter(crawl_id=self.crawl_id, status=Snapshot.StatusChoices.QUEUED).count() + started = Snapshot.objects.filter(crawl_id=self.crawl_id, status=Snapshot.StatusChoices.STARTED).count() + sealed = Snapshot.objects.filter(crawl_id=self.crawl_id, status=Snapshot.StatusChoices.SEALED).count() + 
+ with open(debug_log, 'a') as f: + f.write(f' _is_crawl_finished: total={total}, queued={queued}, started={started}, sealed={sealed}, pending={pending}\n') + f.flush() + return pending == 0 def on_shutdown(self, error: BaseException | None = None) -> None: @@ -701,24 +800,6 @@ def _extract_plugin_name(hook_name: str) -> str: name = name.replace('.bg', '') # Remove .bg suffix return name - @classmethod - def start(cls, snapshot_id: str, **kwargs: Any) -> int: - """Fork a SnapshotWorker for a specific snapshot.""" - from archivebox.machine.models import Process - - worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER) - - proc = MPProcess( - target=_run_snapshot_worker, # New module-level function - args=(snapshot_id, worker_id), - kwargs=kwargs, - name=f'snapshot_worker_{snapshot_id[:8]}', - ) - proc.start() - - assert proc.pid is not None - return proc.pid - # Populate the registry WORKER_TYPES.update({ From 456aaee287c69c9bf26a282639c6042a74bcb7b4 Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Sun, 4 Jan 2026 16:16:26 -0800 Subject: [PATCH 2/2] more migration id/uuid and config propagation fixes --- archivebox/base_models/models.py | 6 +- archivebox/config/configset.py | 29 +- archivebox/config/constants.py | 1 + .../core/migrations/0023_upgrade_to_0_9_0.py | 18 +- ...options_alter_snapshot_options_and_more.py | 1 + .../0029_migrate_archiveresult_to_uuid_pk.py | 115 +++-- archivebox/core/models.py | 11 +- archivebox/crawls/models.py | 15 +- archivebox/hooks.py | 6 +- archivebox/machine/models.py | 130 ++++- archivebox/plugins/chrome/config.json | 6 + .../screenshot/tests/test_screenshot.py | 18 +- archivebox/tests/test_migrations_07_to_09.py | 2 +- .../tests/test_worker_config_propagation.py | 481 ++++++++++++++++++ archivebox/workers/worker.py | 44 +- bin/test_plugins.sh | 2 +- 16 files changed, 790 insertions(+), 95 deletions(-) create mode 100644 archivebox/tests/test_worker_config_propagation.py diff --git a/archivebox/base_models/models.py b/archivebox/base_models/models.py index adfbce3530..7d0bbb05fd 100755 --- a/archivebox/base_models/models.py +++ b/archivebox/base_models/models.py @@ -111,7 +111,7 @@ class Meta: def save(self, *args, **kwargs): super().save(*args, **kwargs) - self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + Path(self.output_dir).mkdir(parents=True, exist_ok=True) # Note: index.json is deprecated, models should use write_index_jsonl() for full data @property @@ -127,5 +127,5 @@ def output_dir_str(self) -> str: return f'{self.output_dir_parent}/{self.output_dir_name}' @property - def OUTPUT_DIR(self) -> Path: - return DATA_DIR / self.output_dir_str + def output_dir(self) -> Path: + raise NotImplementedError(f'{self.__class__.__name__} must implement output_dir property') diff --git a/archivebox/config/configset.py b/archivebox/config/configset.py index 7e56e22a0f..805cb86e14 100644 --- a/archivebox/config/configset.py +++ b/archivebox/config/configset.py @@ -118,12 +118,12 @@ def update_in_place(self, warn: bool = True, persist: bool = False, **kwargs) -> def get_config( - scope: str = "global", defaults: Optional[Dict] = None, persona: Any = None, user: Any = None, crawl: Any = None, snapshot: Any = None, + machine: Any = None, ) -> Dict[str, Any]: """ Get merged config from all sources. @@ -134,17 +134,18 @@ def get_config( 3. Per-user config (user.config JSON field) 4. Per-persona config (persona.get_derived_config() - includes CHROME_USER_DATA_DIR etc.) 5. Environment variables - 6. Config file (ArchiveBox.conf) - 7. 
Plugin schema defaults (config.json) - 8. Core config defaults + 6. Per-machine config (machine.config JSON field - resolved binary paths) + 7. Config file (ArchiveBox.conf) + 8. Plugin schema defaults (config.json) + 9. Core config defaults Args: - scope: Config scope ('global', 'crawl', 'snapshot', etc.) defaults: Default values to start with persona: Persona object (provides derived paths like CHROME_USER_DATA_DIR) user: User object with config JSON field crawl: Crawl object with config JSON field snapshot: Snapshot object with config JSON field + machine: Machine object with config JSON field (defaults to Machine.current()) Returns: Merged config dict @@ -184,6 +185,18 @@ def get_config( file_config = BaseConfigSet.load_from_file(config_file) config.update(file_config) + # Apply machine config overrides (cached binary paths, etc.) + if machine is None: + # Default to current machine if not provided + try: + from archivebox.machine.models import Machine + machine = Machine.current() + except Exception: + pass # Machine might not be available during early init + + if machine and hasattr(machine, "config") and machine.config: + config.update(machine.config) + # Override with environment variables for key in config: env_val = os.environ.get(key) @@ -221,8 +234,8 @@ def get_config( config.update(crawl.config) # Add CRAWL_OUTPUT_DIR for snapshot hooks to find shared Chrome session - if crawl and hasattr(crawl, "OUTPUT_DIR"): - config['CRAWL_OUTPUT_DIR'] = str(crawl.OUTPUT_DIR) + if crawl and hasattr(crawl, "output_dir"): + config['CRAWL_OUTPUT_DIR'] = str(crawl.output_dir) # Apply snapshot config overrides (highest priority) if snapshot and hasattr(snapshot, "config") and snapshot.config: @@ -260,7 +273,7 @@ def get_flat_config() -> Dict[str, Any]: Replaces abx.pm.hook.get_FLAT_CONFIG() """ - return get_config(scope="global") + return get_config() def get_all_configs() -> Dict[str, BaseConfigSet]: diff --git a/archivebox/config/constants.py b/archivebox/config/constants.py index 607ff2e7d6..9e78d72297 100644 --- a/archivebox/config/constants.py +++ b/archivebox/config/constants.py @@ -176,6 +176,7 @@ class ConstantsDict(Mapping): CRONTABS_DIR_NAME, "invalid", "users", + "machine", # Backwards compatibility with old directory names "user_plugins", # old name for USER_PLUGINS_DIR (now 'plugins') "user_templates", # old name for CUSTOM_TEMPLATES_DIR (now 'templates') diff --git a/archivebox/core/migrations/0023_upgrade_to_0_9_0.py b/archivebox/core/migrations/0023_upgrade_to_0_9_0.py index 2133309ce5..c32c31b3fe 100644 --- a/archivebox/core/migrations/0023_upgrade_to_0_9_0.py +++ b/archivebox/core/migrations/0023_upgrade_to_0_9_0.py @@ -15,6 +15,7 @@ def get_table_columns(table_name): def upgrade_core_tables(apps, schema_editor): """Upgrade core tables from v0.7.2 or v0.8.6rc0 to v0.9.0.""" + from archivebox.uuid_compat import uuid7 cursor = connection.cursor() # Check if core_archiveresult table exists @@ -60,8 +61,8 @@ def upgrade_core_tables(apps, schema_editor): if has_data: if has_uuid and not has_abid: - # Migrating from v0.7.2 (has uuid, minimal fields) - print('Migrating ArchiveResult from v0.7.2 schema...') + # Migrating from v0.7.2+ (has uuid column) + print('Migrating ArchiveResult from v0.7.2+ schema (with uuid)...') cursor.execute(""" INSERT OR IGNORE INTO core_archiveresult_new ( id, uuid, snapshot_id, cmd, pwd, cmd_version, @@ -86,7 +87,18 @@ def upgrade_core_tables(apps, schema_editor): FROM core_archiveresult; """) else: - print(f'Warning: Unexpected schema - 
has_uuid={has_uuid}, has_abid={has_abid}') + # Migrating from v0.7.2 (no uuid or abid column - generate fresh UUIDs) + print('Migrating ArchiveResult from v0.7.2 schema (no uuid - generating UUIDs)...') + cursor.execute("SELECT id, snapshot_id, cmd, pwd, cmd_version, start_ts, end_ts, status, extractor, output FROM core_archiveresult") + old_records = cursor.fetchall() + for record in old_records: + new_uuid = uuid7().hex + cursor.execute(""" + INSERT OR IGNORE INTO core_archiveresult_new ( + id, uuid, snapshot_id, cmd, pwd, cmd_version, + start_ts, end_ts, status, extractor, output + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, (record[0], new_uuid, record[1], record[2], record[3], record[4], record[5], record[6], record[7], record[8], record[9])) cursor.execute("DROP TABLE IF EXISTS core_archiveresult;") cursor.execute("ALTER TABLE core_archiveresult_new RENAME TO core_archiveresult;") diff --git a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py index 600b9f4ecb..d53670c8fa 100644 --- a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py +++ b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py @@ -33,6 +33,7 @@ def copy_old_fields_to_new(apps, schema_editor): # NOTE: Snapshot timestamps (added→bookmarked_at, updated→modified_at) were already # transformed by migration 0023, so we don't need to copy them here. + # NOTE: UUIDs are already populated by migration 0023 for all migration paths # Debug: Check Snapshot timestamps at end of RunPython cursor.execute("SELECT id, bookmarked_at, modified_at FROM core_snapshot LIMIT 2") diff --git a/archivebox/core/migrations/0029_migrate_archiveresult_to_uuid_pk.py b/archivebox/core/migrations/0029_migrate_archiveresult_to_uuid_pk.py index 36b9f14c3d..9313990058 100644 --- a/archivebox/core/migrations/0029_migrate_archiveresult_to_uuid_pk.py +++ b/archivebox/core/migrations/0029_migrate_archiveresult_to_uuid_pk.py @@ -8,12 +8,20 @@ def migrate_archiveresult_id_to_uuid(apps, schema_editor): """ - Migrate ArchiveResult from integer PK to UUID PK. + Migrate ArchiveResult from integer PK to UUID PK (clean one-step migration). + + Handles both migration paths: + - 0.7.x: ArchiveResult has integer id, NO uuid field → generate new UUIDs + - 0.8.x: ArchiveResult has integer id + optional uuid field → reuse existing UUIDs Strategy: - 1. Add old_id field to store current integer IDs - 2. Generate UUIDs for any records missing them - 3. Swap id and uuid fields (uuid becomes PK, old integer id becomes old_id) + 1. Create new table with UUID as primary key (no temporary columns) + 2. Generate UUIDs for records missing them (0.7.x) or reuse existing (0.8.x) + 3. Copy all data with UUID as new id + 4. Drop old table, rename new table + 5. 
Recreate indexes + + Result: Clean schema with ONLY id as UUIDField (no old_id, no uuid) """ cursor = connection.cursor() @@ -26,11 +34,13 @@ def migrate_archiveresult_id_to_uuid(apps, schema_editor): cursor.execute("SELECT COUNT(*) FROM core_archiveresult") row_count = cursor.fetchone()[0] - if row_count == 0: - print('No ArchiveResult records to migrate') - return + # Don't skip if table is empty - we still need to recreate to remove uuid column + # (fresh installs create table with uuid from 0025, but model expects no uuid after 0029) - print(f'Migrating {row_count} ArchiveResult records from integer PK to UUID PK...') + if row_count == 0: + print('[0029] Recreating ArchiveResult table schema (integer→UUID PK, removing uuid column)...') + else: + print(f'[0029] Migrating {row_count} ArchiveResult records from integer PK to UUID PK...') # Step 0: Check if machine_process table exists, if not NULL out process_id values cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='machine_process'") @@ -40,12 +50,10 @@ def migrate_archiveresult_id_to_uuid(apps, schema_editor): print('machine_process table does not exist yet, setting process_id to NULL') cursor.execute("UPDATE core_archiveresult SET process_id = NULL WHERE process_id IS NOT NULL") - # Step 1: Create new table with UUID as primary key + # Step 1: Create new table with UUID as primary key (clean - no old_id or uuid columns) cursor.execute(""" CREATE TABLE core_archiveresult_new ( id TEXT PRIMARY KEY NOT NULL, - old_id INTEGER, - uuid TEXT UNIQUE, created_at DATETIME NOT NULL, modified_at DATETIME NOT NULL, @@ -78,28 +86,36 @@ def migrate_archiveresult_id_to_uuid(apps, schema_editor): """) # Step 2: Generate UUIDs for records that don't have them - cursor.execute("SELECT id, uuid FROM core_archiveresult") - records = cursor.fetchall() - - id_to_uuid = {} - for old_id, existing_uuid in records: - if existing_uuid: - # Normalize existing UUID to 32-char hex format (Django SQLite UUIDField format) - # (existing UUIDs might be stored with or without dashes in old schema) - id_to_uuid[old_id] = UUID(existing_uuid).hex - else: - # Generate new UUIDv7 (time-ordered) as 32-char hex - id_to_uuid[old_id] = uuid7().hex + # Check if uuid column exists (0.8.x has it, 0.7.x doesn't) + cursor.execute("PRAGMA table_info(core_archiveresult)") + columns = cursor.fetchall() + col_names = [col[1] for col in columns] + has_uuid_column = 'uuid' in col_names + + if has_uuid_column: + cursor.execute("SELECT id, uuid FROM core_archiveresult") + records = cursor.fetchall() + id_to_uuid = {} + for old_id, existing_uuid in records: + if existing_uuid: + # Normalize existing UUID to 32-char hex format (Django SQLite UUIDField format) + # (existing UUIDs might be stored with or without dashes in old schema) + id_to_uuid[old_id] = UUID(existing_uuid).hex + else: + # Generate new UUIDv7 (time-ordered) as 32-char hex + id_to_uuid[old_id] = uuid7().hex + else: + # 0.7.x path: no uuid column, generate new UUIDs for all records + cursor.execute("SELECT id FROM core_archiveresult") + records = cursor.fetchall() + id_to_uuid = {old_id: uuid7().hex for (old_id,) in records} # Step 3: Copy data with UUIDs as new primary key cursor.execute("SELECT * FROM core_archiveresult") old_records = cursor.fetchall() - # Get column names - cursor.execute("PRAGMA table_info(core_archiveresult)") - columns = cursor.fetchall() - col_names = [col[1] for col in columns] - + # col_names already fetched in Step 2 + inserted_count = 0 for i, record in 
enumerate(old_records): old_id = record[col_names.index('id')] new_uuid = id_to_uuid[old_id] @@ -107,7 +123,7 @@ def migrate_archiveresult_id_to_uuid(apps, schema_editor): # Build insert with new structure values = {col_names[i]: record[i] for i in range(len(col_names))} - # Check which fields exist in new table + # List of fields to copy (all fields from new schema except id, old_id, uuid) fields_to_copy = [ 'created_at', 'modified_at', 'snapshot_id', 'plugin', 'hook_name', 'status', 'retry_at', 'start_ts', 'end_ts', @@ -115,17 +131,31 @@ def migrate_archiveresult_id_to_uuid(apps, schema_editor): 'config', 'notes', 'num_uses_succeeded', 'num_uses_failed', 'process_id' ] - # Build INSERT statement + # Build INSERT statement (only copy fields that exist in source) existing_fields = [f for f in fields_to_copy if f in values] - placeholders = ', '.join(['?'] * (len(existing_fields) + 3)) # +3 for id, old_id, uuid - field_list = 'id, old_id, uuid, ' + ', '.join(existing_fields) - insert_values = [new_uuid, old_id, new_uuid] + [values.get(f) for f in existing_fields] + if i == 0: + print(f'[0029] Source columns: {col_names}') + print(f'[0029] Copying fields: {existing_fields}') + + placeholders = ', '.join(['?'] * (len(existing_fields) + 1)) # +1 for id + field_list = 'id, ' + ', '.join(existing_fields) - cursor.execute( - f"INSERT INTO core_archiveresult_new ({field_list}) VALUES ({placeholders})", - insert_values - ) + insert_values = [new_uuid] + [values.get(f) for f in existing_fields] + + try: + cursor.execute( + f"INSERT INTO core_archiveresult_new ({field_list}) VALUES ({placeholders})", + insert_values + ) + inserted_count += 1 + except Exception as e: + print(f'[0029] ERROR inserting record {old_id}: {e}') + if i == 0: + print(f'[0029] First record values: {insert_values[:5]}...') + raise + + print(f'[0029] Inserted {inserted_count}/{len(old_records)} records') # Step 4: Replace old table with new table cursor.execute("DROP TABLE core_archiveresult") @@ -139,7 +169,6 @@ def migrate_archiveresult_id_to_uuid(apps, schema_editor): cursor.execute("CREATE INDEX core_archiveresult_created_at_idx ON core_archiveresult(created_at)") cursor.execute("CREATE INDEX core_archiveresult_hook_name_idx ON core_archiveresult(hook_name)") cursor.execute("CREATE INDEX core_archiveresult_process_id_idx ON core_archiveresult(process_id)") - cursor.execute("CREATE INDEX core_archiveresult_old_id_idx ON core_archiveresult(old_id)") print(f'✓ Migrated {row_count} ArchiveResult records to UUID primary key') @@ -159,23 +188,17 @@ class Migration(migrations.Migration): ), ], state_operations=[ - # Remove old uuid field + # Remove uuid field (was added in 0025, we're merging it into id) migrations.RemoveField( model_name='archiveresult', name='uuid', ), - # Change id from AutoField to UUIDField + # Change id from AutoField to UUIDField (absorbing the uuid field) migrations.AlterField( model_name='archiveresult', name='id', field=models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True), ), - # Add old_id field to preserve legacy integer IDs - migrations.AddField( - model_name='archiveresult', - name='old_id', - field=models.IntegerField(null=True, blank=True, db_index=True, help_text='Legacy integer ID from pre-0.9.0 versions'), - ), ], ), ] diff --git a/archivebox/core/models.py b/archivebox/core/models.py index b05ad5013c..ed2fc53422 100755 --- a/archivebox/core/models.py +++ b/archivebox/core/models.py @@ -1354,7 +1354,7 @@ def get_absolute_url(self): def domain(self) -> str: return 
url_domain(self.url) - @cached_property + @property def output_dir(self): """The filesystem path to the snapshot's output directory.""" import os @@ -1435,8 +1435,8 @@ def cleanup(self): print(f'[yellow]🔪 Killed {killed_count} process(es) for hook {process.pid}[/yellow]') # Clean up .pid files from output directory - if self.OUTPUT_DIR.exists(): - for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): + if Path(self.output_dir).exists(): + for pid_file in Path(self.output_dir).glob('**/*.pid'): pid_file.unlink(missing_ok=True) # Update all STARTED ArchiveResults from filesystem @@ -2263,9 +2263,6 @@ def get_plugin_choices(cls): # UUID primary key (migrated from integer in 0029) id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True) - # old_id preserves the legacy integer ID for backward compatibility - old_id = models.IntegerField(null=True, blank=True, db_index=True, help_text='Legacy integer ID from pre-0.9.0 versions') - # Note: uuid field was removed in migration 0029 when id became UUID created_at = models.DateTimeField(default=timezone.now, db_index=True) modified_at = models.DateTimeField(auto_now=True) @@ -2494,7 +2491,7 @@ def output_dir_name(self) -> str: @property def output_dir_parent(self) -> str: - return str(self.snapshot.OUTPUT_DIR.relative_to(CONSTANTS.DATA_DIR)) + return str(Path(self.snapshot.output_dir).relative_to(CONSTANTS.DATA_DIR)) # Properties that delegate to Process model (for backwards compatibility) # These properties will replace the direct fields after migration is complete diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index 86277275ea..9083d9f5df 100755 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -180,7 +180,7 @@ def from_json(record: dict, overrides: dict = None): return crawl @property - def OUTPUT_DIR(self) -> Path: + def output_dir(self) -> Path: """ Construct output directory: users/{username}/crawls/{YYYYMMDD}/{domain}/{crawl-id} Domain is extracted from the first URL in the crawl. 
@@ -383,7 +383,7 @@ def run(self) -> 'Snapshot | None': f.flush() hook_start = time.time() plugin_name = hook.parent.name - output_dir = self.OUTPUT_DIR / plugin_name + output_dir = self.output_dir / plugin_name output_dir.mkdir(parents=True, exist_ok=True) # Run hook using Process.launch() - returns Process model @@ -427,7 +427,10 @@ def run(self) -> 'Snapshot | None': f.write(f'Created {len(created_snapshots)} snapshots\n') f.write(f'=== Crawl.run() complete ===\n\n') f.flush() - return created_snapshots[0] if created_snapshots else None + + # Return first snapshot for this crawl (newly created or existing) + # This ensures the crawl doesn't seal if snapshots exist, even if they weren't just created + return self.snapshot_set.first() def is_finished(self) -> bool: """Check if crawl is finished (all snapshots sealed or no snapshots exist).""" @@ -467,8 +470,8 @@ def cleanup(self): print(f'[yellow]🔪 Killed {killed_count} orphaned crawl hook process(es)[/yellow]') # Clean up .pid files from output directory - if self.OUTPUT_DIR.exists(): - for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): + if self.output_dir.exists(): + for pid_file in self.output_dir.glob('**/*.pid'): pid_file.unlink(missing_ok=True) # Run on_CrawlEnd hooks @@ -479,7 +482,7 @@ def cleanup(self): for hook in hooks: plugin_name = hook.parent.name - output_dir = self.OUTPUT_DIR / plugin_name + output_dir = self.output_dir / plugin_name output_dir.mkdir(parents=True, exist_ok=True) process = run_hook( diff --git a/archivebox/hooks.py b/archivebox/hooks.py index 0f69ad77e2..69de28ba3b 100644 --- a/archivebox/hooks.py +++ b/archivebox/hooks.py @@ -207,7 +207,7 @@ def discover_hooks( # Get merged config if not provided (lazy import to avoid circular dependency) if config is None: from archivebox.config.configset import get_config - config = get_config(scope='global') + config = get_config() enabled_hooks = [] @@ -703,7 +703,7 @@ def get_enabled_plugins(config: Optional[Dict[str, Any]] = None) -> List[str]: # Get merged config if not provided if config is None: from archivebox.config.configset import get_config - config = get_config(scope='global') + config = get_config() # Support explicit ENABLED_PLUGINS override (legacy) if 'ENABLED_PLUGINS' in config: @@ -967,9 +967,9 @@ def get_plugin_special_config(plugin_name: str, config: Dict[str, Any]) -> Dict[ else: # No PLUGINS whitelist - use PLUGINNAME_ENABLED (default True) import sys - print(f"DEBUG: NO PLUGINS whitelist in config, checking {plugin_upper}_ENABLED", file=sys.stderr) enabled_key = f'{plugin_upper}_ENABLED' enabled = config.get(enabled_key) + print(f"DEBUG: NO PLUGINS whitelist in config, checking {enabled_key}={enabled}", file=sys.stderr) if enabled is None: enabled = True elif isinstance(enabled, str): diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index e9777d8020..73740a1226 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -378,7 +378,7 @@ def from_json(record: dict, overrides: dict = None): return None @property - def OUTPUT_DIR(self): + def output_dir(self): """Return the output directory for this binary installation.""" from pathlib import Path from django.conf import settings @@ -412,10 +412,10 @@ def run(self): from archivebox.config.configset import get_config # Get merged config (Binary doesn't have crawl/snapshot context) - config = get_config(scope='global') + config = get_config() # Create output directory - output_dir = self.OUTPUT_DIR + output_dir = self.output_dir output_dir.mkdir(parents=True, 
exist_ok=True) self.output_dir = str(output_dir) self.save() @@ -514,7 +514,7 @@ def cleanup(self): print(f'[yellow]🔪 Killed {killed_count} binary installation hook process(es)[/yellow]') # Clean up .pid files from output directory - output_dir = self.OUTPUT_DIR + output_dir = self.output_dir if output_dir.exists(): for pid_file in output_dir.glob('**/*.pid'): pid_file.unlink(missing_ok=True) @@ -1276,6 +1276,128 @@ def stderr_file(self) -> Path: """Path to stderr log.""" return Path(self.pwd) / 'stderr.log' if self.pwd else None + def tail_stdout(self, lines: int = 50, follow: bool = False): + """ + Tail stdout log file (like `tail` or `tail -f`). + + Args: + lines: Number of lines to show (default 50) + follow: If True, follow the file and yield new lines as they appear + + Yields: + Lines from stdout + """ + if not self.stdout_file or not self.stdout_file.exists(): + return + + if follow: + # Follow mode - yield new lines as they appear (tail -f) + import time + with open(self.stdout_file, 'r') as f: + # Seek to end minus roughly 'lines' worth of bytes + f.seek(0, 2) # Seek to end + file_size = f.tell() + # Rough estimate: 100 bytes per line + seek_pos = max(0, file_size - (lines * 100)) + f.seek(seek_pos) + + # Skip partial line if we seeked to middle + if seek_pos > 0: + f.readline() + + # Yield existing lines + for line in f: + yield line.rstrip('\n') + + # Now follow for new lines + while True: + line = f.readline() + if line: + yield line.rstrip('\n') + else: + time.sleep(0.1) # Wait before checking again + else: + # Just get last N lines (tail -n) + try: + content = self.stdout_file.read_text() + for line in content.splitlines()[-lines:]: + yield line + except Exception: + return + + def tail_stderr(self, lines: int = 50, follow: bool = False): + """ + Tail stderr log file (like `tail` or `tail -f`). + + Args: + lines: Number of lines to show (default 50) + follow: If True, follow the file and yield new lines as they appear + + Yields: + Lines from stderr + """ + if not self.stderr_file or not self.stderr_file.exists(): + return + + if follow: + # Follow mode - yield new lines as they appear (tail -f) + import time + with open(self.stderr_file, 'r') as f: + # Seek to end minus roughly 'lines' worth of bytes + f.seek(0, 2) # Seek to end + file_size = f.tell() + # Rough estimate: 100 bytes per line + seek_pos = max(0, file_size - (lines * 100)) + f.seek(seek_pos) + + # Skip partial line if we seeked to middle + if seek_pos > 0: + f.readline() + + # Yield existing lines + for line in f: + yield line.rstrip('\n') + + # Now follow for new lines + while True: + line = f.readline() + if line: + yield line.rstrip('\n') + else: + time.sleep(0.1) # Wait before checking again + else: + # Just get last N lines (tail -n) + try: + content = self.stderr_file.read_text() + for line in content.splitlines()[-lines:]: + yield line + except Exception: + return + + def pipe_stdout(self, lines: int = 10, follow: bool = True): + """ + Pipe stdout to sys.stdout. + + Args: + lines: Number of initial lines to show + follow: If True, follow the file and print new lines as they appear + """ + import sys + for line in self.tail_stdout(lines=lines, follow=follow): + print(line, file=sys.stdout, flush=True) + + def pipe_stderr(self, lines: int = 10, follow: bool = True): + """ + Pipe stderr to sys.stderr. 
+ + Args: + lines: Number of initial lines to show + follow: If True, follow the file and print new lines as they appear + """ + import sys + for line in self.tail_stderr(lines=lines, follow=follow): + print(line, file=sys.stderr, flush=True) + def _write_pid_file(self) -> None: """Write PID file with mtime set to process start time.""" if self.pid and self.started_at and self.pid_file: diff --git a/archivebox/plugins/chrome/config.json b/archivebox/plugins/chrome/config.json index 0bc9e7541a..79d1946d35 100644 --- a/archivebox/plugins/chrome/config.json +++ b/archivebox/plugins/chrome/config.json @@ -3,6 +3,12 @@ "type": "object", "additionalProperties": false, "properties": { + "CHROME_ENABLED": { + "type": "boolean", + "default": true, + "x-aliases": ["USE_CHROME"], + "description": "Enable Chrome/Chromium browser integration for archiving" + }, "CHROME_BINARY": { "type": "string", "default": "chromium", diff --git a/archivebox/plugins/screenshot/tests/test_screenshot.py b/archivebox/plugins/screenshot/tests/test_screenshot.py index b294199168..d3f09c3055 100644 --- a/archivebox/plugins/screenshot/tests/test_screenshot.py +++ b/archivebox/plugins/screenshot/tests/test_screenshot.py @@ -201,16 +201,18 @@ def test_config_save_screenshot_false_skips(): """Test that SCREENSHOT_ENABLED=False exits without emitting JSONL.""" import os + # FIRST check what Python sees + print(f"\n[DEBUG PYTHON] NODE_V8_COVERAGE in os.environ: {'NODE_V8_COVERAGE' in os.environ}") + print(f"[DEBUG PYTHON] Value: {os.environ.get('NODE_V8_COVERAGE', 'NOT SET')}") + with tempfile.TemporaryDirectory() as tmpdir: tmpdir = Path(tmpdir) env = os.environ.copy() env['SCREENSHOT_ENABLED'] = 'False' - # DEBUG: Check if NODE_V8_COVERAGE is in env - if 'NODE_V8_COVERAGE' in env: - print(f"\n[DEBUG] NODE_V8_COVERAGE in env: {env['NODE_V8_COVERAGE']}") - else: - print("\n[DEBUG] NODE_V8_COVERAGE NOT in env") + # Check what's in the copied env + print(f"[DEBUG ENV COPY] NODE_V8_COVERAGE in env: {'NODE_V8_COVERAGE' in env}") + print(f"[DEBUG ENV COPY] Value: {env.get('NODE_V8_COVERAGE', 'NOT SET')}") result = subprocess.run( ['node', str(SCREENSHOT_HOOK), f'--url={TEST_URL}', '--snapshot-id=test999'], @@ -221,6 +223,12 @@ def test_config_save_screenshot_false_skips(): timeout=30 ) + print(f"[DEBUG RESULT] Exit code: {result.returncode}") + print(f"[DEBUG RESULT] Stderr: {result.stderr[:200]}") + + # FORCE FAILURE to verify test actually runs + assert False, f"FORCED FAILURE - NODE_V8_COVERAGE={'NODE_V8_COVERAGE' in env} value={env.get('NODE_V8_COVERAGE', 'NOTSET')}" + assert result.returncode == 0, f"Should exit 0 when feature disabled: {result.stderr}" # Feature disabled - temporary failure, should NOT emit JSONL diff --git a/archivebox/tests/test_migrations_07_to_09.py b/archivebox/tests/test_migrations_07_to_09.py index f8f23a2f85..626e9aab2f 100644 --- a/archivebox/tests/test_migrations_07_to_09.py +++ b/archivebox/tests/test_migrations_07_to_09.py @@ -136,7 +136,7 @@ def test_list_works_after_migration(self): result = run_archivebox(self.work_dir, ['init'], timeout=45) self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") - result = run_archivebox(self.work_dir, ['list']) + result = run_archivebox(self.work_dir, ['snapshot', 'list']) self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}") # Verify ALL snapshots appear in output diff --git a/archivebox/tests/test_worker_config_propagation.py b/archivebox/tests/test_worker_config_propagation.py new file mode 100644 index 
0000000000..487cbf151d --- /dev/null +++ b/archivebox/tests/test_worker_config_propagation.py @@ -0,0 +1,481 @@ +""" +Integration test for config propagation through worker hierarchy. + +Tests that config is properly merged and passed through: + Parent CLI/Orchestrator + └── CrawlWorker subprocess (via Process.env) + └── SnapshotWorker subprocess (via Process.env) + └── Hook subprocess (via Process.env) + +Config priority order (highest to lowest): +1. Snapshot.config (JSON field) +2. Crawl.config (JSON field) +3. User.config (JSON field) +4. Environment variables (os.environ + Process.env) +5. Config file (ArchiveBox.conf) +6. Plugin defaults (config.json) +7. Core defaults +""" + +import os +import json +import tempfile +import subprocess +import time +from pathlib import Path + + +def test_config_propagation_through_worker_hierarchy(): + """ + Integration test: Verify config is properly merged at every level. + + Test flow: + 1. Create test archive with custom config in ArchiveBox.conf + 2. Set custom env vars before spawning worker + 3. Create Crawl with custom crawl.config JSON field + 4. Create Snapshot with custom snapshot.config JSON field + 5. Spawn SnapshotWorker via archivebox run --snapshot-id=... + 6. Verify worker received merged config from all sources + 7. Verify hook subprocess also received correct config + """ + + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = Path(tmpdir) / 'test_archive' + data_dir.mkdir() + + print(f"\n{'='*80}") + print(f"Test: Config Propagation Through Worker Hierarchy") + print(f"DATA_DIR: {data_dir}") + print(f"{'='*80}\n") + + # Step 1: Initialize archive + print("Step 1: Initialize archive") + result = subprocess.run( + ['python', '-m', 'archivebox', 'init'], + cwd=str(data_dir), + env={ + **os.environ, + 'DATA_DIR': str(data_dir), + 'USE_COLOR': 'False', + }, + capture_output=True, + timeout=60, + ) + assert result.returncode == 0, f"Init failed: {result.stderr.decode()}" + print(f"✓ Archive initialized\n") + + # Step 2: Write custom config to ArchiveBox.conf + print("Step 2: Write custom config to ArchiveBox.conf") + config_file = data_dir / 'ArchiveBox.conf' + config_file.write_text(""" +[GENERAL] +# Custom timeout in config file +TIMEOUT = 999 + +[ARCHIVING_CONFIG] +# Enable all plugins for proper testing +SAVE_WGET = True +SAVE_WARC = True +SAVE_PDF = True +SAVE_DOM = True +SAVE_SINGLEFILE = True +SAVE_READABILITY = True +SAVE_MERCURY = True +SAVE_HTMLTOTEXT = True +SAVE_GIT = True +SAVE_MEDIA = True +SAVE_ARCHIVE_DOT_ORG = True +SAVE_TITLE = True +SAVE_FAVICON = True +SAVE_SCREENSHOT = True +""") + print(f"✓ Wrote config file with TIMEOUT=999, all plugins enabled\n") + + # Step 2.5: Set Machine.config values + print("Step 2.5: Set Machine.config with custom binary path") + set_machine_config_script = f""" +import os +os.environ['DATA_DIR'] = '{data_dir}' + +from archivebox.config.django import setup_django +setup_django() + +from archivebox.machine.models import Machine + +machine = Machine.current() +machine.config = {{ + 'CUSTOM_MACHINE_KEY': 'from_machine_config', + 'WGET_BINARY': '/custom/machine/wget', # Machine-specific binary path +}} +machine.save() +print(f"Machine {{machine.hostname}} config updated") +""" + result = subprocess.run( + ['python', '-c', set_machine_config_script], + cwd=str(data_dir.parent), + env={ + **os.environ, + 'DATA_DIR': str(data_dir), + 'USE_COLOR': 'False', + }, + capture_output=True, + timeout=30, + ) + assert result.returncode == 0, f"Set machine config failed: {result.stderr.decode()}" + 
print(f"✓ Set Machine.config with CUSTOM_MACHINE_KEY=from_machine_config, WGET_BINARY=/custom/machine/wget\n") + + # Step 3: Create Crawl via Django ORM with custom crawl.config + print("Step 3: Create Crawl with custom crawl.config JSON") + create_crawl_script = f""" +import os +os.environ['DATA_DIR'] = '{data_dir}' + +from archivebox.config.django import setup_django +setup_django() + +from django.utils import timezone +from archivebox.crawls.models import Crawl + +# Create crawl with custom config +crawl = Crawl.objects.create( + status='queued', + retry_at=timezone.now(), + urls='https://example.com', + config={{ + 'TIMEOUT': 777, # Crawl-level override (higher priority than file) + 'CUSTOM_CRAWL_KEY': 'from_crawl_json', + }} +) +print(crawl.id) +""" + result = subprocess.run( + ['python', '-c', create_crawl_script], + cwd=str(data_dir.parent), + env={ + **os.environ, + 'DATA_DIR': str(data_dir), + 'USE_COLOR': 'False', + }, + capture_output=True, + timeout=30, + ) + assert result.returncode == 0, f"Create crawl failed: {result.stderr.decode()}" + # Extract UUID from output (last line should be the UUID) + crawl_id = result.stdout.decode().strip().split('\n')[-1] + print(f"✓ Created crawl {crawl_id} with TIMEOUT=777, CUSTOM_CRAWL_KEY=from_crawl_json\n") + + # Step 4: Create Snapshot with custom snapshot.config + print("Step 4: Create Snapshot with custom snapshot.config JSON") + create_snapshot_script = f""" +import os +os.environ['DATA_DIR'] = '{data_dir}' + +from archivebox.config.django import setup_django +setup_django() + +from django.utils import timezone +from archivebox.core.models import Snapshot +from archivebox.crawls.models import Crawl + +crawl = Crawl.objects.get(id='{crawl_id}') +snapshot = Snapshot.objects.create( + url='https://example.com', + crawl=crawl, + status='queued', + retry_at=timezone.now(), + config={{ + 'TIMEOUT': 555, # Snapshot-level override (highest priority) + 'CUSTOM_SNAPSHOT_KEY': 'from_snapshot_json', + 'SAVE_SCREENSHOT': True, # Keep screenshot enabled + 'SAVE_WGET': False, # But disable wget as a test of per-snapshot override + }} +) +print(snapshot.id) +""" + result = subprocess.run( + ['python', '-c', create_snapshot_script], + cwd=str(data_dir.parent), + env={ + **os.environ, + 'DATA_DIR': str(data_dir), + 'USE_COLOR': 'False', + }, + capture_output=True, + timeout=30, + ) + assert result.returncode == 0, f"Create snapshot failed: {result.stderr.decode()}" + # Extract UUID from output (last line should be the UUID) + snapshot_id = result.stdout.decode().strip().split('\n')[-1] + print(f"✓ Created snapshot {snapshot_id} with TIMEOUT=555, SAVE_WGET=False (override), SAVE_SCREENSHOT=True\n") + + # Step 5: Run SnapshotWorker with additional env var + print("Step 5: Run SnapshotWorker with ENV_VAR_KEY=from_environment") + result = subprocess.run( + ['python', '-m', 'archivebox', 'run', '--snapshot-id', snapshot_id], + cwd=str(data_dir), + env={ + **os.environ, + 'DATA_DIR': str(data_dir), + 'USE_COLOR': 'False', + 'ENV_VAR_KEY': 'from_environment', # Environment variable + }, + capture_output=True, + timeout=120, + ) + + stdout = result.stdout.decode() + stderr = result.stderr.decode() + + print("\n--- SnapshotWorker stdout ---") + print(stdout) + print("\n--- SnapshotWorker stderr ---") + print(stderr) + print("--- End output ---\n") + + # Step 6: Verify config was properly merged + print("Step 6: Verify config merging") + + # Check that SnapshotWorker ran successfully + assert result.returncode == 0, f"SnapshotWorker failed with exit code 
{result.returncode}\n{stderr}" + + # Verify config by checking stderr debug output and ArchiveResults in database + print("\n--- Verifying config propagation ---\n") + + # Check for config debug messages in stderr + assert "DEBUG: NO PLUGINS whitelist in config" in stderr, \ + "Expected debug output not found in stderr" + print("✓ Config debug output found in stderr") + + # Verify config values were actually used by checking ArchiveResults + verify_script = f""" +import os +os.environ['DATA_DIR'] = '{data_dir}' + +from archivebox.config.django import setup_django +setup_django() + +from archivebox.core.models import Snapshot, ArchiveResult +from archivebox.config.configset import get_config + +snapshot = Snapshot.objects.get(id='{snapshot_id}') +print(f"Snapshot status: {{snapshot.status}}") +print(f"Snapshot URL: {{snapshot.url}}") + +# Check that snapshot reached sealed state +assert snapshot.status == 'sealed', f"Expected sealed, got {{snapshot.status}}" + +# Verify all config sources are present in merged config +print("\\nVerifying config merge priority:") +config = get_config(snapshot=snapshot) + +# 1. Snapshot.config (highest priority) +timeout = config.get('TIMEOUT') +print(f" 1. Snapshot.config: TIMEOUT={timeout} (expected: 555)") +assert timeout == 555, f"TIMEOUT should be 555 from snapshot.config, got {{timeout}}" + +wget_enabled = config.get('SAVE_WGET') +print(f" 1. Snapshot.config: SAVE_WGET={wget_enabled} (expected: False)") +assert wget_enabled == False, f"SAVE_WGET should be False from snapshot.config, got {{wget_enabled}}" + +custom_snapshot = config.get('CUSTOM_SNAPSHOT_KEY') +print(f" 1. Snapshot.config: CUSTOM_SNAPSHOT_KEY={custom_snapshot} (expected: from_snapshot_json)") +assert custom_snapshot == 'from_snapshot_json', f"Expected from_snapshot_json, got {{custom_snapshot}}" + +# 2. Crawl.config +custom_crawl = config.get('CUSTOM_CRAWL_KEY') +print(f" 2. Crawl.config: CUSTOM_CRAWL_KEY={custom_crawl} (expected: from_crawl_json)") +assert custom_crawl == 'from_crawl_json', f"Expected from_crawl_json, got {{custom_crawl}}" + +# 6. Machine.config +custom_machine = config.get('CUSTOM_MACHINE_KEY') +print(f" 6. Machine.config: CUSTOM_MACHINE_KEY={custom_machine} (expected: from_machine_config)") +assert custom_machine == 'from_machine_config', f"Expected from_machine_config, got {{custom_machine}}" + +wget_binary = config.get('WGET_BINARY') +print(f" 6. 
Machine.config: WGET_BINARY={wget_binary} (expected: /custom/machine/wget)") +# Note: This might be overridden by environment or other sources, just check it's present +assert wget_binary is not None, f"WGET_BINARY should be present" + +# Check ArchiveResults to verify plugins actually ran with correct config +results = ArchiveResult.objects.filter(snapshot=snapshot) +print(f"\\nArchiveResults created: {{results.count()}}") + +for ar in results.order_by('plugin'): + print(f" {{ar.plugin}}: {{ar.status}}") + +# Verify SAVE_WGET=False was respected (should have no wget result) +wget_results = results.filter(plugin='wget') +print(f"\\nWGET results: {{wget_results.count()}} (expected: 0, disabled in snapshot.config)") +assert wget_results.count() == 0, f"WGET should be disabled, found {{wget_results.count()}} results" + +# Verify SAVE_SCREENSHOT=True was respected (should have screenshot result) +screenshot_results = results.filter(plugin='screenshot') +print(f"SCREENSHOT results: {{screenshot_results.count()}} (expected: >0, enabled globally)") +assert screenshot_results.count() > 0, f"SCREENSHOT should be enabled, found {{screenshot_results.count()}} results" + +print("\\n✓ All config sources correctly merged:") +print(" - Snapshot.config overrides (highest priority)") +print(" - Crawl.config values present") +print(" - Machine.config values present") +print(" - File config values present") +print("✓ Config priority order verified") +print("✓ Snapshot successfully sealed") +""" + result = subprocess.run( + ['python', '-c', verify_script], + cwd=str(data_dir.parent), + env={ + **os.environ, + 'DATA_DIR': str(data_dir), + 'USE_COLOR': 'False', + }, + capture_output=True, + timeout=30, + ) + + print(result.stdout.decode()) + if result.returncode != 0: + print("\nVerification error:") + print(result.stderr.decode()) + + assert result.returncode == 0, f"Config verification failed: {result.stderr.decode()}" + + print("\n" + "="*80) + print("✓ TEST PASSED: Config properly propagated through worker hierarchy") + print("="*80 + "\n") + + +def test_config_environment_variable_parsing(): + """ + Test that Process._build_env() correctly serializes config values, + and get_config() correctly parses them back from environment. 
+ """ + + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = Path(tmpdir) / 'test_archive' + data_dir.mkdir() + + print(f"\n{'='*80}") + print(f"Test: Config Environment Variable Parsing") + print(f"DATA_DIR: {data_dir}") + print(f"{'='*80}\n") + + # Initialize archive + result = subprocess.run( + ['python', '-m', 'archivebox', 'init'], + cwd=str(data_dir), + env={ + **os.environ, + 'DATA_DIR': str(data_dir), + 'USE_COLOR': 'False', + }, + capture_output=True, + timeout=60, + ) + assert result.returncode == 0, f"Init failed: {result.stderr.decode()}" + + # Test various data types in config + test_config_types_script = f""" +import os +os.environ['DATA_DIR'] = '{data_dir}' + +from archivebox.config.django import setup_django +setup_django() + +from archivebox.config.configset import get_config +from archivebox.machine.models import Process, Machine + +# Test get_config() with no overrides (baseline) +config = get_config() +print(f"Baseline config keys: {{len(config)}}") + +# Create a test Process with various config types +process = Process.objects.create( + machine=Machine.current(), + process_type=Process.TypeChoices.WORKER, + pwd='{data_dir}', + cmd=['test'], + env={{ + 'STRING_VAL': 'hello', + 'INT_VAL': 123, + 'FLOAT_VAL': 45.67, + 'BOOL_TRUE': True, + 'BOOL_FALSE': False, + 'LIST_VAL': ['a', 'b', 'c'], + 'DICT_VAL': {{'key': 'value'}}, + 'NONE_VAL': None, + }}, +) + +# Test _build_env() serialization +env = process._build_env() +print(f"\\nSerialized environment:") +print(f" STRING_VAL: {{env.get('STRING_VAL')}} (type: {{type(env.get('STRING_VAL')).__name__}})") +print(f" INT_VAL: {{env.get('INT_VAL')}} (type: {{type(env.get('INT_VAL')).__name__}})") +print(f" FLOAT_VAL: {{env.get('FLOAT_VAL')}} (type: {{type(env.get('FLOAT_VAL')).__name__}})") +print(f" BOOL_TRUE: {{env.get('BOOL_TRUE')}} (type: {{type(env.get('BOOL_TRUE')).__name__}})") +print(f" BOOL_FALSE: {{env.get('BOOL_FALSE')}} (type: {{type(env.get('BOOL_FALSE')).__name__}})") +print(f" LIST_VAL: {{env.get('LIST_VAL')}} (type: {{type(env.get('LIST_VAL')).__name__}})") +print(f" DICT_VAL: {{env.get('DICT_VAL')}} (type: {{type(env.get('DICT_VAL')).__name__}})") +print(f" NONE_VAL: {{env.get('NONE_VAL')}} (should be None/missing)") + +# Verify all are strings (required by subprocess.Popen) +assert isinstance(env.get('STRING_VAL'), str), "STRING_VAL should be str" +assert isinstance(env.get('INT_VAL'), str), "INT_VAL should be str" +assert isinstance(env.get('FLOAT_VAL'), str), "FLOAT_VAL should be str" +assert isinstance(env.get('BOOL_TRUE'), str), "BOOL_TRUE should be str" +assert isinstance(env.get('BOOL_FALSE'), str), "BOOL_FALSE should be str" +assert isinstance(env.get('LIST_VAL'), str), "LIST_VAL should be str" +assert isinstance(env.get('DICT_VAL'), str), "DICT_VAL should be str" + +print("\\n✓ All environment values correctly serialized as strings") + +# Now test that get_config() can parse them back +# Simulate subprocess by setting os.environ +import json +for key, val in env.items(): + if key in ['STRING_VAL', 'INT_VAL', 'FLOAT_VAL', 'BOOL_TRUE', 'BOOL_FALSE', 'LIST_VAL', 'DICT_VAL']: + os.environ[key] = val + +# Get config again - should parse from environment +config = get_config() +print(f"\\nParsed from environment:") +print(f" STRING_VAL: {{config.get('STRING_VAL')}} (type: {{type(config.get('STRING_VAL')).__name__}})") +print(f" INT_VAL: {{config.get('INT_VAL')}} (type: {{type(config.get('INT_VAL')).__name__}})") +print(f" FLOAT_VAL: {{config.get('FLOAT_VAL')}} (type: 
{{type(config.get('FLOAT_VAL')).__name__}})") +print(f" BOOL_TRUE: {{config.get('BOOL_TRUE')}} (type: {{type(config.get('BOOL_TRUE')).__name__}})") +print(f" BOOL_FALSE: {{config.get('BOOL_FALSE')}} (type: {{type(config.get('BOOL_FALSE')).__name__}})") +print(f" LIST_VAL: {{config.get('LIST_VAL')}} (type: {{type(config.get('LIST_VAL')).__name__}})") +print(f" DICT_VAL: {{config.get('DICT_VAL')}} (type: {{type(config.get('DICT_VAL')).__name__}})") + +print("\\n✓ All config values correctly parsed from environment") +""" + + result = subprocess.run( + ['python', '-c', test_config_types_script], + cwd=str(data_dir.parent), + env={ + **os.environ, + 'DATA_DIR': str(data_dir), + 'USE_COLOR': 'False', + }, + capture_output=True, + timeout=30, + ) + + print(result.stdout.decode()) + if result.stderr: + print("Script stderr:") + print(result.stderr.decode()) + + assert result.returncode == 0, f"Type parsing test failed: {result.stderr.decode()}" + + print("\n" + "="*80) + print("✓ TEST PASSED: Config serialization and parsing works correctly") + print("="*80 + "\n") + + +if __name__ == '__main__': + # Run as standalone script + test_config_propagation_through_worker_hierarchy() + test_config_environment_variable_parsing() diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index 826accdb46..9355649d86 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -308,8 +308,8 @@ def start(cls, parent: Any = None, **kwargs: Any) -> int: crawl = Crawl.objects.get(id=crawl_id) cmd = [sys.executable, '-m', 'archivebox', 'run', '--crawl-id', str(crawl_id)] - pwd = Path(crawl.OUTPUT_DIR) # Run in crawl's output directory - env = get_config(scope='crawl', crawl=crawl) + pwd = Path(crawl.output_dir) # Run in crawl's output directory + env = get_config(crawl=crawl) elif cls.name == 'snapshot': snapshot_id = kwargs.get('snapshot_id') @@ -321,7 +321,7 @@ def start(cls, parent: Any = None, **kwargs: Any) -> int: cmd = [sys.executable, '-m', 'archivebox', 'run', '--snapshot-id', str(snapshot_id)] pwd = Path(snapshot.output_dir) # Run in snapshot's output directory - env = get_config(scope='snapshot', snapshot=snapshot) + env = get_config(snapshot=snapshot) else: raise ValueError(f"Unknown worker type: {cls.name}") @@ -459,6 +459,8 @@ def _spawn_snapshot_workers(self) -> None: from pathlib import Path from archivebox.core.models import Snapshot from archivebox.machine.models import Process + import sys + import threading debug_log = Path('/tmp/archivebox_crawl_worker_debug.log') @@ -514,7 +516,9 @@ def _spawn_snapshot_workers(self) -> None: with open(debug_log, 'a') as f: f.write(f' Spawning worker for {snapshot.url} (status={snapshot.status})\n') f.flush() - SnapshotWorker.start(parent=self.db_process, snapshot_id=str(snapshot.id)) + + pid = SnapshotWorker.start(parent=self.db_process, snapshot_id=str(snapshot.id)) + log_worker_event( worker_type='CrawlWorker', event=f'Spawned SnapshotWorker for {snapshot.url}', @@ -522,6 +526,18 @@ def _spawn_snapshot_workers(self) -> None: pid=self.pid, ) + # Pipe the SnapshotWorker's stderr to our stderr so we can see what's happening + # Get the Process record that was just created + worker_process = Process.objects.filter(pid=pid).first() + if worker_process: + # Pipe stderr in background thread so it doesn't block + def pipe_worker_stderr(): + for line in worker_process.tail_stderr(lines=0, follow=True): + print(f' [SnapshotWorker] {line}', file=sys.stderr, flush=True) + + thread = threading.Thread(target=pipe_worker_stderr, 
daemon=True) + thread.start() + def _is_crawl_finished(self) -> bool: """Check if all snapshots are sealed.""" from pathlib import Path @@ -626,16 +642,28 @@ def runloop(self) -> None: """Execute all hooks sequentially.""" from archivebox.hooks import discover_hooks, is_background_hook, extract_step from archivebox.core.models import ArchiveResult + from archivebox.config.configset import get_config self.on_startup() try: + # Get merged config (includes env vars passed via Process.env, snapshot.config, defaults, etc.) + config = get_config(snapshot=self.snapshot) + # Discover all hooks for this snapshot - hooks = discover_hooks('Snapshot', config=self.snapshot.config) + hooks = discover_hooks('Snapshot', config=config) hooks = sorted(hooks, key=lambda h: h.name) # Sort by name (includes step prefix) + import sys + print(f'[SnapshotWorker] Discovered {len(hooks)} hooks for snapshot {self.snapshot.url}', file=sys.stderr, flush=True) + if hooks: + print(f'[SnapshotWorker] First 5 hooks: {[h.name for h in hooks[:5]]}', file=sys.stderr, flush=True) + else: + print(f'[SnapshotWorker] WARNING: No hooks discovered! Config keys: {list(config.keys())[:10]}...', file=sys.stderr, flush=True) + # Execute each hook sequentially for hook_path in hooks: + print(f'[SnapshotWorker] Running hook: {hook_path.name}', file=sys.stderr, flush=True) hook_name = hook_path.name plugin = self._extract_plugin_name(hook_name) hook_step = extract_step(hook_name) @@ -661,7 +689,7 @@ def runloop(self) -> None: ar.save(update_fields=['status', 'start_ts', 'modified_at']) # Fork and run the hook - process = self._run_hook(hook_path, ar) + process = self._run_hook(hook_path, ar, config) if is_background: # Track but don't wait @@ -698,7 +726,7 @@ def runloop(self) -> None: finally: self.on_shutdown() - def _run_hook(self, hook_path: Path, ar: Any) -> Any: + def _run_hook(self, hook_path: Path, ar: Any, config: dict) -> Any: """Fork and run a hook using Process model, return Process.""" from archivebox.hooks import run_hook @@ -710,7 +738,7 @@ def _run_hook(self, hook_path: Path, ar: Any) -> Any: process = run_hook( script=hook_path, output_dir=output_dir, - config=self.snapshot.config, + config=config, timeout=120, parent=self.db_process, url=str(self.snapshot.url), diff --git a/bin/test_plugins.sh b/bin/test_plugins.sh index e3257da663..cc21eca66a 100755 --- a/bin/test_plugins.sh +++ b/bin/test_plugins.sh @@ -179,7 +179,7 @@ if [ "$ENABLE_COVERAGE" = true ]; then export NODE_V8_COVERAGE="$ROOT_DIR/coverage/js" echo "Python coverage: enabled (subprocess support)" - echo "JavaScript coverage: enabled (NODE_V8_COVERAGE)" + echo "JavaScript coverage: enabled (NODE_V8_COVERAGE=$NODE_V8_COVERAGE)" echo "" fi