1 change: 1 addition & 0 deletions pyproject.toml
@@ -15,6 +15,7 @@ dependencies = [
"jinja2",
"markdown",
"questionary",
"watchdog",
]

[project.urls]
228 changes: 227 additions & 1 deletion src/claude_code_transcripts/__init__.py
@@ -8,6 +8,7 @@
import shutil
import subprocess
import tempfile
import time
import webbrowser
from datetime import datetime
from pathlib import Path
@@ -18,6 +19,8 @@
from jinja2 import Environment, PackageLoader
import markdown
import questionary
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Set up Jinja2 environment
_jinja_env = Environment(
@@ -380,6 +383,106 @@ def generate_batch_html(
}


def generate_incremental_html(
source_folder,
output_dir,
changed_files,
include_agents=False,
progress_callback=None,
):
"""Incrementally regenerate HTML for only the changed session files.

Only regenerates the sessions corresponding to the changed files,
updates the affected project indexes, and updates the master index.

Args:
source_folder: Path to the Claude projects folder
output_dir: Path for output archive
changed_files: Set of Path objects for files that have changed
include_agents: Whether to include agent-* session files
progress_callback: Optional callback(project_name, session_name, current, total)

Returns statistics dict with sessions_regenerated, failed_sessions, projects_updated.
"""
source_folder = Path(source_folder)
output_dir = Path(output_dir)

# Filter to only JSONL files that exist
jsonl_files = {f for f in changed_files if f.suffix == ".jsonl" and f.exists()}

# Skip agent files unless requested
if not include_agents:
jsonl_files = {f for f in jsonl_files if not f.name.startswith("agent-")}

# Track which projects need their index updated
affected_projects = set()
sessions_regenerated = 0
failed_sessions = []

total_count = len(jsonl_files)
processed_count = 0

for session_file in jsonl_files:
# Get summary and skip boring sessions
summary = get_session_summary(session_file)
if summary.lower() == "warmup" or summary == "(no summary)":
continue

# Determine project info
project_folder = session_file.parent
project_key = project_folder.name
project_name = get_project_display_name(project_key)

# Track this project as needing index update
affected_projects.add(project_key)

# Generate session HTML
session_name = session_file.stem
project_dir = output_dir / project_name
project_dir.mkdir(exist_ok=True)
session_dir = project_dir / session_name

try:
generate_html(session_file, session_dir)
sessions_regenerated += 1
except Exception as e:
failed_sessions.append(
{
"project": project_name,
"session": session_name,
"error": str(e),
}
)

processed_count += 1
if progress_callback:
progress_callback(project_name, session_name, processed_count, total_count)

# Rebuild project indexes for affected projects only
# We need the full project data to generate the index
all_projects = find_all_sessions(source_folder, include_agents=include_agents)
projects_by_key = {}
for project in all_projects:
# Match by the original folder name
project_folder_name = project["path"].name
projects_by_key[project_folder_name] = project

for project_key in affected_projects:
if project_key in projects_by_key:
project = projects_by_key[project_key]
project_dir = output_dir / project["name"]
_generate_project_index(project, project_dir)

# Always regenerate master index (session counts might have changed)
_generate_master_index(all_projects, output_dir)

return {
"sessions_regenerated": sessions_regenerated,
"failed_sessions": failed_sessions,
"projects_updated": len(affected_projects),
}
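
For reference, a minimal sketch of how this incremental helper could be driven on its own. The source/output locations and the changed-file path below are hypothetical; the returned keys follow the docstring above.

```python
from pathlib import Path

from claude_code_transcripts import generate_incremental_html

# Hypothetical locations; changed_files is any set of .jsonl paths under source.
source = Path.home() / ".claude" / "projects"
output = Path("claude-archive")
changed = {source / "-Users-me-dev-myproject" / "0123abcd.jsonl"}

stats = generate_incremental_html(source, output, changed)
print(
    f"{stats['sessions_regenerated']} session(s) regenerated, "
    f"{stats['projects_updated']} project index(es) updated"
)
for failure in stats["failed_sessions"]:
    print(f"failed: {failure['project']}/{failure['session']}: {failure['error']}")
```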


def _generate_project_index(project, output_dir):
"""Generate index.html for a single project."""
template = get_template("project_index.html")
@@ -505,6 +608,59 @@ class CredentialsError(Exception):
pass


class TranscriptWatcher(FileSystemEventHandler):
"""File system event handler for watching transcript source directory."""

def __init__(self, output_dir, debounce_seconds, quiet=False):
super().__init__()
self.output_dir = output_dir
self.debounce_seconds = debounce_seconds
self.quiet = quiet
self.last_trigger_time = 0
self.pending_update = False
self.changed_files = set()
self.generation_callback = None

def should_process_event(self, event):
"""Check if we should process this file system event."""
if event.is_directory:
return False

event_path = Path(event.src_path)
try:
if self.output_dir in event_path.parents or event_path == self.output_dir:
return False
except (ValueError, AttributeError):
pass

ignore_patterns = [".tmp", ".swp", "~", ".DS_Store", "__pycache__"]
if any(pattern in event_path.name for pattern in ignore_patterns):
return False

return True

def on_any_event(self, event):
"""Handle file system events."""
if not self.should_process_event(event):
return
self.pending_update = True
self.changed_files.add(Path(event.src_path))
self.last_trigger_time = time.time()

def check_and_update(self):
"""Check if enough time has passed and trigger regeneration if needed."""
if not self.pending_update:
return

time_since_last_trigger = time.time() - self.last_trigger_time
if time_since_last_trigger >= self.debounce_seconds:
self.pending_update = False
changed = self.changed_files.copy()
self.changed_files.clear()
if self.generation_callback:
self.generation_callback(changed)
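
A quick sketch of the debounce behaviour in isolation, using a stand-in event object that carries only the two attributes the handler reads; this snippet is illustrative and not part of the diff.

```python
import time
from pathlib import Path

from claude_code_transcripts import TranscriptWatcher


class FakeEvent:
    """Stand-in for a watchdog FileSystemEvent; only src_path/is_directory are read."""

    is_directory = False

    def __init__(self, src_path):
        self.src_path = src_path


handler = TranscriptWatcher(Path("/tmp/claude-archive"), debounce_seconds=2)
handler.generation_callback = lambda changed: print(
    "regenerate", sorted(p.name for p in changed)
)

# A burst of events only marks work as pending and records the changed paths.
for name in ("a.jsonl", "b.jsonl", "a.jsonl"):
    handler.on_any_event(FakeEvent(f"/tmp/projects/demo/{name}"))

handler.check_and_update()  # still inside the debounce window: nothing fires
time.sleep(2)
handler.check_and_update()  # fires once with {a.jsonl, b.jsonl}
```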


def get_access_token_from_keychain():
"""Get access token from macOS keychain.

@@ -2030,7 +2186,20 @@ def web_cmd(
is_flag=True,
help="Suppress all output except errors.",
)
def all_cmd(source, output, include_agents, dry_run, open_browser, quiet):
@click.option(
"--watch",
is_flag=True,
help="Watch for changes and regenerate automatically.",
)
@click.option(
"--debounce",
type=int,
default=60,
help="Seconds to wait after last change before regenerating (default: 60).",
)
def all_cmd(
source, output, include_agents, dry_run, open_browser, quiet, watch, debounce
):
"""Convert all local Claude Code sessions to a browsable HTML archive.

Creates a directory structure with:
@@ -2049,6 +2218,63 @@ def all_cmd(source, output, include_agents, dry_run, open_browser, quiet):

output = Path(output)

if watch:
_run_watch_mode(source, output, include_agents, open_browser, quiet, debounce)
else:
_run_all_generation(
source, output, include_agents, dry_run, open_browser, quiet
)


def _run_watch_mode(source, output, include_agents, open_browser, quiet, debounce):
"""Watch source directory and regenerate transcripts on changes."""

def run_incremental_generation(changed_files):
start_time = time.time()
if not quiet:
click.echo(f"\nRegenerating {len(changed_files)} changed file(s)...")

stats = generate_incremental_html(
source, output, changed_files, include_agents=include_agents
)

duration = time.time() - start_time
if not quiet:
click.echo(
f"Regenerated {stats['sessions_regenerated']} session(s) ({duration:.1f}s)"
)

# Initial full generation
if not quiet:
click.echo(f"Generating initial archive...")
_run_all_generation(source, output, include_agents, False, False, quiet)

if open_browser:
index_url = (output / "index.html").resolve().as_uri()
webbrowser.open(index_url)

if not quiet:
click.echo(f"\nWatching {source} for changes...")
click.echo("Press Ctrl+C to stop.")

observer = Observer()
handler = TranscriptWatcher(output, debounce, quiet)
handler.generation_callback = run_incremental_generation
observer.schedule(handler, str(source), recursive=True)
observer.start()

try:
while True:
time.sleep(1)
handler.check_and_update()
except KeyboardInterrupt:
observer.stop()

observer.join()


def _run_all_generation(source, output, include_agents, dry_run, open_browser, quiet):
"""Run the all-sessions generation logic."""
if not quiet:
click.echo(f"Scanning {source}...")
