From adf8c175b1adb711767edd30deb361bd6c935c59 Mon Sep 17 00:00:00 2001 From: Gabe Goodhart Date: Tue, 20 Jan 2026 16:29:36 -0700 Subject: [PATCH 1/6] fix: Clean up jobs on completion in the RQOrchestrator Branch: RedisTaskMemoryLeak Signed-off-by: Gabe Goodhart --- docling_jobkit/orchestrators/rq/orchestrator.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/docling_jobkit/orchestrators/rq/orchestrator.py b/docling_jobkit/orchestrators/rq/orchestrator.py index 0bfef89..532ae1d 100644 --- a/docling_jobkit/orchestrators/rq/orchestrator.py +++ b/docling_jobkit/orchestrators/rq/orchestrator.py @@ -226,12 +226,22 @@ async def process_queue(self): async def delete_task(self, task_id: str): _log.info(f"Deleting result of task {task_id=}") + + # Delete the result data from Redis if it exists if task_id in self._task_result_keys: await self._async_redis_conn.delete(self._task_result_keys[task_id]) del self._task_result_keys[task_id] - # TODO: consider also deleting the task - # job = Job.fetch(task_id, connection=self._redis_conn) - # job.delete() + + # Delete the RQ job itself to free up Redis memory + # This includes the job metadata and result stream + try: + job = Job.fetch(task_id, connection=self._redis_conn) + job.delete() + _log.debug(f"Deleted RQ job {task_id=}") + except Exception as e: + # Job may not exist or already be deleted - this is not an error + _log.debug(f"Could not delete RQ job {task_id=}: {e}") + await super().delete_task(task_id) async def warm_up_caches(self): From 3d6055b11dc2c8f39c4dc259877e9781add290fc Mon Sep 17 00:00:00 2001 From: Gabe Goodhart Date: Thu, 22 Jan 2026 14:57:41 -0700 Subject: [PATCH 2/6] fix: Explicitly call gc on clear_converters Branch: RedisTaskMemoryLeak Signed-off-by: Gabe Goodhart --- docling_jobkit/orchestrators/local/orchestrator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docling_jobkit/orchestrators/local/orchestrator.py b/docling_jobkit/orchestrators/local/orchestrator.py index e8eb487..6776952 100644 --- a/docling_jobkit/orchestrators/local/orchestrator.py +++ b/docling_jobkit/orchestrators/local/orchestrator.py @@ -1,4 +1,5 @@ import asyncio +import gc import logging import tempfile import uuid @@ -129,6 +130,7 @@ async def delete_task(self, task_id: str): async def clear_converters(self): self.cm.clear_cache() + gc.collect() async def check_connection(self): pass From a089f3b6a828105d2eb4f284c6987414ba88a37e Mon Sep 17 00:00:00 2001 From: Gabe Goodhart Date: Thu, 22 Jan 2026 15:00:58 -0700 Subject: [PATCH 3/6] fix: Keep a central chunker manager and explicitly clear its cache Branch: RedisTaskMemoryLeak Signed-off-by: Gabe Goodhart --- docling_jobkit/convert/chunking.py | 3 ++- docling_jobkit/orchestrators/local/orchestrator.py | 3 +++ docling_jobkit/orchestrators/local/worker.py | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/docling_jobkit/convert/chunking.py b/docling_jobkit/convert/chunking.py index 1825860..24863a3 100644 --- a/docling_jobkit/convert/chunking.py +++ b/docling_jobkit/convert/chunking.py @@ -220,6 +220,7 @@ def process_chunk_results( task: Task, conv_results: Iterable[ConversionResult], work_dir: Path, + chunker_manager: Optional[DocumentChunkerManager] = None, ) -> DoclingTaskResult: # Let's start by processing the documents start_time = time.monotonic() @@ -234,7 +235,7 @@ def process_chunk_results( num_failed = 0 # TODO: DocumentChunkerManager should be initialized outside for really working as a cache - chunker_manager = DocumentChunkerManager() + 
chunker_manager = chunker_manager or DocumentChunkerManager() for conv_res in conv_results: errors = conv_res.errors filename = conv_res.input.file.name diff --git a/docling_jobkit/orchestrators/local/orchestrator.py b/docling_jobkit/orchestrators/local/orchestrator.py index 6776952..eab4d8e 100644 --- a/docling_jobkit/orchestrators/local/orchestrator.py +++ b/docling_jobkit/orchestrators/local/orchestrator.py @@ -11,6 +11,7 @@ from docling.datamodel.base_models import InputFormat +from docling_jobkit.convert.chunking import DocumentChunkerManager from docling_jobkit.convert.manager import DoclingConverterManager from docling_jobkit.datamodel.chunking import BaseChunkerOptions, ChunkingExportOptions from docling_jobkit.datamodel.convert import ConvertDocumentsOptions @@ -42,6 +43,7 @@ def __init__( self.task_queue: asyncio.Queue[str] = asyncio.Queue() self.queue_list: list[str] = [] self.cm = converter_manager + self.chunker_manager = DocumentChunkerManager() self._task_results: dict[str, DoclingTaskResult] = {} self.scratch_dir = self.config.scratch_dir or Path( tempfile.mkdtemp(prefix="docling_") @@ -130,6 +132,7 @@ async def delete_task(self, task_id: str): async def clear_converters(self): self.cm.clear_cache() + self.chunker_manager.clear_cache() gc.collect() async def check_connection(self): diff --git a/docling_jobkit/orchestrators/local/worker.py b/docling_jobkit/orchestrators/local/worker.py index a71bfe4..e108b2c 100644 --- a/docling_jobkit/orchestrators/local/worker.py +++ b/docling_jobkit/orchestrators/local/worker.py @@ -94,6 +94,7 @@ def run_task() -> DoclingTaskResult: task=task, conv_results=conv_results, work_dir=workdir, + chunker_manager=self.orchestrator.chunker_manager, ) return processed_results From 898208479e63327582af878be3981d2f5a5181b7 Mon Sep 17 00:00:00 2001 From: Gabe Goodhart Date: Thu, 22 Jan 2026 15:06:23 -0700 Subject: [PATCH 4/6] fix: Keep track of the DoclingConverterManager instances created in workers and clear them Branch: RedisTaskMemoryLeak Signed-off-by: Gabe Goodhart --- docling_jobkit/orchestrators/local/orchestrator.py | 3 +++ docling_jobkit/orchestrators/local/worker.py | 10 +++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/docling_jobkit/orchestrators/local/orchestrator.py b/docling_jobkit/orchestrators/local/orchestrator.py index eab4d8e..1c750db 100644 --- a/docling_jobkit/orchestrators/local/orchestrator.py +++ b/docling_jobkit/orchestrators/local/orchestrator.py @@ -44,6 +44,7 @@ def __init__( self.queue_list: list[str] = [] self.cm = converter_manager self.chunker_manager = DocumentChunkerManager() + self.worker_cms = [] self._task_results: dict[str, DoclingTaskResult] = {} self.scratch_dir = self.config.scratch_dir or Path( tempfile.mkdtemp(prefix="docling_") @@ -133,6 +134,8 @@ async def delete_task(self, task_id: str): async def clear_converters(self): self.cm.clear_cache() self.chunker_manager.clear_cache() + for cm in self.worker_cms: + cm.clear_cache() gc.collect() async def check_connection(self): diff --git a/docling_jobkit/orchestrators/local/worker.py b/docling_jobkit/orchestrators/local/worker.py index e108b2c..57ece0d 100644 --- a/docling_jobkit/orchestrators/local/worker.py +++ b/docling_jobkit/orchestrators/local/worker.py @@ -34,11 +34,11 @@ def __init__( async def loop(self): _log.debug(f"Starting loop for worker {self.worker_id}") - cm = ( - self.orchestrator.cm - if self.use_shared_manager - else DoclingConverterManager(self.orchestrator.cm.config) - ) + if self.use_shared_manager: + cm = 
self.orchestrator.cm + else: + cm = DoclingConverterManager(self.orchestrator.cm.config) + self.orchestrator.worker_cms.append(cm) while True: task_id: str = await self.orchestrator.task_queue.get() self.orchestrator.queue_list.remove(task_id) From 6aad26320bb29d376a90d8905eae220c62c1970f Mon Sep 17 00:00:00 2001 From: Gabe Goodhart Date: Thu, 22 Jan 2026 15:44:45 -0700 Subject: [PATCH 5/6] test: Add unit tests for memory cleanup steps in local orchestrator Branch: RedisTaskMemoryLeak Signed-off-by: Gabe Goodhart --- tests/test_local_orchestrator.py | 168 +++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/tests/test_local_orchestrator.py b/tests/test_local_orchestrator.py index f8c6012..b283bb5 100644 --- a/tests/test_local_orchestrator.py +++ b/tests/test_local_orchestrator.py @@ -18,6 +18,7 @@ from docling.datamodel.pipeline_options_vlm_model import ResponseFormat from docling.utils.model_downloader import download_models +from docling_jobkit.convert.chunking import process_chunk_results from docling_jobkit.convert.manager import ( DoclingConverterManager, DoclingConverterManagerConfig, @@ -305,3 +306,170 @@ async def test_chunk_file( if isinstance(chunking_options, HierarchicalChunkerOptions): assert task_result.result.chunks[0].num_tokens is None + + + + +@pytest.mark.asyncio +async def test_clear_converters_clears_caches(): + """Test that clear_converters clears all caches including chunker_manager and worker_cms.""" + cm_config = DoclingConverterManagerConfig() + cm = DoclingConverterManager(config=cm_config) + + config = LocalOrchestratorConfig(num_workers=2, shared_models=False) + orchestrator = LocalOrchestrator(config=config, converter_manager=cm) + + # Verify chunker_manager exists + assert orchestrator.chunker_manager is not None + assert orchestrator.worker_cms == [] + + # Start queue processing to initialize workers + queue_task = asyncio.create_task(orchestrator.process_queue()) + + # Enqueue a chunking task to populate caches + doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf" + encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode() + + sources: list[TaskSource] = [] + sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name)) + + task = await orchestrator.enqueue( + task_type=TaskType.CHUNK, + sources=sources, + convert_options=ConvertDocumentsOptions(to_formats=[]), + chunking_options=HybridChunkerOptions(), + target=InBodyTarget(), + ) + + # Wait for task to complete + await _wait_task_complete(orchestrator, task.task_id, max_wait=30) + + # Verify worker_cms list is populated (workers with non-shared models) + assert len(orchestrator.worker_cms) > 0, "worker_cms should be populated with worker converter managers" + + # Check that caches have items before clearing + chunker_cache_info_before = orchestrator.chunker_manager._get_chunker_from_cache.cache_info() + assert chunker_cache_info_before.currsize > 0, "Chunker cache should have items before clearing" + + # Call clear_converters + await orchestrator.clear_converters() + + # Verify chunker cache is cleared + chunker_cache_info_after = orchestrator.chunker_manager._get_chunker_from_cache.cache_info() + assert chunker_cache_info_after.currsize == 0, "Chunker cache should be empty after clearing" + + # Verify worker converter manager caches are cleared + for worker_cm in orchestrator.worker_cms: + worker_cache_info = worker_cm._get_converter_from_hash.cache_info() + assert worker_cache_info.currsize == 0, "Worker converter cache should be empty 
after clearing" + + # Cleanup + queue_task.cancel() + try: + await queue_task + except asyncio.CancelledError: + pass + + +@pytest.mark.asyncio +async def test_chunker_manager_shared_across_workers(): + """Test that chunker_manager is passed to process_chunk_results in workers.""" + from unittest.mock import patch + + cm_config = DoclingConverterManagerConfig() + cm = DoclingConverterManager(config=cm_config) + + config = LocalOrchestratorConfig(num_workers=2, shared_models=True) + orchestrator = LocalOrchestrator(config=config, converter_manager=cm) + + # Verify chunker_manager is initialized + assert orchestrator.chunker_manager is not None + expected_chunker_manager = orchestrator.chunker_manager + + # Start queue processing + queue_task = asyncio.create_task(orchestrator.process_queue()) + + # Patch process_chunk_results to capture the call arguments + with patch('docling_jobkit.orchestrators.local.worker.process_chunk_results', wraps=process_chunk_results) as mock_process: + # Enqueue a chunking task + doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf" + encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode() + + sources: list[TaskSource] = [] + sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name)) + + task = await orchestrator.enqueue( + task_type=TaskType.CHUNK, + sources=sources, + convert_options=ConvertDocumentsOptions(to_formats=[]), + chunking_options=HybridChunkerOptions(), + target=InBodyTarget(), + ) + + await _wait_task_complete(orchestrator, task.task_id, max_wait=30) + task_result = await orchestrator.task_result(task_id=task.task_id) + + # Verify task completed successfully + assert task_result is not None + assert isinstance(task_result.result, ChunkedDocumentResult) + + # Verify process_chunk_results was called with the orchestrator's chunker_manager + assert mock_process.called, "process_chunk_results should have been called" + call_kwargs = mock_process.call_args.kwargs + assert 'chunker_manager' in call_kwargs, "chunker_manager should be passed as kwarg" + assert call_kwargs['chunker_manager'] is expected_chunker_manager, \ + "The same chunker_manager instance from orchestrator should be passed to process_chunk_results" + + # Cleanup + queue_task.cancel() + try: + await queue_task + except asyncio.CancelledError: + pass + + +@pytest.mark.asyncio +async def test_worker_cms_tracking(): + """Test that worker_cms list tracks converter managers for non-shared workers.""" + cm_config = DoclingConverterManagerConfig() + cm = DoclingConverterManager(config=cm_config) + + # Use non-shared models to trigger worker_cms tracking + config = LocalOrchestratorConfig(num_workers=2, shared_models=False) + orchestrator = LocalOrchestrator(config=config, converter_manager=cm) + + # Initially empty + assert orchestrator.worker_cms == [] + + # Start queue processing + queue_task = asyncio.create_task(orchestrator.process_queue()) + + # Enqueue a task to trigger worker initialization + doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf" + encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode() + + sources: list[TaskSource] = [] + sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name)) + + task = await orchestrator.enqueue( + sources=sources, + convert_options=ConvertDocumentsOptions(), + target=InBodyTarget(), + ) + + # Wait for task to complete + await _wait_task_complete(orchestrator, task.task_id, max_wait=30) + + # Verify worker_cms is populated + assert len(orchestrator.worker_cms) > 0, 
"worker_cms should contain converter managers from workers" + + # Verify each worker_cm is a DoclingConverterManager instance + for worker_cm in orchestrator.worker_cms: + assert isinstance(worker_cm, DoclingConverterManager), "Each worker_cm should be a DoclingConverterManager" + + # Cleanup + queue_task.cancel() + try: + await queue_task + except asyncio.CancelledError: + pass From 4d15a15401cb944c4350b281c060550987e9dd3e Mon Sep 17 00:00:00 2001 From: Gabe Goodhart Date: Thu, 22 Jan 2026 16:09:42 -0700 Subject: [PATCH 6/6] test: Add unit test for redis job deletion Branch: RedisTaskMemoryLeak Signed-off-by: Gabe Goodhart --- tests/test_rq_orchestrator.py | 78 ++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/tests/test_rq_orchestrator.py b/tests/test_rq_orchestrator.py index 45ad55b..e8e9b3a 100644 --- a/tests/test_rq_orchestrator.py +++ b/tests/test_rq_orchestrator.py @@ -21,7 +21,7 @@ ConvertDocumentsOptions, ) from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource -from docling_jobkit.datamodel.result import ChunkedDocumentResult, ExportResult +from docling_jobkit.datamodel.result import ChunkedDocumentResult, ExportDocumentResponse, ExportResult from docling_jobkit.datamodel.task import Task, TaskSource from docling_jobkit.datamodel.task_meta import TaskType from docling_jobkit.datamodel.task_targets import InBodyTarget @@ -166,3 +166,79 @@ async def test_chunk_file(orchestrator: RQOrchestrator, include_converted_doc: b ) else: task_result.result.documents[0].content.json_content is None + + +@pytest.mark.asyncio +async def test_delete_task_cleans_up_job(orchestrator: RQOrchestrator): + """Test that delete_task removes both result data and RQ job from Redis.""" + from rq.job import Job + from unittest.mock import Mock + import msgpack + + options = ConvertDocumentsOptions() + + doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf" + encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode() + + sources: list[TaskSource] = [] + sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name)) + + # Enqueue a task (this creates the job in Redis but won't process it without a worker) + task = await orchestrator.enqueue( + sources=sources, + convert_options=options, + target=InBodyTarget(), + ) + + # Verify the RQ job exists in Redis + job = Job.fetch(task.task_id, connection=orchestrator._redis_conn) + assert job is not None, "Job should exist in Redis after enqueue" + assert job.id == task.task_id + + # Simulate a completed task by adding a result key + # (normally this would be done by the worker) + result_key = f"{orchestrator.config.results_prefix}:{task.task_id}" + mock_result = ExportResult( + content=ExportDocumentResponse(filename="test.pdf"), + status=ConversionStatus.SUCCESS, + ) + packed = msgpack.packb(mock_result.model_dump(), use_bin_type=True) + await orchestrator._async_redis_conn.setex( + result_key, + orchestrator.config.results_ttl, + packed + ) + orchestrator._task_result_keys[task.task_id] = result_key + + # Verify result key exists in the tracking dict + assert task.task_id in orchestrator._task_result_keys + + # Verify result data exists in Redis + result_data = await orchestrator._async_redis_conn.get(result_key) + assert result_data is not None, "Result data should exist in Redis" + + # Delete the task + await orchestrator.delete_task(task.task_id) + + # Verify result key is removed from tracking dict + assert task.task_id not in orchestrator._task_result_keys + + # 
Verify result data is deleted from Redis
+    result_data = await orchestrator._async_redis_conn.get(result_key)
+    assert result_data is None, "Result data should be deleted from Redis"
+
+    # Verify the RQ job is deleted from Redis: fetching it must now raise
+    # NoSuchJobError. Asserting on the expected exception (rather than
+    # swallowing everything in a broad except) keeps this check meaningful.
+    from rq.exceptions import NoSuchJobError
+
+    with pytest.raises(NoSuchJobError):
+        Job.fetch(task.task_id, connection=orchestrator._redis_conn)
+
+    # Verify the task is removed from the orchestrator's task tracking:
+    # looking up the deleted task must raise as well. A broad Exception
+    # match is used here because the exact error type raised for a missing
+    # task is orchestrator-specific; what matters is that the lookup no
+    # longer succeeds after delete_task.
+    with pytest.raises(Exception):
+        await orchestrator.get_raw_task(task_id=task.task_id)
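
Usage sketch (illustrative only, not part of the patches above): how a caller can drive the cleanup path added in this series end to end. The import path for LocalOrchestrator and LocalOrchestratorConfig is assumed from the patched file docling_jobkit/orchestrators/local/orchestrator.py, and the enqueue/await steps are elided.

import asyncio

from docling_jobkit.convert.manager import (
    DoclingConverterManager,
    DoclingConverterManagerConfig,
)
from docling_jobkit.orchestrators.local.orchestrator import (
    LocalOrchestrator,
    LocalOrchestratorConfig,
)


async def run_batch_and_release_memory() -> None:
    # Per-worker converter managers (shared_models=False) are tracked in
    # orchestrator.worker_cms, and a single DocumentChunkerManager is shared
    # across workers, so one clear_converters() call reaches every cache.
    cm = DoclingConverterManager(config=DoclingConverterManagerConfig())
    orchestrator = LocalOrchestrator(
        config=LocalOrchestratorConfig(num_workers=2, shared_models=False),
        converter_manager=cm,
    )
    queue_task = asyncio.create_task(orchestrator.process_queue())

    # ... enqueue conversion/chunking tasks and await their results here ...

    # Release cached converters and chunkers between batches: this clears the
    # shared converter cache, the central chunker cache, each worker's
    # converter cache, and then runs gc.collect().
    await orchestrator.clear_converters()

    queue_task.cancel()
    try:
        await queue_task
    except asyncio.CancelledError:
        pass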