3 changes: 2 additions & 1 deletion docling_jobkit/convert/chunking.py
@@ -220,6 +220,7 @@ def process_chunk_results(
task: Task,
conv_results: Iterable[ConversionResult],
work_dir: Path,
chunker_manager: Optional[DocumentChunkerManager] = None,
) -> DoclingTaskResult:
# Let's start by processing the documents
start_time = time.monotonic()
@@ -234,7 +235,7 @@
num_failed = 0

# TODO: DocumentChunkerManager should be initialized outside this function to truly act as a cache
chunker_manager = DocumentChunkerManager()
chunker_manager = chunker_manager or DocumentChunkerManager()
for conv_res in conv_results:
errors = conv_res.errors
filename = conv_res.input.file.name
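
For context, the point of the new parameter is that the lru_cache inside DocumentChunkerManager only pays off when the same manager instance is reused across calls. The following standalone sketch illustrates that pattern with stdlib code only; ChunkerCacheSketch and its method are hypothetical stand-ins, not the docling_jobkit API.

from functools import lru_cache
from typing import List, Optional


class ChunkerCacheSketch:
    # Hypothetical stand-in for DocumentChunkerManager: building a chunker
    # (tokenizer download, model load) is assumed to be the expensive step.
    @lru_cache(maxsize=8)
    def _get_chunker_from_cache(self, options_key: str) -> str:
        return f"chunker-for-{options_key}"


def process_results(keys: List[str], manager: Optional[ChunkerCacheSketch] = None) -> List[str]:
    # Same fallback as the patched process_chunk_results: a throwaway manager
    # still works, but only a caller-owned manager caches across calls.
    manager = manager or ChunkerCacheSketch()
    return [manager._get_chunker_from_cache(k) for k in keys]


shared = ChunkerCacheSketch()
process_results(["hybrid"], manager=shared)
process_results(["hybrid"], manager=shared)  # second call reuses the cached chunker
print(shared._get_chunker_from_cache.cache_info())  # hits=1, misses=1
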
8 changes: 8 additions & 0 deletions docling_jobkit/orchestrators/local/orchestrator.py
@@ -1,4 +1,5 @@
import asyncio
import gc
import logging
import tempfile
import uuid
@@ -10,6 +11,7 @@

from docling.datamodel.base_models import InputFormat

from docling_jobkit.convert.chunking import DocumentChunkerManager
from docling_jobkit.convert.manager import DoclingConverterManager
from docling_jobkit.datamodel.chunking import BaseChunkerOptions, ChunkingExportOptions
from docling_jobkit.datamodel.convert import ConvertDocumentsOptions
@@ -41,6 +43,8 @@ def __init__(
self.task_queue: asyncio.Queue[str] = asyncio.Queue()
self.queue_list: list[str] = []
self.cm = converter_manager
self.chunker_manager = DocumentChunkerManager()
self.worker_cms: list[DoclingConverterManager] = []
self._task_results: dict[str, DoclingTaskResult] = {}
self.scratch_dir = self.config.scratch_dir or Path(
tempfile.mkdtemp(prefix="docling_")
@@ -129,6 +133,10 @@ async def delete_task(self, task_id: str):

async def clear_converters(self):
self.cm.clear_cache()
self.chunker_manager.clear_cache()
for cm in self.worker_cms:
cm.clear_cache()
gc.collect()

async def check_connection(self):
pass
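
A condensed sketch of what the new bookkeeping buys, using the same private cache_info() probes as the tests below; it assumes an already-constructed LocalOrchestrator and is illustrative rather than part of this PR.

async def clear_and_verify(orchestrator) -> None:
    # One call now drops the shared converter cache, the shared chunker cache,
    # and every per-worker converter cache, then forces a gc pass.
    await orchestrator.clear_converters()
    # The caches are functools.lru_cache instances, so emptiness can be checked:
    assert orchestrator.chunker_manager._get_chunker_from_cache.cache_info().currsize == 0
    for cm in orchestrator.worker_cms:
        assert cm._get_converter_from_hash.cache_info().currsize == 0
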
11 changes: 6 additions & 5 deletions docling_jobkit/orchestrators/local/worker.py
@@ -34,11 +34,11 @@ def __init__(

async def loop(self):
_log.debug(f"Starting loop for worker {self.worker_id}")
cm = (
self.orchestrator.cm
if self.use_shared_manager
else DoclingConverterManager(self.orchestrator.cm.config)
)
if self.use_shared_manager:
cm = self.orchestrator.cm
else:
cm = DoclingConverterManager(self.orchestrator.cm.config)
self.orchestrator.worker_cms.append(cm)
while True:
task_id: str = await self.orchestrator.task_queue.get()
self.orchestrator.queue_list.remove(task_id)
@@ -94,6 +94,7 @@ def run_task() -> DoclingTaskResult:
task=task,
conv_results=conv_results,
work_dir=workdir,
chunker_manager=self.orchestrator.chunker_manager,
)

return processed_results
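
The append to orchestrator.worker_cms is what lets the orchestrator-level cleanup above reach worker-private caches. A generic sketch of that register-for-cleanup pattern, with hypothetical Cache/Owner names standing in for DoclingConverterManager and LocalOrchestrator:

import gc
from typing import Dict, List


class Cache:
    def __init__(self) -> None:
        self._items: Dict[str, object] = {}

    def clear_cache(self) -> None:
        self._items.clear()


class Owner:
    def __init__(self) -> None:
        self.shared = Cache()
        self.worker_caches: List[Cache] = []

    def clear_all(self) -> None:
        # The owner can only clear caches it holds a reference to.
        self.shared.clear_cache()
        for cache in self.worker_caches:
            cache.clear_cache()
        gc.collect()


def worker_startup(owner: Owner, use_shared: bool) -> Cache:
    if use_shared:
        return owner.shared
    cache = Cache()
    owner.worker_caches.append(cache)  # register so clear_all can reach it
    return cache
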
16 changes: 13 additions & 3 deletions docling_jobkit/orchestrators/rq/orchestrator.py
@@ -226,12 +226,22 @@ async def process_queue(self):

async def delete_task(self, task_id: str):
_log.info(f"Deleting result of task {task_id=}")

# Delete the result data from Redis if it exists
if task_id in self._task_result_keys:
await self._async_redis_conn.delete(self._task_result_keys[task_id])
del self._task_result_keys[task_id]
# TODO: consider also deleting the task
# job = Job.fetch(task_id, connection=self._redis_conn)
# job.delete()

# Delete the RQ job itself to free up Redis memory
# This includes the job metadata and result stream
try:
job = Job.fetch(task_id, connection=self._redis_conn)
job.delete()
_log.debug(f"Deleted RQ job {task_id=}")
except Exception as e:
# Job may not exist or already be deleted - this is not an error
_log.debug(f"Could not delete RQ job {task_id=}: {e}")

await super().delete_task(task_id)

async def warm_up_caches(self):
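
For reference, a minimal standalone sketch of the same cleanup step in isolation; it assumes a reachable Redis instance and an existing job id (both placeholders), and narrows the except clause to rq's NoSuchJobError rather than the broad Exception used above.

from redis import Redis
from rq.exceptions import NoSuchJobError
from rq.job import Job


def delete_rq_job(task_id: str, redis_conn: Redis) -> bool:
    """Delete the RQ job hash (metadata and result stream) for task_id."""
    try:
        job = Job.fetch(task_id, connection=redis_conn)
    except NoSuchJobError:
        return False  # already gone; nothing to clean up
    job.delete()
    return True


# delete_rq_job("some-task-id", Redis(host="localhost", port=6379))
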
190 changes: 190 additions & 0 deletions tests/test_local_orchestrator.py
@@ -18,6 +18,7 @@
from docling.datamodel.pipeline_options_vlm_model import ResponseFormat
from docling.utils.model_downloader import download_models

from docling_jobkit.convert.chunking import process_chunk_results
from docling_jobkit.convert.manager import (
DoclingConverterManager,
DoclingConverterManagerConfig,
@@ -305,3 +306,192 @@ async def test_chunk_file(

if isinstance(chunking_options, HierarchicalChunkerOptions):
assert task_result.result.chunks[0].num_tokens is None


@pytest.mark.asyncio
async def test_clear_converters_clears_caches():
"""Test that clear_converters clears all caches including chunker_manager and worker_cms."""
cm_config = DoclingConverterManagerConfig()
cm = DoclingConverterManager(config=cm_config)

config = LocalOrchestratorConfig(num_workers=2, shared_models=False)
orchestrator = LocalOrchestrator(config=config, converter_manager=cm)

# Verify chunker_manager exists
assert orchestrator.chunker_manager is not None
assert orchestrator.worker_cms == []

# Start queue processing to initialize workers
queue_task = asyncio.create_task(orchestrator.process_queue())

# Enqueue a chunking task to populate caches
doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf"
encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode()

sources: list[TaskSource] = []
sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name))

task = await orchestrator.enqueue(
task_type=TaskType.CHUNK,
sources=sources,
convert_options=ConvertDocumentsOptions(to_formats=[]),
chunking_options=HybridChunkerOptions(),
target=InBodyTarget(),
)

# Wait for task to complete
await _wait_task_complete(orchestrator, task.task_id, max_wait=30)

# Verify worker_cms list is populated (workers with non-shared models)
assert len(orchestrator.worker_cms) > 0, (
"worker_cms should be populated with worker converter managers"
)

# Check that caches have items before clearing
chunker_cache_info_before = (
orchestrator.chunker_manager._get_chunker_from_cache.cache_info()
)
assert chunker_cache_info_before.currsize > 0, (
"Chunker cache should have items before clearing"
)

# Call clear_converters
await orchestrator.clear_converters()

# Verify chunker cache is cleared
chunker_cache_info_after = (
orchestrator.chunker_manager._get_chunker_from_cache.cache_info()
)
assert chunker_cache_info_after.currsize == 0, (
"Chunker cache should be empty after clearing"
)

# Verify worker converter manager caches are cleared
for worker_cm in orchestrator.worker_cms:
worker_cache_info = worker_cm._get_converter_from_hash.cache_info()
assert worker_cache_info.currsize == 0, (
"Worker converter cache should be empty after clearing"
)

# Cleanup
queue_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass


@pytest.mark.asyncio
async def test_chunker_manager_shared_across_workers():
"""Test that chunker_manager is passed to process_chunk_results in workers."""
from unittest.mock import patch

cm_config = DoclingConverterManagerConfig()
cm = DoclingConverterManager(config=cm_config)

config = LocalOrchestratorConfig(num_workers=2, shared_models=True)
orchestrator = LocalOrchestrator(config=config, converter_manager=cm)

# Verify chunker_manager is initialized
assert orchestrator.chunker_manager is not None
expected_chunker_manager = orchestrator.chunker_manager

# Start queue processing
queue_task = asyncio.create_task(orchestrator.process_queue())

# Patch process_chunk_results to capture the call arguments
with patch(
"docling_jobkit.orchestrators.local.worker.process_chunk_results",
wraps=process_chunk_results,
) as mock_process:
# Enqueue a chunking task
doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf"
encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode()

sources: list[TaskSource] = []
sources.append(
FileSource(base64_string=encoded_doc, filename=doc_filename.name)
)

task = await orchestrator.enqueue(
task_type=TaskType.CHUNK,
sources=sources,
convert_options=ConvertDocumentsOptions(to_formats=[]),
chunking_options=HybridChunkerOptions(),
target=InBodyTarget(),
)

await _wait_task_complete(orchestrator, task.task_id, max_wait=30)
task_result = await orchestrator.task_result(task_id=task.task_id)

# Verify task completed successfully
assert task_result is not None
assert isinstance(task_result.result, ChunkedDocumentResult)

# Verify process_chunk_results was called with the orchestrator's chunker_manager
assert mock_process.called, "process_chunk_results should have been called"
call_kwargs = mock_process.call_args.kwargs
assert "chunker_manager" in call_kwargs, (
"chunker_manager should be passed as kwarg"
)
assert call_kwargs["chunker_manager"] is expected_chunker_manager, (
"The same chunker_manager instance from orchestrator should be passed to process_chunk_results"
)

# Cleanup
queue_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass


@pytest.mark.asyncio
async def test_worker_cms_tracking():
"""Test that worker_cms list tracks converter managers for non-shared workers."""
cm_config = DoclingConverterManagerConfig()
cm = DoclingConverterManager(config=cm_config)

# Use non-shared models to trigger worker_cms tracking
config = LocalOrchestratorConfig(num_workers=2, shared_models=False)
orchestrator = LocalOrchestrator(config=config, converter_manager=cm)

# Initially empty
assert orchestrator.worker_cms == []

# Start queue processing
queue_task = asyncio.create_task(orchestrator.process_queue())

# Enqueue a task to trigger worker initialization
doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf"
encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode()

sources: list[TaskSource] = []
sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name))

task = await orchestrator.enqueue(
sources=sources,
convert_options=ConvertDocumentsOptions(),
target=InBodyTarget(),
)

# Wait for task to complete
await _wait_task_complete(orchestrator, task.task_id, max_wait=30)

# Verify worker_cms is populated
assert len(orchestrator.worker_cms) > 0, (
"worker_cms should contain converter managers from workers"
)

# Verify each worker_cm is a DoclingConverterManager instance
for worker_cm in orchestrator.worker_cms:
assert isinstance(worker_cm, DoclingConverterManager), (
"Each worker_cm should be a DoclingConverterManager"
)

# Cleanup
queue_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass
80 changes: 79 additions & 1 deletion tests/test_rq_orchestrator.py
@@ -21,7 +21,11 @@
ConvertDocumentsOptions,
)
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_jobkit.datamodel.result import ChunkedDocumentResult, ExportResult
from docling_jobkit.datamodel.result import (
ChunkedDocumentResult,
ExportDocumentResponse,
ExportResult,
)
from docling_jobkit.datamodel.task import Task, TaskSource
from docling_jobkit.datamodel.task_meta import TaskType
from docling_jobkit.datamodel.task_targets import InBodyTarget
@@ -166,3 +170,77 @@ async def test_chunk_file(orchestrator: RQOrchestrator, include_converted_doc: b
)
else:
assert task_result.result.documents[0].content.json_content is None


@pytest.mark.asyncio
async def test_delete_task_cleans_up_job(orchestrator: RQOrchestrator):
"""Test that delete_task removes both result data and RQ job from Redis."""

import msgpack
from rq.job import Job

options = ConvertDocumentsOptions()

doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf"
encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode()

sources: list[TaskSource] = []
sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name))

# Enqueue a task (this creates the job in Redis but won't process it without a worker)
task = await orchestrator.enqueue(
sources=sources,
convert_options=options,
target=InBodyTarget(),
)

# Verify the RQ job exists in Redis
job = Job.fetch(task.task_id, connection=orchestrator._redis_conn)
assert job is not None, "Job should exist in Redis after enqueue"
assert job.id == task.task_id

# Simulate a completed task by adding a result key
# (normally this would be done by the worker)
result_key = f"{orchestrator.config.results_prefix}:{task.task_id}"
mock_result = ExportResult(
content=ExportDocumentResponse(filename="test.pdf"),
status=ConversionStatus.SUCCESS,
)
packed = msgpack.packb(mock_result.model_dump(), use_bin_type=True)
await orchestrator._async_redis_conn.setex(
result_key, orchestrator.config.results_ttl, packed
)
orchestrator._task_result_keys[task.task_id] = result_key

# Verify result key exists in the tracking dict
assert task.task_id in orchestrator._task_result_keys

# Verify result data exists in Redis
result_data = await orchestrator._async_redis_conn.get(result_key)
assert result_data is not None, "Result data should exist in Redis"

# Delete the task
await orchestrator.delete_task(task.task_id)

# Verify result key is removed from tracking dict
assert task.task_id not in orchestrator._task_result_keys

# Verify result data is deleted from Redis
result_data = await orchestrator._async_redis_conn.get(result_key)
assert result_data is None, "Result data should be deleted from Redis"

# Verify the RQ job is deleted from Redis
try:
    Job.fetch(task.task_id, connection=orchestrator._redis_conn)
except Exception:
    # Expected: the job should no longer exist in Redis
    pass
else:
    pytest.fail("Job should have been deleted from Redis")

# Verify task is removed from orchestrator's task tracking
try:
    await orchestrator.get_raw_task(task_id=task.task_id)
except Exception:
    # Expected: the task should no longer be tracked
    pass
else:
    pytest.fail("Task should have been removed from orchestrator")