Conversation
Greptile Summary
This PR refactors the CPU/GPU operator separation across all NeMo Retriever pipeline stages (OCR, page elements, charts, tables, text embedding) into dedicated CPU and GPU operator variants.
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/ocr/__init__.py | Removes OCRCPUActor and OCRGPUActor from package-level exports, breaking any consumer that imported them via from nemo_retriever.ocr import ... |
| nemo_retriever/src/nemo_retriever/ocr/ocr.py | Refactored to a thin backward-compat shim; _patched_shared_runtime mutates module globals without locking, which is fragile under concurrent access. |
| nemo_retriever/src/nemo_retriever/graph/multi_type_extract_operator.py | Adds _extract_params_need_local_gpu, _instantiate_resolved, and _local_resources; logic for prefers_cpu_variant is improved but page-elements fallthrough may over-request GPU in some configurations. |
| nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py | Adds allow_no_gpu parameter to batch_tuning_to_node_overrides and auto-detects no-GPU clusters to force num_gpus=0.0 overrides; correctly threads through effective_allow_no_gpu. |
| nemo_retriever/src/nemo_retriever/graph/operator_archetype.py | Exposes cpu_variant_class() and gpu_variant_class() as overridable classmethods to support lazy import patterns; no issues found. |
| nemo_retriever/src/nemo_retriever/text_embed/operators.py | Refactored to thin archetype with lazy-import cpu_variant_class/gpu_variant_class; backward compat shims via __getattr__; looks correct. |
| api/src/nv_ingest_api/internal/primitives/nim/model_interface/decorators.py | Replaces multiprocessing.Manager() with threading.RLock + plain dict/int; fixes macOS spawn-mode process creation at import time; call_count is now a per-closure nonlocal, fully thread-safe inside the lock. |
| nemo_retriever/src/nemo_retriever/graph_ingestor.py | Correctly computes effective_allow_no_gpu from both the explicit flag and zero-GPU cluster detection before building node overrides. |
| nemo_retriever/tests/test_cpu_module_import_safety.py | New test verifying that CPU actor modules do not trigger nemo_retriever.model.local import; good coverage of the lazy-import guarantee. |
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[MultiTypeExtractOperator archetype] --> B{prefers_cpu_variant?}
    B -- _extract_params_need_local_gpu == False --> C[MultiTypeExtractCPUActor]
    B -- _extract_params_need_local_gpu == True --> D{GPUs available?}
    D -- yes --> E[MultiTypeExtractGPUActor]
    D -- no --> C
    C & E --> F[_instantiate_resolved per sub-operator]
    F --> G{resolve_operator_class}
    G --> H[PageElementDetectionActor archetype]
    G --> I[OCRActor archetype]
    G --> J[TableStructureActor archetype]
    G --> K[GraphicElementsActor archetype]
    G --> L[NemotronParseActor archetype]
    H --> M{invoke_url set or no local GPU?}
    M -- yes --> N[PageElementDetectionCPUActor remote endpoint]
    M -- no --> O[PageElementDetectionGPUActor local model]
    I --> P{ocr_invoke_url set or no local GPU?}
    P -- yes --> Q[OCRCPUActor remote endpoint]
    P -- no --> R[OCRGPUActor local model]
```
Prompt To Fix All With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/ocr/__init__.py
Line: 4-10
Comment:
**`OCRCPUActor` and `OCRGPUActor` removed from package-level exports**
`OCRCPUActor` and `OCRGPUActor` were in `__all__` and importable as `from nemo_retriever.ocr import OCRCPUActor`. After this change the package `__init__.py` no longer re-exports them, so any existing code using that import path will receive an `ImportError`. The `__getattr__` fallback lives in `ocr/ocr.py`, not in `__init__.py`, so it does not cover `nemo_retriever.ocr.*` imports.
Consider adding a `__getattr__` shim directly in `__init__.py` (or re-exporting) to maintain backward compatibility, as is already done for `NemotronParseCPUActor` / `NemotronParseGPUActor` in `ocr.py`.
**Rule Used:** Changes to public API surfaces (FastAPI endpoints,... ([source](https://app.greptile.com/review/custom-context?memory=api-backward-compatibility))
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/ocr/ocr.py
Line: 38-56
Comment:
**`_patched_shared_runtime` mutates module globals without a lock**
The context manager patches `_shared._np_rgb_to_b64_png` and `_shared.invoke_image_inference_batches` on the live module object, then restores them in `finally`. If two callers invoke this shim concurrently in the same process (e.g. multi-threaded Ray tasks), the second entry will capture the already-patched values as "originals" and restore them incorrectly, leaving `_shared` permanently overwritten with the nim versions.
Since the actual CPU/GPU actors import from `shared.py` directly (bypassing this shim), the risk is limited to legacy callers of `from nemo_retriever.ocr.ocr import ocr_page_elements`, but it is still a fragile design. Adding a module-level lock around the patch/unpatch would make this safe.
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/graph/multi_type_extract_operator.py
Line: 337-341
Comment:
**`_local_resources()` lazy init is not thread-safe**
`_resolved_resources` is checked and set without a lock:
```python
def _local_resources(self):
if self._resolved_resources is None:
self._resolved_resources = gather_local_resources()
return self._resolved_resources
```
Two concurrent calls (possible if a Ray actor has `max_concurrency > 1`) will both observe `None` and call `gather_local_resources()` simultaneously. While double-initialization is benign, the write races on `self._resolved_resources`. Adding a simple lock or using `functools.cached_property` would make this safe.
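The lock-based fix can be sketched with double-checked locking (`gather_local_resources` is stubbed out here; the real one probes the host):

```python
import threading

def gather_local_resources():
    # Stand-in for the real resource probe.
    return {"num_gpus": 0}

class MultiTypeExtractOperator:
    def __init__(self):
        self._resolved_resources = None
        self._resources_lock = threading.Lock()

    def _local_resources(self):
        # Double-checked locking: the fast path skips the lock once the
        # value is set; the slow path re-checks under the lock so exactly
        # one thread performs the write.
        if self._resolved_resources is None:
            with self._resources_lock:
                if self._resolved_resources is None:
                    self._resolved_resources = gather_local_resources()
        return self._resolved_resources
```

`functools.cached_property` is the more compact alternative mentioned in the comment, though its thread-safety guarantees changed in CPython 3.12, so an explicit lock is the less ambiguous choice.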
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/graph/multi_type_extract_operator.py
Line: 57-110
Comment:
**`_extract_params_need_local_gpu` returns `True` on missing `page_elements_invoke_url` even for text-only OCR modes**
When `page_elements_invoke_url` is not set, the function immediately returns `True` regardless of whether the page elements stage would actually run. For example, with `method="pdfium_hybrid"` and `extract_text=True` but `extract_tables=False, extract_charts=False, extract_infographics=False`, the pipeline could run entirely via remote OCR — but this function still reports local GPU needed, causing unnecessary GPU reservation.
How can I resolve this? If you propose a fix, please make it concise.
Reviews (8): Last reviewed commit: "fix decorator logic with global cache an..."
The diff hunk this comment refers to, reconstructed from the flattened rendering (best effort; `-` lines are the pre-PR code, `+` lines the PR's version):

```diff
 while indexed_rows < num_elements:
     pos_movement = 10  # number of iteration allowed without noticing an increase in indexed_rows
     for i in range(20):
-        prev_indexed_rows = indexed_rows
-        indexed_rows = client.describe_index(collection_name, index_name)["indexed_rows"]
+        new_indexed_rows = client.describe_index(collection_name, index_name)["indexed_rows"]
         time.sleep(1)
-        logger.info(f"Indexed rows, {collection_name}, {index_name} - {indexed_rows} / {rows_expected}")
-        if indexed_rows >= rows_expected:
+        logger.info(
+            f"polling for indexed rows, {collection_name}, {index_name} - {new_indexed_rows} / {num_elements}"
+        )
+        if new_indexed_rows == already_indexed_rows + num_elements:
+            indexed_rows = new_indexed_rows
             break
         # check if indexed_rows is staying the same, too many times means something is wrong
-        if indexed_rows == prev_indexed_rows:
+        if new_indexed_rows == indexed_rows:
             pos_movement -= 1
         else:
-            pos_movement = start_pos_movement
+            pos_movement = 10
         # if pos_movement is 0, raise an error, means the rows are not getting indexed as expected
         if pos_movement == 0:
-            raise ValueError(f"Rows are not getting indexed as expected for: {index_name} - {collection_name}")
+            raise ValueError("Rows are not getting indexed as expected")
+        indexed_rows = new_indexed_rows
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: client/src/nv_ingest_client/util/vdb/milvus.py
Line: 908-927
Comment:
**`wait_for_index` exits early when collection has pre-existing rows**
The outer `while indexed_rows < num_elements` condition uses `num_elements` (the count of newly inserted rows) as its upper bound, but `indexed_rows` is updated to `new_indexed_rows` — the **total** index count — after each inner-loop pass. If the collection already had rows before insertion (`already_indexed_rows > 0`), then `indexed_rows` will exceed `num_elements` after the first few poll cycles, causing the loop to exit before all new rows are actually indexed.
For example: with `already_indexed_rows=100` and `num_elements=50`, the inner `for` loop completes without a `break` (target is 150), sets `indexed_rows ≈ 120`, and the outer while exits immediately (`120 ≥ 50`) even though indexing is incomplete. The condition should compare against the absolute expected total:
```suggestion
while indexed_rows < already_indexed_rows + num_elements:
```
How can I resolve this? If you propose a fix, please make it concise.
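To see the corrected bound in action, here is a simplified, self-contained version of the polling loop with the suggested `while` condition (logging trimmed and sleeping made injectable; the real Milvus client is stubbed in the usage below):

```python
def wait_for_index(client, collection_name, index_name, num_elements,
                   already_indexed_rows, sleep=lambda s: None):
    """Poll until all newly inserted rows are indexed.

    Simplified sketch of the loop discussed above; `sleep` defaults to a
    no-op so the logic can be exercised without real delays.
    """
    indexed_rows = already_indexed_rows
    # Compare against the absolute expected total, not just the new-row count.
    while indexed_rows < already_indexed_rows + num_elements:
        pos_movement = 10  # polls allowed without progress before giving up
        for _ in range(20):
            new_indexed_rows = client.describe_index(collection_name, index_name)["indexed_rows"]
            sleep(1)
            if new_indexed_rows == already_indexed_rows + num_elements:
                indexed_rows = new_indexed_rows
                break
            if new_indexed_rows == indexed_rows:
                pos_movement -= 1
            else:
                pos_movement = 10
            if pos_movement == 0:
                raise ValueError(
                    f"Rows are not getting indexed as expected for: {index_name} - {collection_name}"
                )
            indexed_rows = new_indexed_rows
    return indexed_rows
```

With `already_indexed_rows=100` and `num_elements=50`, the loop now runs until the index reports 150 rows instead of exiting at the first total above 50.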
Prompt To Fix With AI
This is a comment left during a code review.
Path: client/src/nv_ingest_client/util/vdb/milvus.py
Line: 1514
Comment:
**`requests.post` without a timeout can hang indefinitely**
The previous implementation passed `timeout=120` to `requests.post`. Removing it means a stalled or unresponsive reranker endpoint will block the calling thread forever. The original 120-second timeout should be restored:
```suggestion
response = requests.post(f"{reranker_endpoint}", headers=headers, json=payload, timeout=120)
```
How can I resolve this? If you propose a fix, please make it concise.
Prompt To Fix With AI
This is a comment left during a code review.
Path: api/src/nv_ingest_api/internal/extract/audio/audio_extraction.py
Line: 79
Comment:
**Routine file deletion logged at ERROR level**
Deleting a temporary audio file is expected, successful behavior — not an error. Using `logger.error` here will fire in every error-monitoring dashboard on normal operation. It should remain `logger.debug`.
```suggestion
logger.debug(f"Removing file {base64_file_path}")
```
How can I resolve this? If you propose a fix, please make it concise.
Prompt To Fix With AI
This is a comment left during a code review.
Path: api/src/nv_ingest_api/util/dataloader/dataloader.py
Line: 235
Comment:
**Routine file cleanup and FFmpeg stderr logged at ERROR level**
Removing a temporary chunk file on the cleanup path is normal, successful behavior. `logger.error` will generate spurious entries on every successful transcoding job. Similarly, a few lines above, `logging.error(f"{original_input_path} - {capture_error}")` logs FFmpeg's stderr (which frequently contains purely informational output) at error level. Both should be `debug` or `info`.
```suggestion
logger.debug(f"Removing file {to_remove}")
```
How can I resolve this? If you propose a fix, please make it concise.
Prompt To Fix With AI
This is a comment left during a code review.
Path: api/src/nv_ingest_api/internal/primitives/nim/model_interface/decorators.py
Line: 19-46
Comment:
**Lazy `call_count` initialization is outside the lock**
`_ensure_manager()` and the `if call_count is None: call_count = _manager.Value(...)` assignment both run before the `with lock:` block. In a concurrent scenario two threads could both observe `call_count is None` and create independent `Value` objects, effectively resetting the counter mid-flight. Moving initialization inside the lock prevents this:
```python
with lock:
_ensure_manager()
if call_count is None:
call_count = _manager.Value("i", 0)
call_count.value += 1
# ... rest of cache logic
```
How can I resolve this? If you propose a fix, please make it concise.
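A self-contained version of the lock-guarded counter — using the `threading.RLock` plus plain-dict/int approach the file summary describes, rather than `multiprocessing.Manager` — might look like the following. The decorator name and reset policy here are illustrative, not the actual `multiprocessing_cache` implementation:

```python
import threading
from functools import wraps

def call_limited_cache(max_calls):
    """Cache results, clearing the cache after max_calls invocations.

    Sketch of the RLock-based design: every piece of shared state (the
    counter and the cache dict) is read and written only under the lock,
    so lazy initialization races cannot occur.
    """
    def decorator(func):
        lock = threading.RLock()
        cache = {}
        call_count = 0  # plain int, guarded by the lock

        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal call_count
            key = (args, tuple(sorted(kwargs.items())))
            with lock:
                call_count += 1
                if call_count > max_calls:
                    cache.clear()
                    call_count = 1
                if key in cache:
                    return cache[key]
            result = func(*args, **kwargs)
            with lock:
                cache[key] = result
            return result
        return wrapper
    return decorator
```

Because all counter and cache mutations happen inside `with lock:`, there is no window in which two threads can each create (or reset) independent state.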
Prompt To Fix With AI
This is a comment left during a code review.
Path: client/src/nv_ingest_client/util/vdb/milvus.py
Line: 1038-1041
Comment:
**Empty-records now raises instead of returning gracefully**
The previous implementation logged a warning and returned when `num_elements == 0`, allowing callers to tolerate empty batches. Raising `ValueError` will break any pipeline that passes an empty-filtered batch without explicit `try/except` handling. If this change is intentional, callers and the docstring should be updated accordingly.
How can I resolve this? If you propose a fix, please make it concise.
Description
Based on @charlesbluca's PR, but adds changes to ensure the graph only pulls CPU operators in non-GPU environments, and also when the user passes remote endpoints. Separates the CPU and GPU operators so that imports don't cause problems in target environments.
Checklist