
Conversation

@gabe-l-hart commented Jan 21, 2026

Description

This PR addresses several sources of memory leaks in the current docling-serve implementation caused by logic in this library. This is part of the investigation of docling-project/docling-serve#366 and docling-project/docling-serve#474.

  • Redis keys not correctly purged: When /clear/results is invoked with the RQ engine, the Redis content that stored the job (including the full document contents) was not being deleted.
  • Local engine not garbage collecting: When /clear/converters is invoked, the local converter cache was cleared, but the converters were never garbage collected, which could cause OOMs if enough other memory was allocated before a garbage collection happened.
  • Local engine not clearing worker ConverterManagers: When use_shared_manager is set to False (the default), each AsyncLocalWorker creates its own DoclingConverterManager instance, which is not tracked by the parent LocalOrchestrator and therefore not cleared when /clear/converters is invoked.
    • ⚠️ This seems to have been the biggest source of leaked memory for the local backend! See the discussion below about the real root cause.
  • Ephemeral ChunkerManagers never cleared: When process_chunk_results is called to process a CHUNK task, it creates an ephemeral instance of DocumentChunkerManager, which in turn uses an @lru_cache to store the internal chunker. These ephemeral instances go out of scope when process_chunk_results returns, but the purge semantics of the cached internal chunker are unclear, so it's possible this was leaking cached instances.
    • This did not show big improvements, but the resulting code is tighter, so I kept it. (A sketch of the general shape of these cleanups follows this list.)
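
For reference, here is a minimal sketch of the general shape of these cleanups. It is illustrative only: names such as SketchOrchestrator, worker_converter_managers, get_chunker, and clear_cache are stand-ins, not the actual docling-jobkit classes or attributes.

import gc
from functools import lru_cache


@lru_cache(maxsize=8)
def get_chunker(options_key: str):
    """Illustrative lru_cache-backed factory standing in for the chunker cache."""
    return object()  # placeholder for an expensive chunker instance


class SketchOrchestrator:
    """Not the real LocalOrchestrator; just the shape of the cleanup logic."""

    def __init__(self) -> None:
        # Track the per-worker converter managers so /clear/converters can reach them
        # (previously each AsyncLocalWorker held its own untracked instance).
        self.worker_converter_managers: list = []

    def clear_converters(self) -> None:
        # Drop the references held for each worker ...
        for manager in self.worker_converter_managers:
            manager.clear_cache()  # hypothetical method name on the manager
        self.worker_converter_managers.clear()

        # ... purge the lru_cache-backed chunker factory ...
        get_chunker.cache_clear()

        # ... and force a collection so the large converter objects are freed
        # promptly instead of waiting for the next GC cycle.
        gc.collect()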

Further Investigation

The fixes in this PR should solve the memory leak issues IF (and only if) the user invokes both /clear/results and /clear/converters regularly. Memory will continue to accumulate if this doesn't happen. My current best guess as to why memory accumulates despite the converter caching is docling-project/docling#2209, which suggests there may be memory leaks in the core of docling's DocumentConverter. Without purging the converters regularly, the cached converter instances are free to leak and accumulate memory over time.
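
Until that upstream leak is resolved, one mitigation (my own sketch, not part of this PR) is to call both clear endpoints on a schedule. The paths and the older_then query parameter below match the ones already exercised by the reproduction script further down; the interval is arbitrary.

import asyncio

import httpx


async def periodic_cleanup(base_url: str = "http://localhost:5001", interval_s: float = 300.0) -> None:
    """Call the two clear endpoints every interval_s seconds."""
    async with httpx.AsyncClient(base_url=base_url) as client:
        while True:
            # older_then=0 clears all results regardless of age.
            await client.get("/v1/clear/results?older_then=0", timeout=60.0)
            await client.get("/v1/clear/converters", timeout=60.0)
            await asyncio.sleep(interval_s)


if __name__ == "__main__":
    asyncio.run(periodic_cleanup())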

Given this, I suspect that the RQ backend has not yet been fully solved, since we don't have a proper clear_converters implementation in the RQOrchestrator, which will likely lead the Redis workers to accumulate memory over time.
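
One possible direction for that (purely a sketch of the idea, not what this PR implements) is to broadcast a cache-clear signal over Redis pub/sub and have each RQ worker purge its own DoclingConverterManager when it receives it. The channel name and the clear_cache method below are assumptions for illustration.

import redis

CLEAR_CHANNEL = "docling:clear-converters"  # illustrative channel name


def broadcast_clear(redis_url: str = "redis://localhost:6379") -> None:
    """Orchestrator side: ask every worker to drop its converter cache."""
    redis.from_url(redis_url).publish(CLEAR_CHANNEL, "clear")


def run_clear_listener(redis_url: str, converter_manager) -> None:
    """Worker side: block on the channel and clear the local cache on demand."""
    pubsub = redis.from_url(redis_url).pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe(CLEAR_CHANNEL)
    for message in pubsub.listen():
        if message["type"] == "message":
            converter_manager.clear_cache()  # assumed method; adapt to the real manager API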

Testing

This fix was developed in conjunction with Claude Code and Bob, which produced the following scripts to measure the memory leak:

reproduce_memory_leak.py
#!/usr/bin/env python3
"""
Reproduction script for memory leak issue #366 and #474.

This script:
1. Automatically starts RQ worker and docling-serve as subprocesses
2. Submits multiple documents for processing
3. Measures Redis, worker, and server memory usage throughout
4. Tests the /v1/clear endpoints to verify cleanup
5. Reports detailed memory leak metrics

Usage:
    # 1. Start Redis server (if not already running):
    # docker run -d -p 6379:6379 redis:latest

    # 2. Run this script (it manages everything else):
    # uv run python reproduce_memory_leak.py

    # Optional arguments:
    # --iterations N     Process N documents (default: 5)
    # --keep-running     Keep services running after test (for manual inspection)
"""

import argparse
import asyncio
import atexit
import base64
import gc
import json
import os
import psutil
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional

import httpx
import redis


class ProcessManager:
    """Manage RQ worker and docling-serve subprocesses."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        # self.worker_process: Optional[subprocess.Popen] = None
        self.server_process: Optional[subprocess.Popen] = None
        self.worker_pid: Optional[int] = None
        self.server_pid: Optional[int] = None

        # Register cleanup on exit
        atexit.register(self.cleanup)

    # def start_worker(self) -> bool:
    #     """Start the RQ worker subprocess."""
    #     print("\n🚀 Starting RQ worker...")

    #     env = os.environ.copy()
    #     env["DOCLING_SERVE_ENG_KIND"] = "rq"
    #     env["DOCLING_SERVE_ENG_RQ_REDIS_URL"] = self.redis_url

    #     try:
    #         self.worker_process = subprocess.Popen(
    #             ["uv", "run", "docling-serve", "rq-worker"],
    #             env=env,
    #             stdout=subprocess.PIPE,
    #             stderr=subprocess.PIPE,
    #             text=True,
    #         )

    #         # Wait for worker to start and get its PID
    #         time.sleep(3)
    #         if self.worker_process.poll() is None:
    #             self.worker_pid = self.worker_process.pid
    #             print(f"✓ RQ worker started (PID: {self.worker_pid})")
    #             return True
    #         else:
    #             stderr = self.worker_process.stderr.read() if self.worker_process.stderr else ""
    #             print(f"✗ RQ worker failed to start: {stderr}")
    #             return False
    #     except Exception as e:
    #         print(f"✗ Failed to start RQ worker: {e}")
    #         return False

    def start_server(self) -> bool:
        """Start the docling-serve subprocess."""
        print("🚀 Starting docling-serve...")

        # env = os.environ.copy()
        # env["DOCLING_SERVE_ENG_KIND"] = "rq"
        # env["DOCLING_SERVE_ENG_RQ_REDIS_URL"] = self.redis_url

        try:
            self.server_process = subprocess.Popen(
                ["uv", "run", "docling-serve", "run"],
                # env=env,
                # stdout=subprocess.PIPE,
                # stderr=subprocess.PIPE,
                text=True,
            )

            # Wait for server to start and get its PID
            time.sleep(5)
            if self.server_process.poll() is None:
                self.server_pid = self.server_process.pid
                print(f"✓ Docling-serve started (PID: {self.server_pid})")
                return True
            else:
                stderr = self.server_process.stderr.read() if self.server_process.stderr else ""
                print(f"✗ Docling-serve failed to start: {stderr}")
                return False
        except Exception as e:
            print(f"✗ Failed to start docling-serve: {e}")
            return False

    def get_worker_memory_mb(self) -> float:
        """Get worker process memory in MB."""
        if self.worker_pid:
            try:
                process = psutil.Process(self.worker_pid)
                return process.memory_info().rss / 1024 / 1024
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return 0.0

    def get_server_memory_mb(self) -> float:
        """Get server process memory in MB."""
        if self.server_pid:
            try:
                process = psutil.Process(self.server_pid)
                # Include child processes (uvicorn workers)
                memory = process.memory_info().rss
                for child in process.children(recursive=True):
                    try:
                        memory += child.memory_info().rss
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        pass
                return memory / 1024 / 1024
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return 0.0

    def cleanup(self):
        """Stop all managed processes."""
        print("\n🧹 Cleaning up processes...")

        if self.server_process:
            try:
                self.server_process.terminate()
                self.server_process.wait(timeout=5)
                print("✓ Docling-serve stopped")
            except subprocess.TimeoutExpired:
                self.server_process.kill()
                print("✓ Docling-serve killed (forced)")
            except Exception as e:
                print(f"Warning: Error stopping server: {e}")

        # if self.worker_process:
        #     try:
        #         self.worker_process.terminate()
        #         self.worker_process.wait(timeout=5)
        #         print("✓ RQ worker stopped")
        #     except subprocess.TimeoutExpired:
        #         self.worker_process.kill()
        #         print("✓ RQ worker killed (forced)")
        #     except Exception as e:
        #         print(f"Warning: Error stopping worker: {e}")


class MemoryMonitor:
    """Monitor process and Redis memory usage."""

    def __init__(self, redis_url: str = "redis://localhost:6379", process_manager: Optional[ProcessManager] = None):
        self.process = psutil.Process()
        self.measurements: List[Dict[str, float]] = []
        # self.redis_client = redis.from_url(redis_url)
        self.process_manager = process_manager

    def get_memory_mb(self) -> float:
        """Get current process memory usage in MB."""
        return self.process.memory_info().rss / 1024 / 1024

    # def get_redis_memory_mb(self) -> float:
    #     """Get current Redis memory usage in MB."""
    #     try:
    #         info = self.redis_client.info("memory")
    #         return info["used_memory"] / 1024 / 1024
    #     except Exception as e:
    #         print(f"Warning: Could not get Redis memory: {e}")
    #         return 0.0

    # def get_redis_key_count(self) -> int:
    #     """Get total number of keys in Redis."""
    #     try:
    #         return self.redis_client.dbsize()
    #     except Exception as e:
    #         print(f"Warning: Could not get Redis key count: {e}")
    #         return 0

    def measure(self, label: str) -> Dict[str, float]:
        """Take a memory measurement with label."""
        gc.collect()  # Force garbage collection before measuring
        time.sleep(0.5)  # Let GC settle

        mem_mb = self.get_memory_mb()
        # redis_mb = self.get_redis_memory_mb()
        # redis_keys = self.get_redis_key_count()

        # Get subprocess memory if available
        # worker_mb = self.process_manager.get_worker_memory_mb() if self.process_manager else 0.0
        server_mb = self.process_manager.get_server_memory_mb() if self.process_manager else 0.0

        measurement = {
            "label": label,
            "memory_mb": mem_mb,
            # "redis_mb": redis_mb,
            # "redis_keys": redis_keys,
            # "worker_mb": worker_mb,
            "server_mb": server_mb,
            "timestamp": time.time()
        }
        self.measurements.append(measurement)

        print(f"[MEMORY] {label}: Script={mem_mb:.2f} MB, Server={server_mb:.2f} MB")
        # if self.process_manager:
        #     print(f"[MEMORY] {label}: Script={mem_mb:.2f} MB, Worker={worker_mb:.2f} MB, "
        #           f"Server={server_mb:.2f} MB, Redis={redis_mb:.2f} MB ({redis_keys} keys)")
        # else:
        #     print(f"[MEMORY] {label}: Process={mem_mb:.2f} MB, Redis={redis_mb:.2f} MB ({redis_keys} keys)")

        return measurement

    def report(self):
        """Generate memory leak report."""
        if len(self.measurements) < 2:
            print("Not enough measurements to report")
            return

        baseline = self.measurements[0]["memory_mb"]
        # redis_baseline = self.measurements[0]["redis_mb"]
        # keys_baseline = self.measurements[0]["redis_keys"]
        # worker_baseline = self.measurements[0].get("worker_mb", 0)
        server_baseline = self.measurements[0].get("server_mb", 0)

        print("\n" + "="*100)
        print("MEMORY LEAK REPORT")
        print("="*100)

        print(f"{'Label':40} {'Script':>10} {'Server':>10}")
        print("-"*100)
        for m in self.measurements:
            print(f"{m['label']:40} {m['memory_mb']:8.2f} MB  "
                    f"{m.get('server_mb', 0):8.2f} MB")

        # Calculate leak indicators
        after_processing = next(
            (m for m in self.measurements if m["label"] == "After all processing complete"),
            None
        )
        after_clear = next(
            (m for m in self.measurements if m["label"] == "After /v1/clear/results"),
            None
        )

        if after_processing and after_clear:
            # Process memory
            processing_growth = after_processing["memory_mb"] - baseline
            clear_reduction = after_processing["memory_mb"] - after_clear["memory_mb"]
            remaining_leak = after_clear["memory_mb"] - baseline

            print("\n" + "-"*80)
            print("PROCESS MEMORY:")
            print(f"  Growth during processing: {processing_growth:+.2f} MB")
            print(f"  Freed by /v1/clear/results: {clear_reduction:+.2f} MB")
            print(f"  Remaining leak: {remaining_leak:+.2f} MB")

            # Subprocess memory analysis
            server_growth = after_processing.get("server_mb", 0) - server_baseline
            server_after_clear = after_clear.get("server_mb", 0) - server_baseline

            print("\nSERVER PROCESS MEMORY:")
            print(f"  Growth during processing: {server_growth:+.2f} MB")
            print(f"  Remaining after clear: {server_after_clear:+.2f} MB")

        print("="*100 + "\n")


async def convert_document(client: httpx.AsyncClient, pdf_path: Path, headers: dict) -> str:
    """Convert a document and return task_id."""
    encoded_doc = base64.b64encode(pdf_path.read_bytes()).decode()

    payload = {
        "options": {"to_formats": ["json"]},
        "sources": [
            {
                "kind": "file",
                "base64_string": encoded_doc,
                "filename": pdf_path.name,
            }
        ],
    }

    response = await client.post(
        "/v1/convert/source/async",
        json=payload,
        headers=headers,
        timeout=60.0
    )
    response.raise_for_status()

    task_data = response.json()
    return task_data["task_id"]


async def wait_for_task(client: httpx.AsyncClient, task_id: str, headers: dict) -> dict:
    """Wait for a task to complete."""
    while True:
        response = await client.get(
            f"/v1/status/poll/{task_id}",
            headers=headers,
            timeout=60.0
        )
        response.raise_for_status()

        task = response.json()

        if task["task_status"] in ("success", "failure"):
            return task

        await asyncio.sleep(0.5)


async def get_result(client: httpx.AsyncClient, task_id: str, headers: dict) -> dict:
    """Get task result."""
    response = await client.get(
        f"/v1/result/{task_id}",
        headers=headers,
        timeout=60.0
    )
    response.raise_for_status()
    return response.json()


async def clear_results(client: httpx.AsyncClient, headers: dict, older_than: float = 0):
    """Call /v1/clear/results endpoint."""
    response = await client.get(
        f"/v1/clear/results?older_then={older_than}",
        headers=headers,
        timeout=60.0
    )
    response.raise_for_status()
    return response.json()


async def clear_converters(client: httpx.AsyncClient, headers: dict):
    """Call /v1/clear/converters endpoint."""
    response = await client.get(
        "/v1/clear/converters",
        headers=headers,
        timeout=60.0
    )
    response.raise_for_status()
    return response.json()


async def main():
    """Main reproduction script."""

    # Parse arguments
    parser = argparse.ArgumentParser(description="Memory leak reproduction script")
    parser.add_argument("--iterations", type=int, default=5, help="Number of documents to process")
    parser.add_argument("--keep-running", action="store_true", help="Keep services running after test")
    parser.add_argument("--no-subprocesses", action="store_true", help="Don't start subprocesses (use existing services)")
    args = parser.parse_args()

    # Configuration
    BASE_URL = "http://localhost:5001"
    REDIS_URL = "redis://localhost:6379"
    API_KEY = None  # Set if using authentication
    NUM_ITERATIONS = args.iterations

    # Find test PDF
    test_pdf = Path("tests/2408.09869v5.pdf")
    if not test_pdf.exists():
        print(f"Error: Test PDF not found at {test_pdf}")
        print("Please ensure you're running from the docling-serve directory")
        sys.exit(1)

    print(f"Using test PDF: {test_pdf} ({test_pdf.stat().st_size / 1024 / 1024:.2f} MB)")

    # Setup headers
    headers = {}
    if API_KEY:
        headers["X-Api-Key"] = API_KEY

    # Initialize process manager and start services if needed
    process_manager = None
    if not args.no_subprocesses:
        print("\n" + "="*100)
        print("STARTING SERVICES")
        print("="*100)

        process_manager = ProcessManager(redis_url=REDIS_URL)

        # # Start RQ worker
        # if not process_manager.start_worker():
        #     print("Failed to start RQ worker. Exiting.")
        #     sys.exit(1)

        # Start docling-serve
        if not process_manager.start_server():
            print("Failed to start docling-serve. Exiting.")
            process_manager.cleanup()
            sys.exit(1)

        print(f"\n✓ All services started successfully")
        print("="*100)

    # Initialize memory monitor
    monitor = MemoryMonitor(redis_url=REDIS_URL, process_manager=process_manager)
    monitor.measure("Baseline (after services started)")

    try:
        async with httpx.AsyncClient(base_url=BASE_URL) as client:
            # Check server is running
            print(f"\n🔍 Checking server connection...")
            max_retries = 30
            for attempt in range(max_retries):
                try:
                    response = await client.get("/health", timeout=5.0)
                    response.raise_for_status()
                    print(f"✓ Connected to docling-serve at {BASE_URL}")
                    break
                except Exception as e:
                    if attempt < max_retries - 1:
                        print(f"Waiting for server to be ready... ({attempt + 1}/{max_retries})")
                        await asyncio.sleep(2)
                    else:
                        print(f"✗ Could not connect to docling-serve: {e}")
                        if args.no_subprocesses:
                            print(f"Please start services manually:")
                            print(f"  1. DOCLING_SERVE_ENG_KIND=rq DOCLING_SERVE_ENG_RQ_REDIS_URL={REDIS_URL} uv run docling-serve rq-worker")
                            print(f"  2. DOCLING_SERVE_ENG_KIND=rq DOCLING_SERVE_ENG_RQ_REDIS_URL={REDIS_URL} uv run docling-serve dev --no-reload")
                        sys.exit(1)

            monitor.measure("After server connection")

            # Process documents multiple times to accumulate memory
            task_ids = []

            # await client.get("/mem-dump", timeout=120, params={"label": "baseline"})
            for i in range(NUM_ITERATIONS):
                print(f"\n--- Iteration {i+1}/{NUM_ITERATIONS} ---")

                # Submit document
                task_id = await convert_document(client, test_pdf, headers)
                task_ids.append(task_id)
                print(f"Submitted task: {task_id}")

                # Wait for completion
                task = await wait_for_task(client, task_id, headers)
                print(f"Task completed: {task['task_status']}")

                # Fetch result (this keeps result in memory)
                result = await get_result(client, task_id, headers)
                print(f"Result retrieved: {len(json.dumps(result))} bytes")

                # Measure memory after each iteration
                monitor.measure(f"After iteration {i+1}")

                # Clear results and converters
                # clear_resp = await clear_results(client, headers, older_than=0)
                # clear_conv_resp = await clear_converters(client, headers)
                # monitor.measure(f"After clear iteration {i+1}")

            monitor.measure("After all processing complete")

            # Try clearing converters
            print("\n--- Testing /v1/clear/converters ---")
            clear_conv_resp = await clear_converters(client, headers)
            print(f"Clear converters response: {clear_conv_resp}")
            monitor.measure("After /v1/clear/converters")
            # await client.get("/mem-dump", timeout=120, params={"label": f"clear/converters", "diff_label": "baseline"})
            print(json.dumps((await client.get("/debug/tasks")).json(), indent=2))

            # Try clearing results
            print("\n--- Testing /v1/clear/results ---")
            clear_resp = await clear_results(client, headers, older_than=0)
            print(f"Clear results response: {clear_resp}")
            monitor.measure("After /v1/clear/results")
            # await client.get("/mem-dump", timeout=120, params={"label": f"clear/results", "diff_label": "baseline"})
            print(json.dumps((await client.get("/debug/tasks")).json(), indent=2))

            # Verify results are actually deleted
            print("\n--- Verifying results deleted ---")
            for task_id in task_ids[:2]:  # Check first 2
                try:
                    await get_result(client, task_id, headers)
                    print(f"⚠️  Task {task_id} still has results (should be deleted)")
                except httpx.HTTPStatusError as e:
                    if e.response.status_code == 404:
                        print(f"✓ Task {task_id} properly deleted")
                    else:
                        print(f"✗ Unexpected error for {task_id}: {e}")

            # Final measurement after everything settles
            print("\nWaiting 5 seconds for cleanup to settle...")
            await asyncio.sleep(5)
            monitor.measure("Final (after 5s settle)")

        # Generate report
        monitor.report()

    finally:
        # Cleanup processes unless user wants to keep them running
        if process_manager and not args.keep_running:
            process_manager.cleanup()
        elif args.keep_running:
            print("\n" + "="*100)
            print("SERVICES KEPT RUNNING")
            print("="*100)
            print(f"Worker PID: {process_manager.worker_pid if process_manager else 'N/A'}")
            print(f"Server PID: {process_manager.server_pid if process_manager else 'N/A'}")
            print("\nTo stop services manually:")
            if process_manager:
                print(f"  kill {process_manager.worker_pid} {process_manager.server_pid}")
            print("="*100)


if __name__ == "__main__":
    asyncio.run(main())
reproduce_memory_leak_redis.py
#!/usr/bin/env python3
"""
Reproduction script for memory leak issue #366 and #474.

This script:
1. Automatically starts RQ worker and docling-serve as subprocesses
2. Submits multiple documents for processing
3. Measures Redis, worker, and server memory usage throughout
4. Tests the /v1/clear endpoints to verify cleanup
5. Reports detailed memory leak metrics

Usage:
    # 1. Start Redis server (if not already running):
    # docker run -d -p 6379:6379 redis:latest

    # 2. Run this script (it manages everything else):
    # uv run python reproduce_memory_leak_redis.py

    # Optional arguments:
    # --iterations N     Process N documents (default: 5)
    # --keep-running     Keep services running after test (for manual inspection)
"""

import argparse
import asyncio
import atexit
import base64
import gc
import json
import os
import psutil
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional

import httpx
import redis


class ProcessManager:
    """Manage RQ worker and docling-serve subprocesses."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        self.worker_process: Optional[subprocess.Popen] = None
        self.server_process: Optional[subprocess.Popen] = None
        self.worker_pid: Optional[int] = None
        self.server_pid: Optional[int] = None

        # Register cleanup on exit
        atexit.register(self.cleanup)

    def start_worker(self) -> bool:
        """Start the RQ worker subprocess."""
        print("\n🚀 Starting RQ worker...")

        env = os.environ.copy()
        env["DOCLING_SERVE_ENG_KIND"] = "rq"
        env["DOCLING_SERVE_ENG_RQ_REDIS_URL"] = self.redis_url

        try:
            self.worker_process = subprocess.Popen(
                ["uv", "run", "docling-serve", "rq-worker"],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )

            # Wait for worker to start and get its PID
            time.sleep(3)
            if self.worker_process.poll() is None:
                self.worker_pid = self.worker_process.pid
                print(f"✓ RQ worker started (PID: {self.worker_pid})")
                return True
            else:
                stderr = self.worker_process.stderr.read() if self.worker_process.stderr else ""
                print(f"✗ RQ worker failed to start: {stderr}")
                return False
        except Exception as e:
            print(f"✗ Failed to start RQ worker: {e}")
            return False

    def start_server(self) -> bool:
        """Start the docling-serve subprocess."""
        print("🚀 Starting docling-serve...")

        env = os.environ.copy()
        env["DOCLING_SERVE_ENG_KIND"] = "rq"
        env["DOCLING_SERVE_ENG_RQ_REDIS_URL"] = self.redis_url

        try:
            self.server_process = subprocess.Popen(
                ["uv", "run", "docling-serve", "dev", "--no-reload"],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )

            # Wait for server to start and get its PID
            time.sleep(5)
            if self.server_process.poll() is None:
                self.server_pid = self.server_process.pid
                print(f"✓ Docling-serve started (PID: {self.server_pid})")
                return True
            else:
                stderr = self.server_process.stderr.read() if self.server_process.stderr else ""
                print(f"✗ Docling-serve failed to start: {stderr}")
                return False
        except Exception as e:
            print(f"✗ Failed to start docling-serve: {e}")
            return False

    def get_worker_memory_mb(self) -> float:
        """Get worker process memory in MB."""
        if self.worker_pid:
            try:
                process = psutil.Process(self.worker_pid)
                return process.memory_info().rss / 1024 / 1024
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return 0.0

    def get_server_memory_mb(self) -> float:
        """Get server process memory in MB."""
        if self.server_pid:
            try:
                process = psutil.Process(self.server_pid)
                # Include child processes (uvicorn workers)
                memory = process.memory_info().rss
                for child in process.children(recursive=True):
                    try:
                        memory += child.memory_info().rss
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        pass
                return memory / 1024 / 1024
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return 0.0

    def cleanup(self):
        """Stop all managed processes."""
        print("\n🧹 Cleaning up processes...")

        if self.server_process:
            try:
                self.server_process.terminate()
                self.server_process.wait(timeout=5)
                print("✓ Docling-serve stopped")
            except subprocess.TimeoutExpired:
                self.server_process.kill()
                print("✓ Docling-serve killed (forced)")
            except Exception as e:
                print(f"Warning: Error stopping server: {e}")

        if self.worker_process:
            try:
                self.worker_process.terminate()
                self.worker_process.wait(timeout=5)
                print("✓ RQ worker stopped")
            except subprocess.TimeoutExpired:
                self.worker_process.kill()
                print("✓ RQ worker killed (forced)")
            except Exception as e:
                print(f"Warning: Error stopping worker: {e}")


class MemoryMonitor:
    """Monitor process and Redis memory usage."""

    def __init__(self, redis_url: str = "redis://localhost:6379", process_manager: Optional[ProcessManager] = None):
        self.process = psutil.Process()
        self.measurements: List[Dict[str, float]] = []
        self.redis_client = redis.from_url(redis_url)
        self.process_manager = process_manager

    def get_memory_mb(self) -> float:
        """Get current process memory usage in MB."""
        return self.process.memory_info().rss / 1024 / 1024

    def get_redis_memory_mb(self) -> float:
        """Get current Redis memory usage in MB."""
        try:
            info = self.redis_client.info("memory")
            return info["used_memory"] / 1024 / 1024
        except Exception as e:
            print(f"Warning: Could not get Redis memory: {e}")
            return 0.0

    def get_redis_key_count(self) -> int:
        """Get total number of keys in Redis."""
        try:
            return self.redis_client.dbsize()
        except Exception as e:
            print(f"Warning: Could not get Redis key count: {e}")
            return 0

    def measure(self, label: str) -> Dict[str, float]:
        """Take a memory measurement with label."""
        gc.collect()  # Force garbage collection before measuring
        time.sleep(0.5)  # Let GC settle

        mem_mb = self.get_memory_mb()
        redis_mb = self.get_redis_memory_mb()
        redis_keys = self.get_redis_key_count()

        # Get subprocess memory if available
        worker_mb = self.process_manager.get_worker_memory_mb() if self.process_manager else 0.0
        server_mb = self.process_manager.get_server_memory_mb() if self.process_manager else 0.0

        measurement = {
            "label": label,
            "memory_mb": mem_mb,
            "redis_mb": redis_mb,
            "redis_keys": redis_keys,
            "worker_mb": worker_mb,
            "server_mb": server_mb,
            "timestamp": time.time()
        }
        self.measurements.append(measurement)

        if self.process_manager:
            print(f"[MEMORY] {label}: Script={mem_mb:.2f} MB, Worker={worker_mb:.2f} MB, "
                  f"Server={server_mb:.2f} MB, Redis={redis_mb:.2f} MB ({redis_keys} keys)")
        else:
            print(f"[MEMORY] {label}: Process={mem_mb:.2f} MB, Redis={redis_mb:.2f} MB ({redis_keys} keys)")

        return measurement

    def report(self):
        """Generate memory leak report."""
        if len(self.measurements) < 2:
            print("Not enough measurements to report")
            return

        baseline = self.measurements[0]["memory_mb"]
        redis_baseline = self.measurements[0]["redis_mb"]
        keys_baseline = self.measurements[0]["redis_keys"]
        worker_baseline = self.measurements[0].get("worker_mb", 0)
        server_baseline = self.measurements[0].get("server_mb", 0)

        has_subprocess_data = worker_baseline > 0 or server_baseline > 0

        print("\n" + "="*100)
        print("MEMORY LEAK REPORT")
        print("="*100)

        if has_subprocess_data:
            print(f"{'Label':40} {'Script':>10} {'Worker':>10} {'Server':>10} {'Redis':>10} {'Keys':>8}")
            print("-"*100)
            for m in self.measurements:
                print(f"{m['label']:40} {m['memory_mb']:8.2f} MB  {m.get('worker_mb', 0):8.2f} MB  "
                      f"{m.get('server_mb', 0):8.2f} MB  {m['redis_mb']:8.2f} MB  {m['redis_keys']:6d} keys")
        else:
            print(f"{'Label':40} {'Process':>12} {'Redis':>12} {'Keys':>8}")
            print("-"*100)
            for m in self.measurements:
                print(f"{m['label']:40} {m['memory_mb']:8.2f} MB  {m['redis_mb']:8.2f} MB  {m['redis_keys']:6d} keys")

        # Calculate leak indicators
        after_processing = next(
            (m for m in self.measurements if m["label"] == "After all processing complete"),
            None
        )
        after_clear = next(
            (m for m in self.measurements if m["label"] == "After /v1/clear/results"),
            None
        )

        if after_processing and after_clear:
            # Process memory
            processing_growth = after_processing["memory_mb"] - baseline
            clear_reduction = after_processing["memory_mb"] - after_clear["memory_mb"]
            remaining_leak = after_clear["memory_mb"] - baseline

            # Redis memory
            redis_growth = after_processing["redis_mb"] - redis_baseline
            redis_freed = after_processing["redis_mb"] - after_clear["redis_mb"]
            redis_remaining = after_clear["redis_mb"] - redis_baseline

            # Redis keys
            keys_growth = after_processing["redis_keys"] - keys_baseline
            keys_freed = after_processing["redis_keys"] - after_clear["redis_keys"]
            keys_remaining = after_clear["redis_keys"] - keys_baseline

            print("\n" + "-"*80)
            print("PROCESS MEMORY:")
            print(f"  Growth during processing: {processing_growth:+.2f} MB")
            print(f"  Freed by /v1/clear/results: {clear_reduction:+.2f} MB")
            print(f"  Remaining leak: {remaining_leak:+.2f} MB")

            print("\nREDIS MEMORY:")
            print(f"  Growth during processing: {redis_growth:+.2f} MB")
            print(f"  Freed by /v1/clear/results: {redis_freed:+.2f} MB")
            print(f"  Remaining leak: {redis_remaining:+.2f} MB")

            print("\nREDIS KEYS:")
            print(f"  Added during processing: {keys_growth:+d} keys")
            print(f"  Deleted by /v1/clear/results: {keys_freed:+d} keys")
            print(f"  Remaining orphaned: {keys_remaining:+d} keys")

            # Subprocess memory analysis
            if has_subprocess_data:
                worker_growth = after_processing.get("worker_mb", 0) - worker_baseline
                server_growth = after_processing.get("server_mb", 0) - server_baseline
                worker_after_clear = after_clear.get("worker_mb", 0) - worker_baseline
                server_after_clear = after_clear.get("server_mb", 0) - server_baseline

                print("\nWORKER PROCESS MEMORY:")
                print(f"  Growth during processing: {worker_growth:+.2f} MB")
                print(f"  Remaining after clear: {worker_after_clear:+.2f} MB")

                print("\nSERVER PROCESS MEMORY:")
                print(f"  Growth during processing: {server_growth:+.2f} MB")
                print(f"  Remaining after clear: {server_after_clear:+.2f} MB")

            # Overall assessment
            print("\n" + "-"*100)
            if redis_remaining > 10 or keys_remaining > 10:
                print("⚠️  REDIS MEMORY LEAK DETECTED - Keys/data not properly cleaned up")
            elif remaining_leak > 50:
                print("⚠️  PROCESS MEMORY LEAK DETECTED")
            elif remaining_leak > 10:
                print("⚠️  Minor process memory leak detected")
            else:
                print("✅ Memory properly cleaned up")

        print("="*100 + "\n")


async def convert_document(client: httpx.AsyncClient, pdf_path: Path, headers: dict) -> str:
    """Convert a document and return task_id."""
    encoded_doc = base64.b64encode(pdf_path.read_bytes()).decode()

    payload = {
        "options": {"to_formats": ["json"]},
        "sources": [
            {
                "kind": "file",
                "base64_string": encoded_doc,
                "filename": pdf_path.name,
            }
        ],
    }

    response = await client.post(
        "/v1/convert/source/async",
        json=payload,
        headers=headers,
        timeout=60.0
    )
    response.raise_for_status()

    task_data = response.json()
    return task_data["task_id"]


async def wait_for_task(client: httpx.AsyncClient, task_id: str, headers: dict) -> dict:
    """Wait for a task to complete."""
    while True:
        response = await client.get(
            f"/v1/status/poll/{task_id}",
            headers=headers,
            timeout=60.0
        )
        response.raise_for_status()

        task = response.json()

        if task["task_status"] in ("success", "failure"):
            return task

        await asyncio.sleep(0.5)


async def get_result(client: httpx.AsyncClient, task_id: str, headers: dict) -> dict:
    """Get task result."""
    response = await client.get(
        f"/v1/result/{task_id}",
        headers=headers,
        timeout=60.0
    )
    response.raise_for_status()
    return response.json()


async def clear_results(client: httpx.AsyncClient, headers: dict, older_than: float = 0):
    """Call /v1/clear/results endpoint."""
    response = await client.get(
        f"/v1/clear/results?older_then={older_than}",
        headers=headers,
        timeout=60.0
    )
    response.raise_for_status()
    return response.json()


async def clear_converters(client: httpx.AsyncClient, headers: dict):
    """Call /v1/clear/converters endpoint."""
    response = await client.get(
        "/v1/clear/converters",
        headers=headers,
        timeout=60.0
    )
    response.raise_for_status()
    return response.json()


async def main():
    """Main reproduction script."""

    # Parse arguments
    parser = argparse.ArgumentParser(description="Memory leak reproduction script")
    parser.add_argument("--iterations", type=int, default=5, help="Number of documents to process")
    parser.add_argument("--keep-running", action="store_true", help="Keep services running after test")
    parser.add_argument("--no-subprocesses", action="store_true", help="Don't start subprocesses (use existing services)")
    args = parser.parse_args()

    # Configuration
    BASE_URL = "http://localhost:5001"
    REDIS_URL = "redis://localhost:6379"
    API_KEY = None  # Set if using authentication
    NUM_ITERATIONS = args.iterations

    # Find test PDF
    test_pdf = Path("tests/2408.09869v5.pdf")
    if not test_pdf.exists():
        print(f"Error: Test PDF not found at {test_pdf}")
        print("Please ensure you're running from the docling-serve directory")
        sys.exit(1)

    print(f"Using test PDF: {test_pdf} ({test_pdf.stat().st_size / 1024 / 1024:.2f} MB)")

    # Setup headers
    headers = {}
    if API_KEY:
        headers["X-Api-Key"] = API_KEY

    # Initialize process manager and start services if needed
    process_manager = None
    if not args.no_subprocesses:
        print("\n" + "="*100)
        print("STARTING SERVICES")
        print("="*100)

        process_manager = ProcessManager(redis_url=REDIS_URL)

        # Start RQ worker
        if not process_manager.start_worker():
            print("Failed to start RQ worker. Exiting.")
            sys.exit(1)

        # Start docling-serve
        if not process_manager.start_server():
            print("Failed to start docling-serve. Exiting.")
            process_manager.cleanup()
            sys.exit(1)

        print(f"\n✓ All services started successfully")
        print("="*100)

    # Initialize memory monitor
    monitor = MemoryMonitor(redis_url=REDIS_URL, process_manager=process_manager)
    monitor.measure("Baseline (after services started)")

    try:
        async with httpx.AsyncClient(base_url=BASE_URL) as client:
            # Check server is running
            print(f"\n🔍 Checking server connection...")
            max_retries = 10
            for attempt in range(max_retries):
                try:
                    response = await client.get("/health", timeout=5.0)
                    response.raise_for_status()
                    print(f"✓ Connected to docling-serve at {BASE_URL}")
                    break
                except Exception as e:
                    if attempt < max_retries - 1:
                        print(f"Waiting for server to be ready... ({attempt + 1}/{max_retries})")
                        await asyncio.sleep(2)
                    else:
                        print(f"✗ Could not connect to docling-serve: {e}")
                        if args.no_subprocesses:
                            print(f"Please start services manually:")
                            print(f"  1. DOCLING_SERVE_ENG_KIND=rq DOCLING_SERVE_ENG_RQ_REDIS_URL={REDIS_URL} uv run docling-serve rq-worker")
                            print(f"  2. DOCLING_SERVE_ENG_KIND=rq DOCLING_SERVE_ENG_RQ_REDIS_URL={REDIS_URL} uv run docling-serve dev --no-reload")
                        sys.exit(1)

            monitor.measure("After server connection")

            # Process documents multiple times to accumulate memory
            task_ids = []

            for i in range(NUM_ITERATIONS):
                print(f"\n--- Iteration {i+1}/{NUM_ITERATIONS} ---")

                # Submit document
                task_id = await convert_document(client, test_pdf, headers)
                task_ids.append(task_id)
                print(f"Submitted task: {task_id}")

                # Wait for completion
                task = await wait_for_task(client, task_id, headers)
                print(f"Task completed: {task['task_status']}")

                # Fetch result (this keeps result in memory)
                result = await get_result(client, task_id, headers)
                print(f"Result retrieved: {len(json.dumps(result))} bytes")

                # Measure memory after each iteration
                monitor.measure(f"After iteration {i+1}")

            monitor.measure("After all processing complete")

            # Try clearing results
            print("\n--- Testing /v1/clear/results ---")
            clear_resp = await clear_results(client, headers, older_than=0)
            print(f"Clear results response: {clear_resp}")

            monitor.measure("After /v1/clear/results")

            # Verify results are actually deleted
            print("\n--- Verifying results deleted ---")
            for task_id in task_ids[:2]:  # Check first 2
                try:
                    await get_result(client, task_id, headers)
                    print(f"⚠️  Task {task_id} still has results (should be deleted)")
                except httpx.HTTPStatusError as e:
                    if e.response.status_code == 404:
                        print(f"✓ Task {task_id} properly deleted")
                    else:
                        print(f"✗ Unexpected error for {task_id}: {e}")

            # Try clearing converters
            print("\n--- Testing /v1/clear/converters ---")
            clear_conv_resp = await clear_converters(client, headers)
            print(f"Clear converters response: {clear_conv_resp}")

            monitor.measure("After /v1/clear/converters")

            # Final measurement after everything settles
            print("\nWaiting 5 seconds for cleanup to settle...")
            await asyncio.sleep(5)
            monitor.measure("Final (after 5s settle)")

        # Generate report
        monitor.report()

    finally:
        # Cleanup processes unless user wants to keep them running
        if process_manager and not args.keep_running:
            process_manager.cleanup()
        elif args.keep_running:
            print("\n" + "="*100)
            print("SERVICES KEPT RUNNING")
            print("="*100)
            print(f"Worker PID: {process_manager.worker_pid if process_manager else 'N/A'}")
            print(f"Server PID: {process_manager.server_pid if process_manager else 'N/A'}")
            print("\nTo stop services manually:")
            if process_manager:
                print(f"  kill {process_manager.worker_pid} {process_manager.server_pid}")
            print("="*100)


if __name__ == "__main__":
    asyncio.run(main())

Commit pushed to branch RedisTaskMemoryLeak (Signed-off-by: Gabe Goodhart <ghart@us.ibm.com>)
github-actions bot commented Jan 21, 2026

DCO Check Passed

Thanks @gabe-l-hart, all your commits are properly signed off. 🎉

mergify bot commented Jan 21, 2026

Merge Protections

Your pull request matches the following merge protections and will not be merged until they are valid.

🟢 Enforce conventional commit

Wonderful, this rule succeeded.

Make sure that we follow https://www.conventionalcommits.org/en/v1.0.0/

  • title ~= ^(fix|feat|docs|style|refactor|perf|test|build|ci|chore|revert)(?:\(.+\))?(!)?:

@dolfim-ibm self-requested a review on January 22, 2026 at 07:37
codecov bot commented Jan 22, 2026

Codecov Report

❌ Patch coverage is 0% with 6 lines in your changes missing coverage. Please review.

Files with missing lines: docling_jobkit/orchestrators/rq/orchestrator.py (patch coverage 0.00%, 6 lines missing) ⚠️


Additional commits pushed to branch RedisTaskMemoryLeak (all Signed-off-by: Gabe Goodhart <ghart@us.ibm.com>)
@gabe-l-hart changed the title from "fix: Clean up jobs on completion in the RQOrchestrator" to "fix: Memory leak fixes in RQ and Local engines" on Jan 22, 2026
@gabe-l-hart (Author) commented:

I found a number of additional issues leading to memory leaks that should now be fixed in this PR; I will update the description accordingly.

@gabe-l-hart (Author) commented Jan 22, 2026

Some additional numbers from the various fixes to the local engine:


uv run python reproduce_memory_leak.py --iterations 5

baseline

Phase Script Server
Baseline (after services started) 44.64 MB 395.47 MB
After server connection 51.95 MB 1135.36 MB
After iteration 1 61.66 MB 2176.64 MB
After iteration 2 70.14 MB 2408.09 MB
After iteration 3 70.38 MB 2469.28 MB
After iteration 4 70.39 MB 2484.05 MB
After iteration 5 70.39 MB 2490.73 MB
After all processing complete 70.39 MB 2490.73 MB
After /v1/clear/converters 70.39 MB 2366.98 MB
After /v1/clear/results 70.39 MB 2366.98 MB
Final (after 5s settle) 70.41 MB 2366.94 MB

All fixes

Phase Script Server
Baseline (after services started) 44.66 MB 396.97 MB
After server connection 52.02 MB 1135.17 MB
After iteration 1 62.05 MB 2187.38 MB
After iteration 2 70.44 MB 2427.25 MB
After iteration 3 70.64 MB 2432.66 MB
After iteration 4 70.88 MB 2449.69 MB
After iteration 5 70.88 MB 2459.53 MB
After all processing complete 70.88 MB 2459.53 MB
After /v1/clear/converters 70.88 MB 2088.28 MB
After /v1/clear/results 70.88 MB 2088.28 MB
Final (after 5s settle) 70.89 MB 2088.25 MB

gc only

Phase Script Server
Baseline (after services started) 44.83 MB 398.58 MB
After server connection 52.03 MB 1135.38 MB
After iteration 1 61.88 MB 2193.55 MB
After iteration 2 70.33 MB 2430.56 MB
After iteration 3 70.55 MB 2442.55 MB
After iteration 4 70.73 MB 2497.53 MB
After iteration 5 70.73 MB 2512.17 MB
After all processing complete 70.73 MB 2512.17 MB
After /v1/clear/converters 70.73 MB 2388.39 MB
After /v1/clear/results 70.73 MB 2388.39 MB
Final (after 5s settle) 70.75 MB 2388.39 MB

clear chunker manager cache

Phase Script Server
Baseline (after services started) 44.66 MB 393.03 MB
After server connection 52.02 MB 1120.97 MB
After iteration 1 61.59 MB 2165.64 MB
After iteration 2 70.12 MB 2406.75 MB
After iteration 3 70.33 MB 2419.08 MB
After iteration 4 70.34 MB 2433.19 MB
After iteration 5 70.38 MB 2440.72 MB
After all processing complete 70.38 MB 2440.72 MB
After /v1/clear/converters 70.38 MB 2316.97 MB
After /v1/clear/results 70.38 MB 2316.94 MB
Final (after 5s settle) 70.39 MB 2316.94 MB

clear worker convert managers

Phase Script Server
Baseline (after services started) 44.78 MB 396.78 MB
After server connection 52.12 MB 1120.84 MB
After iteration 1 61.84 MB 2186.89 MB
After iteration 2 70.05 MB 2424.06 MB
After iteration 3 70.47 MB 2480.42 MB
After iteration 4 70.70 MB 2495.14 MB
After iteration 5 70.70 MB 2508.83 MB
After all processing complete 70.70 MB 2508.83 MB
After /v1/clear/converters 70.70 MB 2137.58 MB
After /v1/clear/results 70.72 MB 2137.58 MB
Final (after 5s settle) 70.73 MB 2137.53 MB

The trend to note here is that, once the leaks are fixed, server memory after the clear calls should drop back to roughly the level seen after iteration 1.
