6 changes: 6 additions & 0 deletions archivebox/__init__.py
@@ -15,6 +15,12 @@
import sys
from pathlib import Path

# Force unbuffered output for real-time logs
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)
os.environ['PYTHONUNBUFFERED'] = '1'

ASCII_LOGO = """
█████╗ ██████╗ ██████╗██╗ ██╗██╗██╗ ██╗███████╗ ██████╗ ██████╗ ██╗ ██╗
██╔══██╗██╔══██╗██╔════╝██║ ██║██║██║ ██║██╔════╝ ██╔══██╗██╔═══██╗╚██╗██╔╝
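A quick aside on the new `hasattr()` guard in `archivebox/__init__.py`: a minimal sketch (not part of this PR) of why the check is needed — `sys.stdout` can be swapped for a stream without `reconfigure()` (for example an `io.StringIO` installed by a test harness or logging wrapper), where an unguarded call would raise `AttributeError`.

# Minimal sketch, not from this PR: why the hasattr() guard matters.
import io
import sys

fake_stdout = io.StringIO()                     # e.g. a capture stream in a test harness
assert not hasattr(fake_stdout, 'reconfigure')  # plain StringIO has no reconfigure()

sys.stdout = fake_stdout
if hasattr(sys.stdout, 'reconfigure'):          # guard from the diff above
    sys.stdout.reconfigure(line_buffering=True) # skipped safely for StringIO
sys.stdout = sys.__stdout__                     # restore the real stdout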
6 changes: 3 additions & 3 deletions archivebox/base_models/models.py
@@ -111,7 +111,7 @@ class Meta:

def save(self, *args, **kwargs):
    super().save(*args, **kwargs)
    self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    Path(self.output_dir).mkdir(parents=True, exist_ok=True)
    # Note: index.json is deprecated, models should use write_index_jsonl() for full data

@property
@@ -127,5 +127,5 @@ def output_dir_str(self) -> str:
return f'{self.output_dir_parent}/{self.output_dir_name}'

@property
def OUTPUT_DIR(self) -> Path:
    return DATA_DIR / self.output_dir_str
def output_dir(self) -> Path:
    raise NotImplementedError(f'{self.__class__.__name__} must implement output_dir property')
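For readers of this diff: `output_dir` is now an abstract property that each model subclass must implement, replacing the old `OUTPUT_DIR` computed from `DATA_DIR / output_dir_str`. The sketch below is a hypothetical subclass, not code from this PR — the real implementations live in model files not shown here.

# Hypothetical sketch of the new contract (not from this PR): a model subclass
# supplies its own output_dir property, and save() only has to mkdir() it.
from pathlib import Path

DATA_DIR = Path('/data')  # stand-in for the real archivebox DATA_DIR constant

class ExampleModel:
    output_dir_parent = 'archive'
    output_dir_name = '1735689600.0'  # e.g. a timestamp-based folder name

    @property
    def output_dir(self) -> Path:
        # each subclass decides its own on-disk location
        return DATA_DIR / self.output_dir_parent / self.output_dir_name

print(ExampleModel().output_dir)  # -> /data/archive/1735689600.0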
2 changes: 0 additions & 2 deletions archivebox/cli/__init__.py
@@ -53,8 +53,6 @@ class ArchiveBoxGroup(click.Group):
'manage': 'archivebox.cli.archivebox_manage.main',
# Introspection commands
'pluginmap': 'archivebox.cli.archivebox_pluginmap.main',
# Worker command
'worker': 'archivebox.cli.archivebox_worker.main',
}
all_subcommands = {
**meta_commands,
9 changes: 5 additions & 4 deletions archivebox/cli/archivebox_add.py
@@ -127,10 +127,11 @@ def add(urls: str | list[str],
    # Background mode: just queue work and return (orchestrator via server will pick it up)
    print('[yellow]\\[*] URLs queued. Orchestrator will process them (run `archivebox server` if not already running).[/yellow]')
else:
    # Foreground mode: run orchestrator inline until all work is done
    print(f'[green]\\[*] Starting orchestrator to process crawl...[/green]')
    orchestrator = Orchestrator(exit_on_idle=True, crawl_id=str(crawl.id))
    orchestrator.runloop() # Block until complete
    # Foreground mode: run CrawlWorker inline until all work is done
    print(f'[green]\\[*] Starting worker to process crawl...[/green]')
    from archivebox.workers.worker import CrawlWorker
    worker = CrawlWorker(crawl_id=str(crawl.id), worker_id=0)
    worker.runloop() # Block until complete

# 6. Return the list of Snapshots in this crawl
return crawl.snapshot_set.all()
67 changes: 0 additions & 67 deletions archivebox/cli/archivebox_orchestrator.py

This file was deleted.

65 changes: 60 additions & 5 deletions archivebox/cli/archivebox_run.py
@@ -1,16 +1,18 @@
#!/usr/bin/env python3

"""
archivebox run [--daemon]
archivebox run [--daemon] [--crawl-id=...] [--snapshot-id=...]

Unified command for processing queued work.

Modes:
- With stdin JSONL: Process piped records, exit when complete
- Without stdin (TTY): Run orchestrator in foreground until killed
- --crawl-id: Run orchestrator for specific crawl only
- --snapshot-id: Run worker for specific snapshot only (internal use)

Examples:
# Run orchestrator in foreground (replaces `archivebox orchestrator`)
# Run orchestrator in foreground
archivebox run

# Run as daemon (don't exit on idle)
@@ -23,6 +25,12 @@

# Mixed types work too
cat mixed_records.jsonl | archivebox run

# Run orchestrator for specific crawl (shows live progress for that crawl)
archivebox run --crawl-id=019b7e90-04d0-73ed-adec-aad9cfcd863e

# Run worker for specific snapshot (internal use by orchestrator)
archivebox run --snapshot-id=019b7e90-5a8e-712c-9877-2c70eebe80ad
"""

__package__ = 'archivebox.cli'
@@ -187,15 +195,62 @@ def run_orchestrator(daemon: bool = False) -> int:
return 1


def run_snapshot_worker(snapshot_id: str) -> int:
    """
    Run a SnapshotWorker for a specific snapshot.

    Args:
        snapshot_id: Snapshot UUID to process

    Returns exit code (0 = success, 1 = error).
    """
    from archivebox.workers.worker import _run_snapshot_worker

    try:
        _run_snapshot_worker(snapshot_id=snapshot_id, worker_id=0)
        return 0
    except KeyboardInterrupt:
        return 0
    except Exception as e:
        rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
        import traceback
        traceback.print_exc()
        return 1


@click.command()
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
def main(daemon: bool):
@click.option('--crawl-id', help="Run orchestrator for specific crawl only")
@click.option('--snapshot-id', help="Run worker for specific snapshot only")
def main(daemon: bool, crawl_id: str, snapshot_id: str):
"""
Process queued work.

When stdin is piped: Process those specific records and exit.
When run standalone: Run orchestrator in foreground.
Modes:
- No args + stdin piped: Process piped JSONL records
- No args + TTY: Run orchestrator for all work
- --crawl-id: Run orchestrator for that crawl only
- --snapshot-id: Run worker for that snapshot only
"""
# Snapshot worker mode
if snapshot_id:
sys.exit(run_snapshot_worker(snapshot_id))

# Crawl worker mode
if crawl_id:
from archivebox.workers.worker import CrawlWorker
try:
worker = CrawlWorker(crawl_id=crawl_id, worker_id=0)
worker.runloop()
sys.exit(0)
except KeyboardInterrupt:
sys.exit(0)
except Exception as e:
rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)

# Check if stdin has data (non-TTY means piped input)
if not sys.stdin.isatty():
sys.exit(process_stdin_records())
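The `--snapshot-id` mode above is flagged as internal use by the orchestrator. A hedged sketch of how a parent process could drive it as a subprocess follows — the orchestrator's actual spawning code is not part of this diff, so the invocation is an assumption based only on the CLI surface shown here.

# Hypothetical sketch (not from this PR): driving the new snapshot-worker mode
# from a parent process. Exit codes mirror run_snapshot_worker(): 0 = ok, 1 = error.
import subprocess

def run_one_snapshot(snapshot_id: str) -> int:
    proc = subprocess.run(
        ['archivebox', 'run', f'--snapshot-id={snapshot_id}'],
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        print(proc.stderr)  # run_snapshot_worker() prints worker errors to stderr
    return proc.returncode

# run_one_snapshot('019b7e90-5a8e-712c-9877-2c70eebe80ad')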
50 changes: 0 additions & 50 deletions archivebox/cli/archivebox_worker.py

This file was deleted.

29 changes: 21 additions & 8 deletions archivebox/config/configset.py
@@ -118,12 +118,12 @@ def update_in_place(self, warn: bool = True, persist: bool = False, **kwargs) ->


def get_config(
scope: str = "global",
defaults: Optional[Dict] = None,
persona: Any = None,
user: Any = None,
crawl: Any = None,
snapshot: Any = None,
machine: Any = None,
) -> Dict[str, Any]:
"""
Get merged config from all sources.
@@ -134,17 +134,18 @@
3. Per-user config (user.config JSON field)
4. Per-persona config (persona.get_derived_config() - includes CHROME_USER_DATA_DIR etc.)
5. Environment variables
6. Config file (ArchiveBox.conf)
7. Plugin schema defaults (config.json)
8. Core config defaults
6. Per-machine config (machine.config JSON field - resolved binary paths)
7. Config file (ArchiveBox.conf)
8. Plugin schema defaults (config.json)
9. Core config defaults

Args:
scope: Config scope ('global', 'crawl', 'snapshot', etc.)
defaults: Default values to start with
persona: Persona object (provides derived paths like CHROME_USER_DATA_DIR)
user: User object with config JSON field
crawl: Crawl object with config JSON field
snapshot: Snapshot object with config JSON field
machine: Machine object with config JSON field (defaults to Machine.current())

Returns:
Merged config dict
@@ -184,6 +185,18 @@
    file_config = BaseConfigSet.load_from_file(config_file)
    config.update(file_config)

    # Apply machine config overrides (cached binary paths, etc.)
    if machine is None:
        # Default to current machine if not provided
        try:
            from archivebox.machine.models import Machine
            machine = Machine.current()
        except Exception:
            pass # Machine might not be available during early init

    if machine and hasattr(machine, "config") and machine.config:
        config.update(machine.config)

    # Override with environment variables
    for key in config:
        env_val = os.environ.get(key)
@@ -221,8 +234,8 @@
    config.update(crawl.config)

    # Add CRAWL_OUTPUT_DIR for snapshot hooks to find shared Chrome session
    if crawl and hasattr(crawl, "OUTPUT_DIR"):
        config['CRAWL_OUTPUT_DIR'] = str(crawl.OUTPUT_DIR)
    if crawl and hasattr(crawl, "output_dir"):
        config['CRAWL_OUTPUT_DIR'] = str(crawl.output_dir)

    # Apply snapshot config overrides (highest priority)
    if snapshot and hasattr(snapshot, "config") and snapshot.config:
@@ -260,7 +273,7 @@ def get_flat_config() -> Dict[str, Any]:

Replaces abx.pm.hook.get_FLAT_CONFIG()
"""
return get_config(scope="global")
return get_config()


def get_all_configs() -> Dict[str, BaseConfigSet]:
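To make the new merge order concrete: a minimal sketch (not the real `get_config()` implementation) of the layering it performs, with the machine layer sitting between the config file and environment variables as described in the docstring above. The keys and values are placeholders.

# Minimal sketch, not the actual get_config() implementation: later .update()
# calls win, so layers are applied from lowest to highest priority.
import os

config: dict = {}
config.update({'CHROME_BINARY': 'chromium', 'SAVE_WGET': 'True'})    # plugin/core defaults
config.update({'SAVE_WGET': 'False'})                                # ArchiveBox.conf
config.update({'CHROME_BINARY': '/usr/bin/chromium'})                # machine.config (resolved binary paths)
config.update({k: v for k, v in os.environ.items() if k in config})  # environment variables
config.update({'SAVE_WGET': 'True'})                                 # crawl.config override
config.update({})                                                    # snapshot.config (highest priority)

print(config)  # e.g. {'CHROME_BINARY': '/usr/bin/chromium', 'SAVE_WGET': 'True'}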
1 change: 1 addition & 0 deletions archivebox/config/constants.py
@@ -176,6 +176,7 @@ class ConstantsDict(Mapping):
CRONTABS_DIR_NAME,
"invalid",
"users",
"machine",
# Backwards compatibility with old directory names
"user_plugins", # old name for USER_PLUGINS_DIR (now 'plugins')
"user_templates", # old name for CUSTOM_TEMPLATES_DIR (now 'templates')
18 changes: 15 additions & 3 deletions archivebox/core/migrations/0023_upgrade_to_0_9_0.py
@@ -15,6 +15,7 @@ def get_table_columns(table_name):

def upgrade_core_tables(apps, schema_editor):
    """Upgrade core tables from v0.7.2 or v0.8.6rc0 to v0.9.0."""
    from archivebox.uuid_compat import uuid7
    cursor = connection.cursor()

    # Check if core_archiveresult table exists
@@ -60,8 +61,8 @@ def upgrade_core_tables(apps, schema_editor):

    if has_data:
        if has_uuid and not has_abid:
            # Migrating from v0.7.2 (has uuid, minimal fields)
            print('Migrating ArchiveResult from v0.7.2 schema...')
            # Migrating from v0.7.2+ (has uuid column)
            print('Migrating ArchiveResult from v0.7.2+ schema (with uuid)...')
            cursor.execute("""
                INSERT OR IGNORE INTO core_archiveresult_new (
                    id, uuid, snapshot_id, cmd, pwd, cmd_version,
@@ -86,7 +87,18 @@
                FROM core_archiveresult;
            """)
        else:
            print(f'Warning: Unexpected schema - has_uuid={has_uuid}, has_abid={has_abid}')
            # Migrating from v0.7.2 (no uuid or abid column - generate fresh UUIDs)
            print('Migrating ArchiveResult from v0.7.2 schema (no uuid - generating UUIDs)...')
            cursor.execute("SELECT id, snapshot_id, cmd, pwd, cmd_version, start_ts, end_ts, status, extractor, output FROM core_archiveresult")
            old_records = cursor.fetchall()
            for record in old_records:
                new_uuid = uuid7().hex
                cursor.execute("""
                    INSERT OR IGNORE INTO core_archiveresult_new (
                        id, uuid, snapshot_id, cmd, pwd, cmd_version,
                        start_ts, end_ts, status, extractor, output
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (record[0], new_uuid, record[1], record[2], record[3], record[4], record[5], record[6], record[7], record[8], record[9]))

    cursor.execute("DROP TABLE IF EXISTS core_archiveresult;")
    cursor.execute("ALTER TABLE core_archiveresult_new RENAME TO core_archiveresult;")
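Since the new fallback branch generates a fresh `uuid7()` per legacy row, a quick post-migration sanity check is to confirm no `core_archiveresult` row was left without a uuid. A hedged sketch, assuming the default SQLite database lives at `data/index.sqlite3`:

# Hedged sketch, not part of the migration: verify every ArchiveResult row
# received a uuid. The database path is an assumption (default ArchiveBox layout).
import sqlite3

conn = sqlite3.connect('data/index.sqlite3')
total = conn.execute("SELECT COUNT(*) FROM core_archiveresult").fetchone()[0]
missing = conn.execute(
    "SELECT COUNT(*) FROM core_archiveresult WHERE uuid IS NULL OR uuid = ''"
).fetchone()[0]
conn.close()

print(f'{total} ArchiveResults, {missing} missing a uuid')
assert missing == 0, 'migration left rows without a uuid'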