Wrapper layer gaps: sync/async path divergence, file-handle leak in kanban dispatcher, swallowed errors blocking task release #1863

@MervinPraison

Summary

After an in-depth pass over src/praisonai/praisonai/, three concrete gaps stand out that directly contradict the project philosophy (multi-agent safe + async-safe by default, production-ready, DRY). They are not stylistic: each one produces a real, observable behavioral defect today.

Each finding below is reproduced against the current main (HEAD a9f4bd5) with file paths and line numbers, plus a proposed fix.


1. arun() silently skips observability initialization and adapter setup: sync and async paths have drifted

File: src/praisonai/praisonai/agents_generator.py
Sync path: generate_crew_and_kickoff β€” lines 472–626
Async path: agenerate_crew_and_kickoff + _arun_framework β€” lines 628–795

Where the issue is

The synchronous generate_crew_and_kickoff performs three lifecycle calls that are completely absent from the async path:

# agents_generator.py:596-617  (SYNC path only)
framework = self.framework or config.get('framework', 'crewai')

# Get initial adapter and resolve to concrete variant
initial_adapter = self._get_framework_adapter(framework)
adapter = initial_adapter.resolve()                       # <-- async path does NOT call .resolve()

from .framework_adapters.validators import assert_framework_available
assert_framework_available(adapter.name)

# Initialize observability hooks
from .observability.hooks import init_observability
init_observability(adapter.name)                          # <-- async path NEVER initializes observability

# Run adapter setup hooks
adapter.setup(framework_tag=adapter.name)                 # <-- async path NEVER runs adapter setup

grep confirms these three calls exist exactly once in the file, only in the sync path:

599:        initial_adapter = self._get_framework_adapter(framework)
600:        adapter = initial_adapter.resolve()
607:        from .observability.hooks import init_observability
608:        init_observability(adapter.name)
611:        adapter.setup(framework_tag=adapter.name)

Meanwhile _arun_framework (lines 742–787) takes its own divergent shape:

# agents_generator.py:742-787  (ASYNC path only)
framework = self.framework or config.get('framework', 'crewai')

# AutoGen version selection logic: does NOT exist in the sync path
if framework == "autogen":
    autogen_v4_adapter = self._get_framework_adapter("autogen_v4")
    autogen_v2_adapter = self._get_framework_adapter("autogen")
    ...
    framework = "autogen_v4" if use_v4 else "autogen"

# AgentOps init: does NOT exist in the sync path
agentops_api_key = os.getenv("AGENTOPS_API_KEY")
if agentops_api_key:
    try:
        import agentops
        agentops.init(agentops_api_key, default_tags=[framework])
    except ImportError:
        pass

And on top of that, the tool-resolution loop differs subtly:

# SYNC (line 561): instantiates class tools eagerly
resolved_tool = self.tool_resolver.resolve(tool_name, instantiate=True)

# ASYNC (line 704): does NOT pass instantiate=True; instantiation is open-coded below
resolved_tool = self.tool_resolver.resolve(tool_name)
...
tools_dict[tool_name] = (
    resolved_tool() if inspect.isclass(resolved_tool) else resolved_tool
)
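
One way to remove this particular difference is to route both paths through a single tool-collection helper. The sketch below is illustrative only: StubResolver, SearchTool, word_count, and collect_tools are hypothetical names, and the stub's instantiate=True behavior is assumed from the "instantiates class tools eagerly" comment above rather than taken from the real tool_resolver.

```python
import inspect


class StubResolver:
    """Hypothetical stand-in for the project's tool resolver."""

    def __init__(self, registry):
        self._registry = registry

    def resolve(self, name, instantiate=False):
        tool = self._registry.get(name)
        # Assumed behavior: class tools are instantiated only on request.
        if instantiate and inspect.isclass(tool):
            return tool()
        return tool


class SearchTool:
    """Example class-based tool."""

    def run(self, query):
        return f"results for {query}"


def word_count(text):
    """Example function-based tool."""
    return len(text.split())


def collect_tools(resolver, tool_names):
    """Resolve every tool the same way, whichever entrypoint calls it."""
    tools = {}
    for name in tool_names:
        resolved = resolver.resolve(name, instantiate=True)
        if resolved is not None:  # skip names the resolver does not know
            tools[name] = resolved
    return tools


resolver = StubResolver({"search": SearchTool, "word_count": word_count})
tools = collect_tools(resolver, ["search", "word_count", "missing"])
```

With one helper there is no second, open-coded inspect.isclass branch left to drift.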

Format-conversion blocks (sync lines 515–535 vs async lines 663–675) are also near-identical copies that have already begun drifting (the sync copy still carries comments that the async one dropped).

Why it matters

Same AgentsGenerator(...), same YAML, two entrypoints β€” run() and arun() β€” produce different runtime behavior:

| Behavior | run() (sync) | arun() (async) |
|---|---|---|
| adapter.resolve() for framework variant | ✅ | ❌ |
| init_observability(adapter.name) | ✅ | ❌ |
| adapter.setup(framework_tag=...) | ✅ | ❌ |
| Manual autogen v2/v4 selection | ❌ | ✅ |
| agentops.init(...) from AGENTOPS_API_KEY | ❌ | ✅ |
| instantiate=True in tool resolver call | ✅ | ❌ |

Concrete user-visible consequences:

  • Langfuse/observability traces silently disappear the moment a user switches to arun() or await gen.agenerate_crew_and_kickoff(). Observability hooks are never wired.
  • Adapter setup() hooks never fire on the async path (any framework adapter that depends on setup() for telemetry, callback registration, or environment preparation is broken under async).
  • Drift is not one-directional: AgentOps init is only on the async path, so a sync run never registers with AgentOps even if the env var is set.
  • Every future bug fix has to be applied twice and review pressure inevitably misses one side.

This is a direct violation of the "multi-agent + async safe by default" pillar: same feature, three ways (CLI, YAML, Python), but only two of the three (sync paths) are actually fully wired.

Proposed fix

Extract the shared preparation into one method, then have the two entrypoints reduce to just prepare → dispatch:

# agents_generator.py

def _prepare_run(self, config):
    """Shared preparation for sync and async runs.

    Returns:
        Tuple of (adapter, topic, tools_dict, config) ready
        for adapter.run()/adapter.arun() dispatch.
    """
    # --- Canonical format conversion ---------------------------------
    if 'agents' in config and 'roles' not in config:
        config['roles'] = {}
        for agent_name, agent_config in config['agents'].items():
            role_config = dict(agent_config) if agent_config else {}
            if 'instructions' in role_config and 'backstory' not in role_config:
                role_config['backstory'] = role_config['instructions']
            if 'role' not in role_config:
                role_config['role'] = agent_name.replace('_', ' ').title()
            if 'goal' not in role_config:
                role_config['goal'] = role_config.get('backstory', 'Complete the assigned task')
            if 'backstory' not in role_config:
                role_config['backstory'] = f'You are a {role_config["role"]}'
            config['roles'][agent_name] = role_config

    topic = config.get('input', config.get('topic', ''))
    self._validate_agents_config(config)

    # --- Tool resolution (single source of truth) --------------------
    tools_dict = self._collect_tools(config)

    # --- Framework adapter resolution + lifecycle --------------------
    framework = self.framework or config.get('framework', 'crewai')
    initial_adapter = self._get_framework_adapter(framework)
    adapter = initial_adapter.resolve()

    from .framework_adapters.validators import assert_framework_available
    assert_framework_available(adapter.name)

    # Observability + adapter setup: MUST run for both sync and async
    from .observability.hooks import init_observability
    init_observability(adapter.name)
    adapter.setup(framework_tag=adapter.name)

    # AgentOps (was only in async path; should apply to both)
    agentops_api_key = os.getenv("AGENTOPS_API_KEY")
    if agentops_api_key:
        try:
            import agentops
            agentops.init(agentops_api_key, default_tags=[adapter.name])
        except ImportError:
            pass

    self.framework = adapter.name
    self.framework_adapter = adapter
    self._validate_cli_backend_compatibility(config, adapter.name)
    self.logger.info(f"Using framework: {adapter.name}")

    return adapter, topic, tools_dict, config


def _load_config(self):
    if self.agent_yaml:
        return yaml.safe_load(self.agent_yaml)
    if self.agent_file in ('/app/api:app', 'api:app'):
        self.agent_file = 'agents.yaml'
    with open(self.agent_file, 'r') as f:
        config = yaml.safe_load(f)
    if self.cli_config:
        self._merge_cli_config(config, self.cli_config)
    return config


def generate_crew_and_kickoff(self):
    config = self._load_config()
    if config is None:
        return
    if self._is_workflow_yaml(config):
        return self._run_yaml_workflow(config)

    adapter, topic, tools_dict, config = self._prepare_run(config)
    return adapter.run(
        config, self.config_list, topic,
        tools_dict=tools_dict,
        agent_callback=getattr(self, 'agent_callback', None),
        task_callback=getattr(self, 'task_callback', None),
        cli_config=getattr(self, 'cli_config', None),
    )


async def agenerate_crew_and_kickoff(self):
    config = self._load_config()
    if config is None:
        return
    if self._is_workflow_yaml(config):
        return await self._arun_yaml_workflow(config)

    adapter, topic, tools_dict, config = self._prepare_run(config)
    return await adapter.arun(
        config, self.config_list, topic,
        tools_dict=tools_dict,
        agent_callback=getattr(self, 'agent_callback', None),
        task_callback=getattr(self, 'task_callback', None),
        cli_config=getattr(self, 'cli_config', None),
    )

This collapses ~150 lines of duplicated logic into one shared path, makes observability and adapter setup mandatory on both sides, and removes the AgentOps/AutoGen-version drift.
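
A parity check along the following lines could guard against the two paths drifting again. It is a minimal sketch under stated assumptions: RecordingAdapter and MiniGenerator are hypothetical stand-ins that model only the prepare → dispatch shape, not the real AgentsGenerator or any real framework adapter.

```python
import asyncio


class RecordingAdapter:
    """Hypothetical adapter that records every lifecycle call it receives."""

    name = "fake"

    def __init__(self):
        self.calls = []

    def resolve(self):
        self.calls.append("resolve")
        return self

    def setup(self, framework_tag):
        self.calls.append(f"setup:{framework_tag}")

    def run(self, topic):
        self.calls.append("dispatch")
        return f"sync:{topic}"

    async def arun(self, topic):
        self.calls.append("dispatch")
        return f"async:{topic}"


class MiniGenerator:
    """Toy model of the refactor: one shared prepare step, two dispatchers."""

    def __init__(self, adapter):
        self._adapter = adapter

    def _prepare_run(self, config):
        adapter = self._adapter.resolve()
        adapter.setup(framework_tag=adapter.name)
        return adapter, config.get("topic", "")

    def run(self, config):
        adapter, topic = self._prepare_run(config)
        return adapter.run(topic)

    async def arun(self, config):
        adapter, topic = self._prepare_run(config)
        return await adapter.arun(topic)


sync_adapter, async_adapter = RecordingAdapter(), RecordingAdapter()
sync_result = MiniGenerator(sync_adapter).run({"topic": "demo"})
async_result = asyncio.run(MiniGenerator(async_adapter).arun({"topic": "demo"}))
```

Comparing sync_adapter.calls with async_adapter.calls in a unit test makes any future lifecycle divergence fail loudly instead of silently.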


2. File handle leak in KanbanDispatcher._spawn_worker

File: src/praisonai/praisonai/gateway/kanban_dispatcher.py
Line: 155

Where the issue is

# kanban_dispatcher.py:147-166
# Start process with output redirect to avoid deadlock
import tempfile
with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.log') as temp_log:
    temp_log_path = temp_log.name

process = subprocess.Popen(
    cmd,
    env=env,
    stdout=open(temp_log_path, 'w'),     # <-- file object is never assigned, never closed
    stderr=subprocess.STDOUT,
    text=True
)

# Store log path for later cleanup
if not hasattr(self, '_temp_logs'):
    self._temp_logs = {}
self._temp_logs[task.id] = temp_log_path

# Track the running task
self.running_tasks[task.id] = process

grep confirms this is the only place in the package that constructs an inline open(...) argument to subprocess.Popen without storing the handle:

$ rg 'stdout=open\(|stderr=open\(' src/praisonai/praisonai
src/praisonai/praisonai/gateway/kanban_dispatcher.py:155: stdout=open(temp_log_path, 'w'),

Why it matters

Popen does dup the file descriptor for the child, but the parent process keeps its own copy of the FD open until garbage collection: the file object is anonymous and held only by Popen for as long as the process tracks it. Even after the subprocess exits and _cleanup_completed_tasks (line 200) removes the process from running_tasks, the parent FD is only released when CPython gets around to GC-ing the Popen object.

For a gateway that polls every poll_interval=5.0s and spawns up to max_concurrent=3 worker subprocesses, this is a slow but unbounded leak. On long-running deployments (exactly the production case this dispatcher targets) the dispatcher eventually trips the per-process FD limit and crashes with OSError: [Errno 24] Too many open files. Worse, by that point in-flight tasks are already in claimed state in the SQLite store, so a restart leaves orphaned claims that other workers can't pick up.

Compare the correct pattern already used in scheduler/daemon_manager.py:54-65: the file is opened, passed to Popen, then the parent-side handle is dropped at the end of the with block, while Popen keeps the child-side FD it duped.

Proposed fix

Either keep an explicit reference and close it on cleanup, or open inside a with block (parent drops its FD after Popen dups it):

# Option A: explicit ownership, close in _cleanup_completed_tasks
log_handle = open(temp_log_path, 'w')
try:
    process = subprocess.Popen(
        cmd,
        env=env,
        stdout=log_handle,
        stderr=subprocess.STDOUT,
        text=True,
    )
except BaseException:
    log_handle.close()
    raise

self.running_tasks[task.id] = process
self._temp_logs[task.id] = temp_log_path
self._log_handles[task.id] = log_handle  # close in _cleanup_completed_tasks

# ...in _cleanup_completed_tasks, after the subprocess is reaped:
handle = self._log_handles.pop(task_id, None)
if handle is not None:
    handle.close()

# Option B: let the parent FD drop immediately, like daemon_manager.py
with open(temp_log_path, 'w') as log_handle:
    process = subprocess.Popen(
        cmd,
        env=env,
        stdout=log_handle,
        stderr=subprocess.STDOUT,
        text=True,
    )
# parent FD closed here; child still has its duped copy
self.running_tasks[task.id] = process
self._temp_logs[task.id] = temp_log_path

Option B matches the existing convention elsewhere in the codebase and is the smaller diff.
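
The Option B behavior can be checked in isolation. The following is a small sketch, not the dispatcher's code: spawn_with_log is a hypothetical helper, the child command is a placeholder, and the handle is returned only so the example can show that it is already closed.

```python
import os
import subprocess
import sys
import tempfile


def spawn_with_log(cmd):
    """Start cmd with stdout/stderr redirected to a fresh temp log file."""
    fd, log_path = tempfile.mkstemp(suffix=".log")
    os.close(fd)  # only the path is needed; reopen it below
    with open(log_path, "w") as log_handle:
        process = subprocess.Popen(
            cmd,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            text=True,
        )
    # The parent-side handle is closed here; the child keeps its own copy.
    return process, log_path, log_handle


process, log_path, log_handle = spawn_with_log(
    [sys.executable, "-c", "print('hello from worker')"]
)
process.wait(timeout=30)
with open(log_path) as f:
    output = f.read()
os.remove(log_path)
```

The child's output still lands in the log even though the parent closed its handle before the child finished.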


3. Bare except: in the kanban dispatcher silently strands tasks and breaks clean shutdown

File: src/praisonai/praisonai/gateway/kanban_dispatcher.py
Lines: 108–114, 273–279, 332–338

grep confirms this file is one of only two in the package (the other is a .bak backup) with bare except::

$ rg '^\s+except:\s*$' src/praisonai/praisonai
src/praisonai/praisonai/gateway/kanban_dispatcher.py
src/praisonai/praisonai/cli/features/agent_scheduler.py.bak

Where the issue is

Site 1 (claim-release in dispatch_once, lines 108–114):

except Exception as e:
    logger.error(f"Error processing task {task.id}: {e}")
    # Release claim on error
    try:
        store.release_claim(task.id, self.worker_id)
    except:                      # <-- bare except
        pass                     # <-- silent swallow

Site 2 (claim-release in _cleanup_completed_tasks, lines 273–279):

except Exception as e:
    logger.error(f"Error processing completed task {task_id}: {e}")
    # Release claim as fallback
    try:
        store.release_claim(task_id, self.worker_id)
    except:                      # <-- bare except
        pass                     # <-- silent swallow

Site 3 (force-terminate in _shutdown, lines 332–338):

for task_id, process in self.running_tasks.items():
    try:
        logger.warning(f"Force terminating task {task_id}")
        process.terminate()
        process.wait(timeout=5)
    except:                      # <-- bare except
        process.kill()           # always runs on KeyboardInterrupt too

Why it matters

Bare except: catches BaseException, which means KeyboardInterrupt, SystemExit, and asyncio.CancelledError are all swallowed.

  1. Stranded claims (sites 1 & 2): When release_claim fails (DB locked, connection dropped, network blip), the failure is silently swallowed. The task remains claimed by a worker that no longer holds it. The next dispatch cycle sees the task is "in progress" and skips it. Other workers can't pick it up either, because they look for status='ready'. The task is stuck until someone manually clears the claim. Because the error hits a bare pass with no log, operators have nothing to triage from. For a feature whose entire purpose is multi-agent coordination, that's a quiet deadlock.

  2. Broken shutdown (site 3): A KeyboardInterrupt arriving mid-terminate() jumps into process.kill() instead of propagating. Combined with the leaked file handle from finding #2, Ctrl-C against a busy dispatcher leaves zombie processes, leaked FDs, and stuck claims, none of which surface in logs.

  3. Async cancellation is swallowed: run_forever (line 295) is driven by asyncio.create_task (line 368 in start_kanban_dispatcher). When the task is cancelled (e.g. server shutdown), cancellation enters _shutdown → loops the tasks → may hit process.wait(timeout=5). The bare except at line 337 catches CancelledError, runs process.kill(), and continues the loop. Cancellation never propagates, so the parent await may hang past the shutdown timeout.
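
The difference between the two catch forms is easy to confirm in isolation. In this sketch the helper names are made up for illustration; each one raises the given exception inside an inner try and reports whether the inner handler swallowed it.

```python
import asyncio


def swallowed_by_bare_except(exc):
    """Return True if a bare except: absorbs exc."""
    try:
        try:
            raise exc
        except:  # noqa: E722 (deliberately bare, mirrors the dispatcher)
            pass
    except BaseException:
        return False
    return True


def swallowed_by_except_exception(exc):
    """Return True if except Exception: absorbs exc."""
    try:
        try:
            raise exc
        except Exception:
            pass
    except BaseException:
        return False  # exc escaped the inner handler
    return True


signals = [KeyboardInterrupt(), SystemExit(), asyncio.CancelledError()]
bare_results = [swallowed_by_bare_except(e) for e in signals]
narrow_results = [swallowed_by_except_exception(e) for e in signals]
ordinary_error_caught = swallowed_by_except_exception(RuntimeError("db locked"))
```

On Python 3.8 and later, where asyncio.CancelledError derives from BaseException, every entry in bare_results is True and every entry in narrow_results is False, while an ordinary RuntimeError is still caught by except Exception.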

Proposed fix

Three small, targeted changes: narrow the catches, log the failures, never silently pass.

# Site 1 (lines 108-114)
except Exception as e:
    logger.error(f"Error processing task {task.id}: {e}")
    try:
        store.release_claim(task.id, self.worker_id)
    except Exception as release_err:
        logger.error(
            "Failed to release claim for task %s (worker %s); task may be stuck "
            "in 'claimed' state until manually released: %s",
            task.id, self.worker_id, release_err,
            exc_info=True,
        )

# Site 2 (lines 273-279): same shape
except Exception as e:
    logger.error(f"Error processing completed task {task_id}: {e}")
    try:
        store.release_claim(task_id, self.worker_id)
    except Exception as release_err:
        logger.error(
            "Failed to release claim during cleanup for task %s: %s",
            task_id, release_err,
            exc_info=True,
        )

# Site 3 (lines 332-338): let BaseException propagate; only handle timeout/OSError
for task_id, process in self.running_tasks.items():
    logger.warning(f"Force terminating task {task_id}")
    try:
        process.terminate()
        process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        logger.warning(f"Task {task_id} did not terminate in 5s; sending SIGKILL")
        process.kill()
    except OSError as os_err:
        logger.error(f"OS error while terminating task {task_id}: {os_err}")
        process.kill()
    # KeyboardInterrupt, SystemExit, CancelledError now propagate as intended
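
The terminate-then-kill fallback can also be pulled into a standalone helper and exercised against a real child process. This is a sketch under assumptions: stop_process is a hypothetical name, the sleeping child is a placeholder for a worker, and the extra wait() after kill() (to reap the child) is an addition that the fix above does not include.

```python
import subprocess
import sys


def stop_process(process, timeout=5.0):
    """Ask the process to exit, escalate to kill on timeout, return exit code."""
    process.terminate()
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()  # reap the child so it does not linger as a zombie
    # KeyboardInterrupt, SystemExit, and CancelledError are not caught here.
    return process.returncode


# Placeholder worker: sleeps long enough that it must be stopped explicitly.
worker = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_process(worker)
```

Because only subprocess.TimeoutExpired is handled, an interrupt or cancellation arriving during wait() propagates to the caller.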

Scope of validation

  • All three findings were validated by reading the actual files at HEAD a9f4bd5 on branch claude/bold-bohr-Yb720.
  • A separate ruamel.yaml safety claim that came up during the investigation was tested and confirmed to be a false positive (ruamel.yaml.YAML() round-trip loader is safe against !!python/... tags), so it was excluded from this issue.
  • Line numbers above match the current source.

Suggested order of attack

  1. Finding #1 is the highest-leverage fix: it removes ~150 lines of drifting duplication and restores observability/adapter lifecycle parity between run() and arun(). Touch this once; never re-fix bugs in two places again.
  2. Finding #3 is a low-risk surgical change with immediate impact on multi-agent coordination correctness.
  3. Finding #2 is a one-line fix that unblocks long-running gateway deployments.
