3 changes: 2 additions & 1 deletion docling_jobkit/convert/chunking.py
@@ -220,6 +220,7 @@ def process_chunk_results(
task: Task,
conv_results: Iterable[ConversionResult],
work_dir: Path,
chunker_manager: Optional[DocumentChunkerManager] = None,
) -> DoclingTaskResult:
# Let's start by processing the documents
start_time = time.monotonic()
@@ -234,7 +235,7 @@ def process_chunk_results(
num_failed = 0

# TODO: DocumentChunkerManager should be created outside this function so it can actually work as a cache
chunker_manager = DocumentChunkerManager()
chunker_manager = chunker_manager or DocumentChunkerManager()
for conv_res in conv_results:
errors = conv_res.errors
filename = conv_res.input.file.name
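For reference, a minimal sketch of how a caller can now reuse one DocumentChunkerManager across calls so its chunker cache survives between tasks; the task, conversion results, and scratch directory are placeholders standing in for values a real caller already has, not part of this change.

from pathlib import Path

from docling_jobkit.convert.chunking import DocumentChunkerManager, process_chunk_results

# Create the manager once (e.g. at orchestrator start-up) so its chunker cache is shared.
shared_chunkers = DocumentChunkerManager()

# `task` and `conv_results` are assumed to come from an earlier enqueue/convert step.
result = process_chunk_results(
    task=task,
    conv_results=conv_results,
    work_dir=Path("/tmp/docling_scratch"),  # placeholder scratch directory
    chunker_manager=shared_chunkers,  # omit to fall back to a fresh per-call manager
)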
8 changes: 8 additions & 0 deletions docling_jobkit/orchestrators/local/orchestrator.py
@@ -1,4 +1,5 @@
import asyncio
import gc
import logging
import tempfile
import uuid
@@ -10,6 +11,7 @@

from docling.datamodel.base_models import InputFormat

from docling_jobkit.convert.chunking import DocumentChunkerManager
from docling_jobkit.convert.manager import DoclingConverterManager
from docling_jobkit.datamodel.chunking import BaseChunkerOptions, ChunkingExportOptions
from docling_jobkit.datamodel.convert import ConvertDocumentsOptions
@@ -41,6 +43,8 @@ def __init__(
self.task_queue: asyncio.Queue[str] = asyncio.Queue()
self.queue_list: list[str] = []
self.cm = converter_manager
self.chunker_manager = DocumentChunkerManager()
self.worker_cms: list[DoclingConverterManager] = []
self._task_results: dict[str, DoclingTaskResult] = {}
self.scratch_dir = self.config.scratch_dir or Path(
tempfile.mkdtemp(prefix="docling_")
@@ -129,6 +133,10 @@ async def delete_task(self, task_id: str):

async def clear_converters(self):
self.cm.clear_cache()
self.chunker_manager.clear_cache()
for cm in self.worker_cms:
cm.clear_cache()
gc.collect()

async def check_connection(self):
pass
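A hedged sketch of how the new cleanup path is meant to be used; the constructor arguments mirror the tests added below, while the import paths simply follow the file layout in this diff and the config values are illustrative.

import asyncio

from docling_jobkit.convert.manager import DoclingConverterManager, DoclingConverterManagerConfig
from docling_jobkit.orchestrators.local.orchestrator import LocalOrchestrator, LocalOrchestratorConfig

async def main() -> None:
    cm = DoclingConverterManager(config=DoclingConverterManagerConfig())
    orchestrator = LocalOrchestrator(
        config=LocalOrchestratorConfig(num_workers=2, shared_models=False),
        converter_manager=cm,
    )
    # ... enqueue and await tasks here ...
    # Clears the shared converter cache, the shared chunker cache, and every
    # per-worker converter manager tracked in worker_cms, then runs gc.collect().
    await orchestrator.clear_converters()

asyncio.run(main())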
11 changes: 6 additions & 5 deletions docling_jobkit/orchestrators/local/worker.py
@@ -34,11 +34,11 @@ def __init__(

async def loop(self):
_log.debug(f"Starting loop for worker {self.worker_id}")
cm = (
self.orchestrator.cm
if self.use_shared_manager
else DoclingConverterManager(self.orchestrator.cm.config)
)
if self.use_shared_manager:
cm = self.orchestrator.cm
else:
cm = DoclingConverterManager(self.orchestrator.cm.config)
self.orchestrator.worker_cms.append(cm)
while True:
task_id: str = await self.orchestrator.task_queue.get()
self.orchestrator.queue_list.remove(task_id)
@@ -94,6 +94,7 @@ def run_task() -> DoclingTaskResult:
task=task,
conv_results=conv_results,
work_dir=workdir,
chunker_manager=self.orchestrator.chunker_manager,
)

return processed_results
16 changes: 13 additions & 3 deletions docling_jobkit/orchestrators/rq/orchestrator.py
@@ -226,12 +226,22 @@ async def process_queue(self):

async def delete_task(self, task_id: str):
_log.info(f"Deleting result of task {task_id=}")

# Delete the result data from Redis if it exists
if task_id in self._task_result_keys:
await self._async_redis_conn.delete(self._task_result_keys[task_id])
del self._task_result_keys[task_id]
# TODO: consider also deleting the task
# job = Job.fetch(task_id, connection=self._redis_conn)
# job.delete()

# Delete the RQ job itself to free up Redis memory
# This includes the job metadata and result stream
try:
job = Job.fetch(task_id, connection=self._redis_conn)
job.delete()
_log.debug(f"Deleted RQ job {task_id=}")
except Exception as e:
# Job may not exist or already be deleted - this is not an error
_log.debug(f"Could not delete RQ job {task_id=}: {e}")

await super().delete_task(task_id)

async def warm_up_caches(self):
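For context, a hedged end-to-end sketch of the task lifecycle this change completes, written against the calls used in the tests; `orchestrator` is an already-configured RQOrchestrator and `sources` is built exactly as in those tests.

from docling_jobkit.datamodel.convert import ConvertDocumentsOptions
from docling_jobkit.datamodel.task_targets import InBodyTarget

# Inside an async context, with `orchestrator` and `sources` prepared as in the tests.
task = await orchestrator.enqueue(
    sources=sources,
    convert_options=ConvertDocumentsOptions(),
    target=InBodyTarget(),
)
result = await orchestrator.task_result(task_id=task.task_id)

# One call now frees the stored result blob, the RQ job hash and its result
# stream in Redis, and the orchestrator's in-memory bookkeeping.
await orchestrator.delete_task(task.task_id)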
168 changes: 168 additions & 0 deletions tests/test_local_orchestrator.py
@@ -18,6 +18,7 @@
from docling.datamodel.pipeline_options_vlm_model import ResponseFormat
from docling.utils.model_downloader import download_models

from docling_jobkit.convert.chunking import process_chunk_results
from docling_jobkit.convert.manager import (
DoclingConverterManager,
DoclingConverterManagerConfig,
@@ -305,3 +306,170 @@ async def test_chunk_file(

if isinstance(chunking_options, HierarchicalChunkerOptions):
assert task_result.result.chunks[0].num_tokens is None


@pytest.mark.asyncio
async def test_clear_converters_clears_caches():
"""Test that clear_converters clears all caches including chunker_manager and worker_cms."""
cm_config = DoclingConverterManagerConfig()
cm = DoclingConverterManager(config=cm_config)

config = LocalOrchestratorConfig(num_workers=2, shared_models=False)
orchestrator = LocalOrchestrator(config=config, converter_manager=cm)

# Verify chunker_manager exists
assert orchestrator.chunker_manager is not None
assert orchestrator.worker_cms == []

# Start queue processing to initialize workers
queue_task = asyncio.create_task(orchestrator.process_queue())

# Enqueue a chunking task to populate caches
doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf"
encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode()

sources: list[TaskSource] = []
sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name))

task = await orchestrator.enqueue(
task_type=TaskType.CHUNK,
sources=sources,
convert_options=ConvertDocumentsOptions(to_formats=[]),
chunking_options=HybridChunkerOptions(),
target=InBodyTarget(),
)

# Wait for task to complete
await _wait_task_complete(orchestrator, task.task_id, max_wait=30)

# Verify worker_cms list is populated (workers with non-shared models)
assert len(orchestrator.worker_cms) > 0, "worker_cms should be populated with worker converter managers"

# Check that caches have items before clearing
chunker_cache_info_before = orchestrator.chunker_manager._get_chunker_from_cache.cache_info()
assert chunker_cache_info_before.currsize > 0, "Chunker cache should have items before clearing"

# Call clear_converters
await orchestrator.clear_converters()

# Verify chunker cache is cleared
chunker_cache_info_after = orchestrator.chunker_manager._get_chunker_from_cache.cache_info()
assert chunker_cache_info_after.currsize == 0, "Chunker cache should be empty after clearing"

# Verify worker converter manager caches are cleared
for worker_cm in orchestrator.worker_cms:
worker_cache_info = worker_cm._get_converter_from_hash.cache_info()
assert worker_cache_info.currsize == 0, "Worker converter cache should be empty after clearing"

# Cleanup
queue_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass


@pytest.mark.asyncio
async def test_chunker_manager_shared_across_workers():
"""Test that chunker_manager is passed to process_chunk_results in workers."""
from unittest.mock import patch

cm_config = DoclingConverterManagerConfig()
cm = DoclingConverterManager(config=cm_config)

config = LocalOrchestratorConfig(num_workers=2, shared_models=True)
orchestrator = LocalOrchestrator(config=config, converter_manager=cm)

# Verify chunker_manager is initialized
assert orchestrator.chunker_manager is not None
expected_chunker_manager = orchestrator.chunker_manager

# Start queue processing
queue_task = asyncio.create_task(orchestrator.process_queue())

# Patch process_chunk_results to capture the call arguments
with patch('docling_jobkit.orchestrators.local.worker.process_chunk_results', wraps=process_chunk_results) as mock_process:
# Enqueue a chunking task
doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf"
encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode()

sources: list[TaskSource] = []
sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name))

task = await orchestrator.enqueue(
task_type=TaskType.CHUNK,
sources=sources,
convert_options=ConvertDocumentsOptions(to_formats=[]),
chunking_options=HybridChunkerOptions(),
target=InBodyTarget(),
)

await _wait_task_complete(orchestrator, task.task_id, max_wait=30)
task_result = await orchestrator.task_result(task_id=task.task_id)

# Verify task completed successfully
assert task_result is not None
assert isinstance(task_result.result, ChunkedDocumentResult)

# Verify process_chunk_results was called with the orchestrator's chunker_manager
assert mock_process.called, "process_chunk_results should have been called"
call_kwargs = mock_process.call_args.kwargs
assert 'chunker_manager' in call_kwargs, "chunker_manager should be passed as kwarg"
assert call_kwargs['chunker_manager'] is expected_chunker_manager, \
"The same chunker_manager instance from orchestrator should be passed to process_chunk_results"

# Cleanup
queue_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass


@pytest.mark.asyncio
async def test_worker_cms_tracking():
"""Test that worker_cms list tracks converter managers for non-shared workers."""
cm_config = DoclingConverterManagerConfig()
cm = DoclingConverterManager(config=cm_config)

# Use non-shared models to trigger worker_cms tracking
config = LocalOrchestratorConfig(num_workers=2, shared_models=False)
orchestrator = LocalOrchestrator(config=config, converter_manager=cm)

# Initially empty
assert orchestrator.worker_cms == []

# Start queue processing
queue_task = asyncio.create_task(orchestrator.process_queue())

# Enqueue a task to trigger worker initialization
doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf"
encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode()

sources: list[TaskSource] = []
sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name))

task = await orchestrator.enqueue(
sources=sources,
convert_options=ConvertDocumentsOptions(),
target=InBodyTarget(),
)

# Wait for task to complete
await _wait_task_complete(orchestrator, task.task_id, max_wait=30)

# Verify worker_cms is populated
assert len(orchestrator.worker_cms) > 0, "worker_cms should contain converter managers from workers"

# Verify each worker_cm is a DoclingConverterManager instance
for worker_cm in orchestrator.worker_cms:
assert isinstance(worker_cm, DoclingConverterManager), "Each worker_cm should be a DoclingConverterManager"

# Cleanup
queue_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass
78 changes: 77 additions & 1 deletion tests/test_rq_orchestrator.py
@@ -21,7 +21,7 @@
ConvertDocumentsOptions,
)
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_jobkit.datamodel.result import ChunkedDocumentResult, ExportResult
from docling_jobkit.datamodel.result import ChunkedDocumentResult, ExportDocumentResponse, ExportResult
from docling_jobkit.datamodel.task import Task, TaskSource
from docling_jobkit.datamodel.task_meta import TaskType
from docling_jobkit.datamodel.task_targets import InBodyTarget
@@ -166,3 +166,79 @@ async def test_chunk_file(orchestrator: RQOrchestrator, include_converted_doc: b
)
else:
task_result.result.documents[0].content.json_content is None


@pytest.mark.asyncio
async def test_delete_task_cleans_up_job(orchestrator: RQOrchestrator):
"""Test that delete_task removes both result data and RQ job from Redis."""
import msgpack
from rq.job import Job

options = ConvertDocumentsOptions()

doc_filename = Path(__file__).parent / "2206.01062v1-pg4.pdf"
encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode()

sources: list[TaskSource] = []
sources.append(FileSource(base64_string=encoded_doc, filename=doc_filename.name))

# Enqueue a task (this creates the job in Redis but won't process it without a worker)
task = await orchestrator.enqueue(
sources=sources,
convert_options=options,
target=InBodyTarget(),
)

# Verify the RQ job exists in Redis
job = Job.fetch(task.task_id, connection=orchestrator._redis_conn)
assert job is not None, "Job should exist in Redis after enqueue"
assert job.id == task.task_id

# Simulate a completed task by adding a result key
# (normally this would be done by the worker)
result_key = f"{orchestrator.config.results_prefix}:{task.task_id}"
mock_result = ExportResult(
content=ExportDocumentResponse(filename="test.pdf"),
status=ConversionStatus.SUCCESS,
)
packed = msgpack.packb(mock_result.model_dump(), use_bin_type=True)
await orchestrator._async_redis_conn.setex(
result_key,
orchestrator.config.results_ttl,
packed
)
orchestrator._task_result_keys[task.task_id] = result_key

# Verify result key exists in the tracking dict
assert task.task_id in orchestrator._task_result_keys

# Verify result data exists in Redis
result_data = await orchestrator._async_redis_conn.get(result_key)
assert result_data is not None, "Result data should exist in Redis"

# Delete the task
await orchestrator.delete_task(task.task_id)

# Verify result key is removed from tracking dict
assert task.task_id not in orchestrator._task_result_keys

# Verify result data is deleted from Redis
result_data = await orchestrator._async_redis_conn.get(result_key)
assert result_data is None, "Result data should be deleted from Redis"

# Verify the RQ job is deleted from Redis; fetching it should now fail.
# (An assert inside a try/except Exception block would be swallowed.)
with pytest.raises(Exception):
    Job.fetch(task.task_id, connection=orchestrator._redis_conn)

# Verify the task is removed from the orchestrator's task tracking
with pytest.raises(Exception):
    await orchestrator.get_raw_task(task_id=task.task_id)