
Wrapper layer: eager ML imports, non-PluginRegistry endpoints, and blocking DaemonManager #1890

@MervinPraison


Scope

Deep analysis of src/praisonai/praisonai/ (the wrapper layer only, not the core SDK praisonaiagents).

Per the project philosophy, the wrapper must be:

  • Lazy imports + optional deps: no heavy work at module top level
  • Protocol-driven: unified registries, no parallel implementations (DRY)
  • Async-safe by default: no blocking calls in code paths reachable from async

Below are three concrete gaps where the wrapper currently violates these rules, with file:line evidence and proposed fixes. Each has been verified against the current main source tree.


1. Training/upload modules pull torch + unsloth + trl + datasets at module top-level

Where

  • src/praisonai/praisonai/upload_vision.py:8-13
  • src/praisonai/praisonai/train/llm/trainer.py:10-22

Evidence

upload_vision.py (top of file):

import os
import yaml
import torch                                   # heavy: ~500MB + CUDA init
import shutil
import subprocess
from unsloth import FastVisionModel            # heavy: pulls torch, triton, transformers

train/llm/trainer.py (top of file):

import os
import sys
import yaml
import torch
import shutil
import subprocess
from transformers import TextStreamer
from unsloth import FastLanguageModel, is_bfloat16_supported
from trl import SFTTrainer
from transformers import TrainingArguments
from datasets import load_dataset, concatenate_datasets
from psutil import virtual_memory
from unsloth.chat_templates import standardize_sharegpt, get_chat_template

Why it's a gap

The wrapper philosophy is lazy imports + optional deps. The two sibling files in the same package already demonstrate the canonical pattern:

train.py:17-43:

# Lazy import training dependencies to avoid import-time overhead
def _lazy_import_training_deps():
    """Import heavy training dependencies only when needed."""
    try:
        import torch
        from transformers import TextStreamer, TrainingArguments
        from unsloth import FastLanguageModel, is_bfloat16_supported
        ...
        globals().update({ 'torch': torch, ... })
    except ImportError as e:
        raise ImportError(f"Training dependencies not available. Install with: ...")

train_vision.py:17-30 follows the same pattern. upload_vision.py and train/llm/trainer.py are the only training-side modules that were missed when this conversion happened. As a result, any code path that merely imports these modules (entry-point scanning, an IDE import, a doctor command, python -c "import praisonai.upload_vision") loads torch, unsloth, and their CUDA stack even when no upload or training job is ever run, for example during an unrelated CLI command. When those packages are not installed, the same import fails outright with an ImportError instead of the explicit install hint that the lazy helpers in train.py raise.
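One way to guard against this regression is to import the module in a fresh interpreter and inspect sys.modules afterwards. The helper below is a hypothetical sketch (heavy_modules_loaded_by and the DEFAULT_HEAVY list are illustrative, not part of PraisonAI). It runs the import in a subprocess so the result is not polluted by modules the caller has already loaded.

```python
import json
import subprocess
import sys
from typing import Iterable, List

# Assumed list of heavy top-level packages worth guarding against (extend as needed).
DEFAULT_HEAVY = ("torch", "unsloth", "trl", "transformers", "datasets")


def heavy_modules_loaded_by(module: str, heavy: Iterable[str] = DEFAULT_HEAVY) -> List[str]:
    """Import `module` in a fresh interpreter and report which heavy packages it pulled in."""
    heavy = list(heavy)
    script = (
        "import importlib, json, sys\n"
        f"importlib.import_module({module!r})\n"
        f"print(json.dumps([m for m in {heavy!r} if m in sys.modules]))\n"
    )
    # A subprocess gives a clean sys.modules; an in-process check would see
    # whatever the caller (or the test runner) had already imported.
    result = subprocess.run(
        [sys.executable, "-c", script],
        capture_output=True,
        text=True,
        check=True,  # an ImportError in the child surfaces as CalledProcessError
    )
    # The module under test might print on import, so read only the last line.
    return json.loads(result.stdout.strip().splitlines()[-1])
```

In a test, assert heavy_modules_loaded_by("praisonai.upload_vision") == [] should pass once the fix lands. Against the current code it either reports torch and unsloth or, when they are not installed, raises subprocess.CalledProcessError because the import itself fails.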

Fix

Mirror the existing _lazy_import_training_deps() pattern.

upload_vision.py: replace the top-level import torch and from unsloth import FastVisionModel with a lazy helper, then call it from __init__ / prepare_model:

# At top of file: keep only stdlib + yaml
import os
import yaml
import shutil
import subprocess


def _lazy_import_vision_upload_deps():
    """Import heavy vision deps only when needed (mirrors train_vision.py)."""
    try:
        import torch
        from unsloth import FastVisionModel
        globals().update({"torch": torch, "FastVisionModel": FastVisionModel})
    except ImportError as e:
        raise ImportError(
            "Vision upload dependencies missing. "
            "Install with: pip install torch unsloth"
        ) from e


class UploadVisionModel:
    def __init__(self, config_path="config.yaml"):
        _lazy_import_vision_upload_deps()
        self.load_config(config_path)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        ...

train/llm/trainer.py: apply the exact same transformation already present in train.py. Move the torch/transformers/unsloth/trl/datasets/psutil imports into a _lazy_import_training_deps() function that publishes them with globals().update(...), so the rest of the module can keep referring to them by name, and call that function from the trainer's entry point (e.g. __init__ or the top of train()/main()). The stdlib imports (os, sys, shutil, subprocess) and yaml stay at the top of the file.
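If the same pattern keeps being repeated across modules, the per-module helpers could share a small utility that takes the imports as data. The following is a hypothetical sketch, not existing PraisonAI code: lazy_import_into and TRAINER_DEPS are illustrative names, and TRAINER_DEPS simply mirrors the top-level imports of train/llm/trainer.py quoted above. It binds names only after every import has succeeded, and it turns a missing attribute into an ImportError, as a `from x import y` statement would.

```python
import importlib
from typing import Dict, MutableMapping


def lazy_import_into(namespace: MutableMapping[str, object],
                     spec: Dict[str, str],
                     install_hint: str) -> None:
    """Import each "module" or "module:attr" in `spec` and bind it under its key in `namespace`."""
    loaded: Dict[str, object] = {}
    try:
        for name, target in spec.items():
            module_path, _, attr = target.partition(":")
            module = importlib.import_module(module_path)
            loaded[name] = getattr(module, attr) if attr else module
    except (ImportError, AttributeError) as e:
        raise ImportError(
            f"Training dependencies not available. Install with: {install_hint}"
        ) from e
    namespace.update(loaded)  # publish everything at once, only after all imports succeeded


# Hypothetical spec mirroring the current top-level imports of train/llm/trainer.py.
TRAINER_DEPS = {
    "torch": "torch",
    "TextStreamer": "transformers:TextStreamer",
    "TrainingArguments": "transformers:TrainingArguments",
    "FastLanguageModel": "unsloth:FastLanguageModel",
    "is_bfloat16_supported": "unsloth:is_bfloat16_supported",
    "SFTTrainer": "trl:SFTTrainer",
    "load_dataset": "datasets:load_dataset",
    "concatenate_datasets": "datasets:concatenate_datasets",
    "virtual_memory": "psutil:virtual_memory",
    "standardize_sharegpt": "unsloth.chat_templates:standardize_sharegpt",
    "get_chat_template": "unsloth.chat_templates:get_chat_template",
}
```

Inside the trainer this would be called as lazy_import_into(globals(), TRAINER_DEPS, "<install hint>") at the top of the entry point; the install hint text is a placeholder.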

Impact when fixed

Importing any unrelated wrapper symbol (e.g. from praisonai import PraisonAI) no longer transitively pulls ~2 GB of ML libs and never calls CUDA init unless the user actually invokes training/upload.


2. endpoints/ provider registry bypasses the unified PluginRegistry (no entry-points, no thread safety, no aliases)

Where

  • src/praisonai/praisonai/endpoints/registry.py:13: bare module-level dict
  • vs. src/praisonai/praisonai/_registry.py: generic PluginRegistry[T] used by every other registry

Evidence

The wrapper already has a single canonical registry pattern (_registry.PluginRegistry) that supports thread safety (RLock), built-in lazy loaders, importlib.metadata.entry_points discovery, and aliases. Every other registry inherits from it:

src/praisonai/praisonai/framework_adapters/registry.py:50    class FrameworkAdapterRegistry(PluginRegistry[FrameworkAdapter])
src/praisonai/praisonai/persistence/registry.py:22           class StoreRegistry(PluginRegistry[Callable[..., Any]])
src/praisonai/praisonai/llm/registry.py:34                   class LLMProviderRegistry(PluginRegistry[ProviderType])
src/praisonai/praisonai/cli_backends/registry.py             (uses PluginRegistry)
src/praisonai/praisonai/integrations/registry.py             (uses PluginRegistry)
src/praisonai/praisonai/mcp_server/registry.py               (uses PluginRegistry)
src/praisonai/praisonai/recipe/registry.py                   (uses PluginRegistry)
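For readers unfamiliar with _registry.PluginRegistry, the sketch below shows what lazily loaded built-ins, aliases, and lock-based thread safety mean in practice. MiniRegistry is a hypothetical, stripped-down stand-in (no entry-point discovery, no unregister); the real PluginRegistry's constructor and method names may differ, although resolve() raising ValueError matches how the fix proposed in this section uses it.

```python
import threading
from typing import Callable, Dict, Generic, List, Optional, TypeVar

T = TypeVar("T")


class MiniRegistry(Generic[T]):
    """Thread-safe registry with lazily loaded built-ins and aliases (illustrative only)."""

    def __init__(self, builtins: Optional[Dict[str, Callable[[], T]]] = None) -> None:
        # RLock rather than Lock: a loader that itself registers something can
        # re-enter the registry on the same thread without deadlocking.
        self._lock = threading.RLock()
        self._loaders: Dict[str, Callable[[], T]] = dict(builtins or {})
        self._resolved: Dict[str, T] = {}
        self._aliases: Dict[str, str] = {}

    def register(self, name: str, item: T) -> None:
        with self._lock:
            self._resolved[name] = item
            self._loaders.pop(name, None)  # an explicit registration overrides a built-in

    def alias(self, alias: str, target: str) -> None:
        with self._lock:
            self._aliases[alias] = target

    def resolve(self, name: str) -> T:
        with self._lock:
            name = self._aliases.get(name, name)
            if name in self._resolved:
                return self._resolved[name]
            loader = self._loaders.get(name)
            if loader is None:
                raise ValueError(f"Unknown plugin: {name!r}")
            item = loader()  # the (possibly heavy) import runs once, under the lock
            self._resolved[name] = item
            del self._loaders[name]
            return item

    def list_names(self) -> List[str]:
        with self._lock:
            return sorted(set(self._resolved) | set(self._loaders))
```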

endpoints/registry.py is the lone holdout: it uses a bare module dict and a hardcoded _ensure_providers_registered():

# src/praisonai/praisonai/endpoints/registry.py:13
_providers: Dict[str, Type[BaseProvider]] = {}


def register_provider(provider_type: str, provider_class: Type[BaseProvider]) -> None:
    _providers[provider_type] = provider_class           # no lock


def get_provider(provider_type, base_url="...", api_key=None, **kwargs):
    _ensure_providers_registered()
    if provider_type not in _providers:
        return None
    return _providers[provider_type](base_url=base_url, api_key=api_key, **kwargs)


def _ensure_providers_registered() -> None:
    if _providers:
        return
    from .providers.recipe import RecipeProvider
    from .providers.agents_api import AgentsAPIProvider
    from .providers.mcp import MCPProvider
    from .providers.tools_mcp import ToolsMCPProvider
    from .providers.a2a import A2AProvider
    from .providers.a2u import A2UProvider
    register_provider("recipe", RecipeProvider)
    register_provider("agents-api", AgentsAPIProvider)
    register_provider("mcp", MCPProvider)
    register_provider("tools-mcp", ToolsMCPProvider)
    register_provider("a2a", A2AProvider)
    register_provider("a2u", A2UProvider)

Why it's a gap

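  1. DRY: duplicates the canonical registry, with worse semantics (no RLock, no aliases, no unregister, no list_all_names, no is_available).
  2. Protocol-driven core: every other registry exposes an entry_point_group, so third parties can ship a new conversation store / framework adapter / LLM provider via a pyproject.toml entry point with zero changes to PraisonAI. endpoints does not: there is no discovery mechanism, so a third-party endpoint provider (e.g. a custom A2A variant) only becomes visible if some code explicitly calls register_provider() or patches _providers at runtime.
  3. Race: _ensure_providers_registered() is invoked lazily without a lock. Two threads in the same process (e.g. two synchronous FastAPI route handlers running concurrently in the server's threadpool) can both pass the if _providers: return guard and double-register. Worse, because the guard only checks that the dict is non-empty, a second thread can return from it as soon as the first thread has registered "recipe", and then get None from get_provider("a2u") because the remaining providers are not registered yet. The same guard also means that calling register_provider() for a custom provider before the first get_provider() call suppresses registration of all six built-ins.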

Fix

Convert endpoints/registry.py to match framework_adapters/registry.py exactly:

# src/praisonai/praisonai/endpoints/registry.py
from __future__ import annotations

import threading
from typing import Any, List, Optional, Type

from .providers.base import BaseProvider
from .._registry import PluginRegistry


def _recipe_loader():
    from .providers.recipe import RecipeProvider
    return RecipeProvider


def _agents_api_loader():
    from .providers.agents_api import AgentsAPIProvider
    return AgentsAPIProvider


def _mcp_loader():
    from .providers.mcp import MCPProvider
    return MCPProvider


def _tools_mcp_loader():
    from .providers.tools_mcp import ToolsMCPProvider
    return ToolsMCPProvider


def _a2a_loader():
    from .providers.a2a import A2AProvider
    return A2AProvider


def _a2u_loader():
    from .providers.a2u import A2UProvider
    return A2UProvider


_BUILTIN_PROVIDERS = {
    "recipe":      _recipe_loader,
    "agents-api":  _agents_api_loader,
    "mcp":         _mcp_loader,
    "tools-mcp":   _tools_mcp_loader,
    "a2a":         _a2a_loader,
    "a2u":         _a2u_loader,
}


class ProviderRegistry(PluginRegistry[Type[BaseProvider]]):
    """Endpoint provider registry β€” unified with the rest of the wrapper."""

    def __init__(self) -> None:
        super().__init__(
            entry_point_group="praisonai.endpoint_providers",
            builtins=_BUILTIN_PROVIDERS,
        )

    def get(
        self,
        provider_type: str,
        base_url: str = "http://localhost:8765",
        api_key: Optional[str] = None,
        **kwargs: Any,
    ) -> Optional[BaseProvider]:
        try:
            cls = self.resolve(provider_type)
        except ValueError:
            return None
        return cls(base_url=base_url, api_key=api_key, **kwargs)


_default_registry: Optional[ProviderRegistry] = None
_default_lock = threading.Lock()


def get_default_registry() -> ProviderRegistry:
    global _default_registry
    if _default_registry is None:
        with _default_lock:
            if _default_registry is None:
                _default_registry = ProviderRegistry()
    return _default_registry


# Module-level functions kept for backwards compatibility; they now delegate to the registry
def register_provider(provider_type: str, provider_class: Type[BaseProvider]) -> None:
    get_default_registry().register(provider_type, provider_class)


def get_provider(provider_type, base_url="http://localhost:8765", api_key=None, **kwargs):
    return get_default_registry().get(provider_type, base_url=base_url, api_key=api_key, **kwargs)


def list_provider_types() -> List[str]:
    return get_default_registry().list_names()


def get_provider_class(provider_type: str) -> Optional[Type[BaseProvider]]:
    try:
        return get_default_registry().resolve(provider_type)
    except ValueError:
        return None

This removes the bare _providers dict, eliminates the _ensure_providers_registered() race, and immediately enables third parties to ship endpoint providers via:

# pyproject.toml of an external package
[project.entry-points."praisonai.endpoint_providers"]
my-custom-a2a = "mypkg.endpoints:MyCustomA2AProvider"
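Discovery of an entry like the one above relies on importlib.metadata. The sketch below shows roughly what a registry does with the praisonai.endpoint_providers group; discover_endpoint_providers is a hypothetical helper, and how the real PluginRegistry merges discovered entries with its built-ins (and which wins on a name clash) is not specified in this issue.

```python
import logging
import sys
from importlib.metadata import entry_points
from typing import Any, Dict

logger = logging.getLogger(__name__)

GROUP = "praisonai.endpoint_providers"  # group name proposed in this issue


def discover_endpoint_providers(group: str = GROUP) -> Dict[str, Any]:
    """Load every installed entry point in `group`, keyed by entry-point name."""
    if sys.version_info >= (3, 10):
        eps = entry_points(group=group)
    else:  # Python 3.8/3.9: entry_points() returns a dict of group -> entry points
        eps = entry_points().get(group, [])
    found: Dict[str, Any] = {}
    for ep in eps:
        try:
            # For the example above this imports mypkg.endpoints and
            # returns MyCustomA2AProvider.
            found[ep.name] = ep.load()
        except Exception:  # one broken third-party plugin should not break discovery
            logger.warning("Skipping endpoint provider %r", ep.name, exc_info=True)
    return found
```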

Impact when fixed

One consistent registry pattern across all extension points; external endpoint providers become installable without monkey-patching; the lazy-load + thread-safety semantics match the rest of the wrapper.


3. DaemonManager.stop_daemon blocks with time.sleep and has no async counterpart

Where

  • src/praisonai/praisonai/scheduler/daemon_manager.py:121-155

Evidence

# src/praisonai/praisonai/scheduler/daemon_manager.py
def stop_daemon(self, pid: int, timeout: int = 10) -> bool:
    """Stop a daemon process gracefully."""
    try:
        os.kill(pid, signal.SIGTERM)

        # Wait for process to terminate
        import time
        for _ in range(timeout * 10):
            try:
                os.kill(pid, 0)         # check if still alive
                time.sleep(0.1)         # <-- blocks the event loop
            except (OSError, ProcessLookupError):
                return True

        # Force kill if still alive
        try:
            os.kill(pid, signal.SIGKILL)
            time.sleep(0.2)             # <-- blocks the event loop
        except (OSError, ProcessLookupError):
            pass

        return True
    except (OSError, ProcessLookupError):
        return False

scheduler/deployment.py:161 has the same anti-pattern:

time.sleep(30)  # Wait before retry

Why it's a gap

scheduler/async_agent_scheduler.py:65 introduces AsyncAgentScheduler, an explicitly async-native scheduler with asyncio.Event, asyncio.Lock, and loop-bound primitives:

# scheduler/async_agent_scheduler.py
class AsyncAgentScheduler:
    """Async-native scheduler ... No global state pollution / native async coordination."""

    async def stop(self) -> bool:
        ...
        self._stop_event.set()
        await asyncio.wait_for(self._task, timeout=10)

But once an async-scheduled job (or any other async caller, such as a FastAPI route, an MCP server tool, or a UI task) needs to manage the daemon lifecycle (stop_daemon, retry-with-backoff in deployment.py), it hits a synchronous time.sleep and stalls the entire event loop: for up to timeout seconds plus the 0.2 s post-SIGKILL pause in stop_daemon, and for 30 s per retry in deployment.py. This contradicts the "async-safe by default" rule.

Worse, there is no async API surface at all: there is no AsyncDaemonManager, no astop_daemon, no async retry. An async caller therefore has only two options today: call the sync method and freeze the loop, or wrap every call in asyncio.to_thread(...) at every call site (a DRY violation, and easy to forget).
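The difference is easy to observe by running a heartbeat task next to a wait and counting how often it gets scheduled. In this self-contained sketch (count_ticks is a hypothetical name), a time.sleep wait lets the heartbeat run only once, while asyncio.sleep and the asyncio.to_thread workaround (Python 3.9+) both leave the loop free.

```python
import asyncio
import time


async def count_ticks(mode: str, duration: float = 0.3, tick: float = 0.01) -> int:
    """Count heartbeat iterations while waiting `duration` seconds in the given mode."""
    ticks = 0
    done = False

    async def heartbeat() -> None:
        nonlocal ticks
        while not done:
            ticks += 1
            await asyncio.sleep(tick)

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat run its first iteration
    if mode == "blocking":
        time.sleep(duration)  # like stop_daemon today: nothing else on the loop can run
    elif mode == "to_thread":
        await asyncio.to_thread(time.sleep, duration)  # today's per-call-site workaround
    else:
        await asyncio.sleep(duration)  # cooperative wait, as in the proposed astop_daemon
    done = True
    await hb
    return ticks
```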

Fix

Add async-native variants alongside the sync ones (same pattern that AsyncAgentScheduler follows next to AgentScheduler):

# src/praisonai/praisonai/scheduler/daemon_manager.py
import asyncio

class DaemonManager:
    ...

    async def astop_daemon(self, pid: int, timeout: int = 10) -> bool:
        """Async variant of stop_daemon β€” never blocks the event loop."""
        try:
            os.kill(pid, signal.SIGTERM)

            for _ in range(timeout * 10):
                try:
                    os.kill(pid, 0)
                    await asyncio.sleep(0.1)         # cooperative wait
                except (OSError, ProcessLookupError):
                    return True

            try:
                os.kill(pid, signal.SIGKILL)
                await asyncio.sleep(0.2)
            except (OSError, ProcessLookupError):
                pass

            return True
        except (OSError, ProcessLookupError):
            return False
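The proposed astop_daemon keeps the polling structure of the sync version. A slightly more defensive variant is sketched below with hypothetical names (pid_alive, wait_for_exit, astop_process). It measures the timeout with the loop clock instead of counting iterations, checks that the process is actually gone after SIGKILL, and accepts a pluggable liveness probe. The probe matters when the caller spawned the daemon itself: os.kill(pid, 0) keeps succeeding on a zombie child until the parent reaps it, so such a caller should pass something like Popen.poll. Like the original, this is POSIX-only (signal.SIGKILL does not exist on Windows).

```python
import asyncio
import os
import signal
from typing import Callable, Optional


def pid_alive(pid: int) -> bool:
    """Signal-0 liveness probe (POSIX). An unreaped zombie child still reports as alive."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the PID exists but belongs to another user
    return True


async def wait_for_exit(is_alive: Callable[[], bool], timeout: float,
                        interval: float = 0.1) -> bool:
    """Poll is_alive() cooperatively; return True once the process is gone."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        if not is_alive():
            return True
        await asyncio.sleep(interval)  # yield to the event loop between probes
    return not is_alive()


async def astop_process(pid: int, timeout: float = 10.0,
                        is_alive: Optional[Callable[[], bool]] = None) -> bool:
    """SIGTERM, wait up to `timeout`, then SIGKILL. False if it could not be signalled."""
    probe = is_alive or (lambda: pid_alive(pid))
    try:
        os.kill(pid, signal.SIGTERM)
    except OSError:  # includes ProcessLookupError, as in the original stop_daemon
        return False
    if await wait_for_exit(probe, timeout):
        return True
    try:
        os.kill(pid, signal.SIGKILL)
    except ProcessLookupError:
        return True  # it exited between the last probe and the SIGKILL
    return await wait_for_exit(probe, timeout=2.0)
```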

And in scheduler/deployment.py:161, replace:

time.sleep(30)  # Wait before retry

with the existing _stop_event-aware wait used in agent_scheduler.py:296 (sync path), and add an async def aretry_* variant that awaits asyncio.sleep. For symmetry, also expose aget_status and aread_logs (the latter can use aiofiles or asyncio.to_thread for file IO).
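For the deployment.py retry, the point of an event-aware wait is that a stop request interrupts the pause instead of waiting out the full 30 s. The sketch below shows a sync and an async version side by side; retry_sync and aretry are hypothetical names, and the exact shape of the existing _stop_event wait in agent_scheduler.py is not reproduced here. threading.Event.wait(timeout) returns True as soon as the event is set, and asyncio.wait_for(event.wait(), timeout) plays the same role on the event loop.

```python
import asyncio
import threading
from typing import Awaitable, Callable


def retry_sync(op: Callable[[], bool], stop_event: threading.Event,
               retries: int = 3, delay: float = 30.0) -> bool:
    """Retry op(); between attempts wait on stop_event so a stop request cuts the wait short."""
    for attempt in range(retries):
        if op():
            return True
        if attempt < retries - 1 and stop_event.wait(delay):
            return False  # stop requested while waiting
    return False


async def aretry(op: Callable[[], Awaitable[bool]], stop_event: asyncio.Event,
                 retries: int = 3, delay: float = 30.0) -> bool:
    """Async twin: awaits the stop event with a timeout instead of calling time.sleep."""
    for attempt in range(retries):
        if await op():
            return True
        if attempt == retries - 1:
            break
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=delay)
            return False  # stop requested while waiting
        except asyncio.TimeoutError:
            pass  # delay elapsed without a stop request; try again
    return False
```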

For the existing sync entry points used by the CLI (praisonai schedule stop ...), keep stop_daemon unchanged so the CLI path stays synchronous, but make the async surface explicit so async code never needs to call the blocking version.

Impact when fixed

AsyncAgentScheduler and every async caller (FastAPI routes that hit /schedules/{name}/stop, async deployment retries, etc.) can manage daemons without freezing the event loop. The async/sync split mirrors what scheduler/ already does with agent_scheduler.py vs async_agent_scheduler.py and what persistence/ already does with sqlite.py vs async_sqlite.py.


Summary

| # | File:line | Violates | Effort |
|---|-----------|----------|--------|
| 1 | upload_vision.py:8-13, train/llm/trainer.py:10-22 | Lazy imports + optional deps | Small (mechanical) |
| 2 | endpoints/registry.py:13 | DRY, protocol-driven core, entry-point extensibility | Small |
| 3 | scheduler/daemon_manager.py:121-155, scheduler/deployment.py:161 | Async-safe by default | Small–Medium |

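All three are local, mechanical changes that require no public API breakage: the lazy-import fix changes only when dependencies are imported, the scheduler fix adds async variants alongside the existing sync methods, and the registry change keeps register_provider/get_provider as thin shims.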

Metadata


Assignees: No one assigned
Labels: bug (Something isn't working), claude (Auto-trigger Claude analysis)
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
