CLAUDE.md

trentleslie edited this page Nov 19, 2025 · 1 revision

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

Project Overview

Nexify is a workflow orchestration engine featuring FlowForge, a YAML-driven system for building data pipelines and automating complex workflows. The engine executes self-registering actions through a central registry, providing type-safe execution with Pydantic validation and flexible context handling.

This codebase was extracted from the BiOMapper project to serve as a general-purpose orchestration engine while BiOMapper continues as a specialized biological data harmonization toolkit.

Development Commands

Setup and Installation

# Install dependencies (base installation)
poetry install

# Install with API support (FastAPI + Uvicorn)
poetry install -E api

# Install all extras
poetry install -E full

Testing

# Run all tests with coverage
poetry run pytest

# Run specific test categories
poetry run pytest tests/unit/
poetry run pytest tests/integration/

# Run with coverage report
poetry run pytest --cov=nexify --cov-report=html

# Run specific test file
poetry run pytest -xvs tests/unit/path/to/test_file.py

# Run tests matching a pattern
poetry run pytest -k "test_pattern_name"

Code Quality

# Format code with Ruff
poetry run ruff format .

# Lint and auto-fix issues
poetry run ruff check . --fix

# Type checking with mypy
poetry run mypy src/nexify/

# Run all quality checks (recommended before commits)
poetry run ruff format . && poetry run ruff check . --fix && poetry run mypy src/nexify/

Running Workflows

# Execute a workflow via Python
python -c "
from nexify.core.minimal_strategy_service import MinimalStrategyService
import asyncio

async def run():
    service = MinimalStrategyService('examples/strategies')
    result = await service.execute_strategy('strategy_name', input_identifiers=['id1', 'id2'])
    print(result)

asyncio.run(run())
"

Architecture Overview

Core Components

FlowForge Orchestration Engine (src/nexify/core/)

  • minimal_strategy_service.py: Main orchestration service that loads and executes YAML workflows
  • exceptions.py: Standardized error handling with error codes
  • models/execution_context.py: Pydantic models for execution state
  • infrastructure/parameter_resolver.py: Resolves ${parameters.key} placeholders in workflows

Action Registry System (src/nexify/actions/)

  • registry.py: Central ACTION_REGISTRY dict mapping action names to classes
  • typed_base.py: TypedStrategyAction base class for type-safe actions
  • base.py: Legacy base class for backward compatibility
  • Actions auto-register via @register_action("ACTION_NAME") decorator

Standards Layer (src/nexify/standards/)

  • context_handler.py: UniversalContext wrapper for dict/object context compatibility
  • base_models.py: Pydantic base classes (ActionParamsBase, FlexibleBaseModel, etc.)
  • file_loader.py: Robust file loading with format detection
  • debug_tracer.py: Debug tracing for specific identifiers through pipelines
  • known_issues.py: Registry of documented edge cases and workarounds

API Layer (src/nexify/api/) - Optional

  • main.py: FastAPI application entry point
  • routes/: API endpoints for strategy execution
  • services/: Service layer wrapping core functionality

Client (src/nexify/client/)

  • client_v2.py: Python client for API interactions
  • models.py: Client-side data models
  • progress.py: Progress tracking utilities

Data Flow Architecture

YAML Workflow → MinimalStrategyService → Parameter Resolver
                         ↓
              Load from ACTION_REGISTRY
                         ↓
              TypedStrategyAction.execute()
                         ↓
         Pydantic validation (ActionParamsBase)
                         ↓
         UniversalContext (datasets/stats/files)
                         ↓
              execute_typed() implementation
                         ↓
              ActionResult → Context updates

Context Handling (Critical Pattern)

Actions receive context in two forms:

  1. Dict context: Legacy format, used by MVP actions
  2. Pydantic context: StrategyExecutionContext for type safety

Always use UniversalContext to handle both:

from nexify.standards.context_handler import UniversalContext

async def execute_typed(self, params: MyParams, context: Dict) -> ActionResult:
    ctx = UniversalContext.wrap(context)
    datasets = ctx.get("datasets", {})
    input_data = datasets.get(params.input_key)
    # ... process data ...
    datasets[params.output_key] = output_data  # update, don't replace, the datasets dict
    ctx.set("datasets", datasets)

The orchestration service (MinimalStrategyService) automatically:

  • Creates dual contexts (dict + Pydantic)
  • Syncs data between them after each action
  • Chooses appropriate context based on action compatibility
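The dual-context sync can be pictured with a minimal sketch. The class and field names here are illustrative stand-ins, not Nexify's actual internals:

```python
from types import SimpleNamespace

def sync_contexts(dict_ctx: dict, typed_ctx: SimpleNamespace) -> None:
    """Copy shared keys from the dict context onto the typed context.

    Illustration only: the real MinimalStrategyService keeps a dict and a
    Pydantic StrategyExecutionContext in step after each action runs.
    """
    for key in ("datasets", "statistics", "output_files"):
        if key in dict_ctx:
            setattr(typed_ctx, key, dict_ctx[key])

dict_ctx = {"datasets": {"raw": [1, 2, 3]}, "statistics": {"rows": 3}}
typed_ctx = SimpleNamespace(datasets={}, statistics={}, output_files=[])
sync_contexts(dict_ctx, typed_ctx)
print(typed_ctx.datasets)  # {'raw': [1, 2, 3]}
```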

Action Organization

Actions are organized by domain:

  • entities/: Entity-specific actions (proteins, metabolites, chemistry)
    • proteins/: UniProt, Ensembl, gene symbol processing
    • metabolites/: HMDB, InChIKey, CHEBI handling
    • chemistry/: LOINC, clinical test matching
  • algorithms/: Reusable computational algorithms
  • utils/: General-purpose utilities
  • io/: Data input/output operations
  • workflows/: Composite multi-step actions
  • reports/: Analysis and reporting actions

Parameter Resolution

Workflows support dynamic parameter substitution:

parameters:
  input_file: "/data/proteins.tsv"
  output_dir: "/results"

steps:
  - action:
      type: LOAD_DATASET
      params:
        file_path: "${parameters.input_file}"  # Resolved at runtime
        output_key: "raw_data"

Supports:

  • Parameter references: ${parameters.key}
  • Metadata references: ${metadata.key}
  • Nested resolution in dicts, lists, and strings
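A simplified sketch of what a resolver like infrastructure/parameter_resolver.py does (the real implementation may handle more edge cases):

```python
import re
from typing import Any

def resolve(value: Any, parameters: dict, metadata: dict) -> Any:
    """Recursively substitute ${parameters.key} / ${metadata.key} placeholders."""
    if isinstance(value, str):
        def repl(m: re.Match) -> str:
            scope, key = m.group(1), m.group(2)
            source = parameters if scope == "parameters" else metadata
            # Leave unresolved references intact rather than failing
            return str(source.get(key, m.group(0)))
        return re.sub(r"\$\{(parameters|metadata)\.(\w+)\}", repl, value)
    if isinstance(value, dict):
        return {k: resolve(v, parameters, metadata) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve(v, parameters, metadata) for v in value]
    return value

params = {"input_file": "/data/proteins.tsv"}
step = {"file_path": "${parameters.input_file}", "keys": ["${metadata.run_id}"]}
print(resolve(step, params, {"run_id": "42"}))
# {'file_path': '/data/proteins.tsv', 'keys': ['42']}
```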

Conditional Step Execution

Steps can include conditions:

steps:
  - name: optional_step
    condition: "1 in ${parameters.stages_to_run}"
    action:
      type: SOME_ACTION

Conditions are Python expressions evaluated safely with parameter substitution.
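A minimal sketch of the substitute-then-evaluate approach (illustrative only; Nexify's actual evaluation may be stricter):

```python
def evaluate_condition(condition: str, parameters: dict) -> bool:
    """Substitute ${parameters.key} references, then eval with no builtins."""
    for key, value in parameters.items():
        condition = condition.replace(f"${{parameters.{key}}}", repr(value))
    # Empty builtins blocks access to open(), __import__(), etc.
    return bool(eval(condition, {"__builtins__": {}}, {}))

print(evaluate_condition("1 in ${parameters.stages_to_run}", {"stages_to_run": [1, 2]}))  # True
print(evaluate_condition("${parameters.threshold} > 0.5", {"threshold": 0.8}))            # True
```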

Creating New Actions

Standard Action Template

from nexify.actions.typed_base import TypedStrategyAction
from nexify.actions.registry import register_action
from nexify.standards.base_models import ActionParamsBase
from nexify.standards.context_handler import UniversalContext
from nexify.core.exceptions import ActionResult
from pydantic import Field
from typing import Dict, Any

class MyActionParams(ActionParamsBase):
    """Parameters for MyAction.

    Inherits common fields: debug, trace, timeout, continue_on_error, etc.
    """
    input_key: str = Field(..., description="Input dataset key from context")
    output_key: str = Field(..., description="Output dataset key to store in context")
    threshold: float = Field(0.8, ge=0.0, le=1.0, description="Processing threshold")

@register_action("MY_CUSTOM_ACTION")
class MyCustomAction(TypedStrategyAction[MyActionParams, ActionResult]):
    """Brief description of what this action does.

    Detailed explanation of:
    - Input expectations
    - Processing logic
    - Output format
    - Edge cases handled
    """

    def get_params_model(self) -> type[MyActionParams]:
        return MyActionParams

    async def execute_typed(self, params: MyActionParams, context: Dict) -> ActionResult:
        """Execute the action with validated parameters.

        Args:
            params: Validated action parameters
            context: Execution context (dict or StrategyExecutionContext)

        Returns:
            ActionResult with success status and details
        """
        # Wrap context for safe access
        ctx = UniversalContext.wrap(context)

        # Retrieve input data
        datasets = ctx.get("datasets", {})
        if params.input_key not in datasets:
            return ActionResult(
                success=False,
                message=f"Dataset '{params.input_key}' not found",
                details={"available_keys": list(datasets.keys())}
            )

        input_data = datasets[params.input_key]

        # Process data
        output_data = self._process_data(input_data, params.threshold)

        # Store result in context
        datasets[params.output_key] = output_data
        ctx.set("datasets", datasets)

        return ActionResult(
            success=True,
            message=f"Processed {len(output_data)} items",
            details={
                "input_count": len(input_data),
                "output_count": len(output_data),
                "threshold": params.threshold
            }
        )

    def _process_data(self, data: Any, threshold: float) -> Any:
        """Private helper method for processing logic."""
        # Implementation here
        return data

Action Registration

Actions auto-register when imported. Ensure your action module is imported in the appropriate __init__.py:

# src/nexify/actions/entities/proteins/__init__.py
from . import my_action  # noqa: F401 -- importing the module runs @register_action

Testing Actions

Create tests following the three-level pattern:

# tests/unit/core/actions/test_my_action.py
import pytest
from nexify.actions.registry import ACTION_REGISTRY

@pytest.mark.asyncio
async def test_my_action_minimal():
    """Level 1: Minimal unit test (<1s)"""
    action_class = ACTION_REGISTRY["MY_CUSTOM_ACTION"]
    action = action_class()

    params = {"input_key": "test_data", "output_key": "result", "threshold": 0.8}
    context = {"datasets": {"test_data": [1, 2, 3]}}

    result = await action.execute(
        current_identifiers=[],
        current_ontology_type="protein",
        action_params=params,
        source_endpoint=None,
        target_endpoint=None,
        context=context
    )

    assert "result" in context["datasets"]
    assert result.get("details", {}).get("success") is not False

@pytest.mark.asyncio
async def test_my_action_integration():
    """Level 2: Integration test with realistic data (<10s)"""
    # Test with larger dataset, edge cases, etc.
    pass

@pytest.mark.integration
@pytest.mark.asyncio
async def test_my_action_production_subset():
    """Level 3: Production-like subset test (<60s)"""
    # Test with real data patterns, performance validation
    pass

Important Implementation Patterns

Using UniversalContext

Always wrap context to handle both dict and object types:

ctx = UniversalContext.wrap(context)
datasets = ctx.get("datasets", {})
ctx.set("datasets", updated_datasets)

Pydantic Model Flexibility

Use ActionParamsBase for parameters that should accept extra fields:

class MyParams(ActionParamsBase):  # Inherits extra='allow'
    required_field: str
    # Unknown fields won't cause validation errors

For strict validation, use StrictBaseModel instead.
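The two behaviors can be reproduced with plain Pydantic v2 config (stand-ins for ActionParamsBase and StrictBaseModel, whose exact definitions live in base_models.py):

```python
from pydantic import BaseModel, ConfigDict, ValidationError

class FlexibleParams(BaseModel):
    model_config = ConfigDict(extra="allow")  # unknown fields are kept
    required_field: str

class StrictParams(BaseModel):
    model_config = ConfigDict(extra="forbid")  # unknown fields raise
    required_field: str

FlexibleParams(required_field="x", surprise=1)  # accepted
try:
    StrictParams(required_field="x", surprise=1)
except ValidationError:
    print("strict model rejects unknown fields")
```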

Error Handling

Return structured errors via ActionResult:

if error_condition:
    return ActionResult(
        success=False,
        message="Clear error description",
        details={"error_code": "VALIDATION_ERROR", "field": "problematic_field"}
    )

Dataset Storage Convention

Store tabular data in context["datasets"]:

ctx.set("datasets", {
    "dataset_key": df,  # pandas DataFrame or list of dicts
    "another_key": processed_data
})

Other context keys:

  • statistics: Dict of computed statistics
  • output_files: List of file paths generated
  • provenance: List of processing history records
  • custom_action_data: Free-form action-specific data
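In dict form, a populated context using these keys might look like this (values are illustrative):

```python
context = {
    "datasets": {"processed_data": [{"id": "P12345", "score": 0.9}]},
    "statistics": {"match_rate": 0.87, "total_rows": 1200},
    "output_files": ["/results/matches.tsv"],
    "provenance": [{"action": "MY_CUSTOM_ACTION", "params": {"threshold": 0.8}}],
    "custom_action_data": {"my_action": {"notes": "free-form"}},
}

# Actions typically append to bookkeeping keys rather than overwrite them
context["output_files"].append("/results/summary.json")
context["provenance"].append({"action": "EXPORT_DATASET_V2"})
print(len(context["output_files"]))  # 2
```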

Debug Tracing

Enable identifier tracing through pipelines:

debug_config = {
    'trace_identifiers': ['P12345', 'Q6EMK4'],
    'save_trace': '/tmp/debug_trace.json',
    'check_known_issues': True
}

result = await service.execute_strategy(
    'my_workflow',
    debug_config=debug_config
)

Traces log each action for specified identifiers to help debug data transformations.

Workflow YAML Structure

Basic Workflow Format

name: workflow_name
description: Brief description of workflow purpose
parameters:
  input_file: "/default/path.tsv"
  threshold: 0.8
metadata:
  version: "1.0"
  author: "Your Name"

steps:
  - name: load_data
    action:
      type: LOAD_DATASET_IDENTIFIERS
      params:
        file_path: "${parameters.input_file}"
        identifier_column: id
        output_key: raw_data

  - name: process
    condition: "${parameters.threshold} > 0.5"  # Optional
    action:
      type: MY_CUSTOM_ACTION
      params:
        input_key: raw_data
        output_key: processed_data
        threshold: "${parameters.threshold}"

  - name: export
    action:
      type: EXPORT_DATASET_V2
      params:
        input_key: processed_data
        file_path: "${parameters.output_dir}/results.tsv"
        format: tsv

Common Action Types

  • LOAD_DATASET_IDENTIFIERS: Load data from file
  • MERGE_DATASETS: Merge multiple datasets
  • EXPORT_DATASET_V2: Export dataset to file
  • CUSTOM_TRANSFORM_EXPRESSION: Apply pandas transformations
  • See ACTION_REGISTRY keys for full list

Common Development Patterns

Running Strategies from Python

from nexify.core.minimal_strategy_service import MinimalStrategyService
import asyncio

async def main():
    service = MinimalStrategyService(strategies_dir="./workflows")

    result = await service.execute_strategy(
        strategy_name="my_workflow",
        input_identifiers=["id1", "id2", "id3"],
        context={"parameters": {"threshold": 0.9}}  # Override defaults
    )

    print(f"Processed {len(result['current_identifiers'])} identifiers")
    print(f"Output datasets: {list(result['datasets'].keys())}")

asyncio.run(main())

Accessing Action Registry

from nexify.actions.registry import ACTION_REGISTRY

# List all registered actions
print(f"Available actions: {list(ACTION_REGISTRY.keys())}")

# Get action class
action_class = ACTION_REGISTRY["MY_ACTION"]
action = action_class()

Working with Datasets in Context

# Reading datasets
ctx = UniversalContext.wrap(context)
datasets = ctx.get("datasets", {})
my_data = datasets.get("key_name")

# Writing datasets
datasets["new_key"] = processed_df
ctx.set("datasets", datasets)

# Or build a new dict instead of mutating the original in place
ctx.set("datasets", {**datasets, "new_key": processed_df})

Testing Strategy

Test Organization

  • tests/unit/: Fast unit tests (<1s each)
  • tests/integration/: Integration tests (<10s each)
  • tests/performance/: Performance benchmarks
  • tests/test_edge_cases.py: Known edge case validation

Running Focused Tests

# Test specific action
poetry run pytest tests/unit/core/actions/test_my_action.py -v

# Test with debugging
poetry run pytest tests/unit/core/actions/test_my_action.py -xvs

# Test with coverage for specific module
poetry run pytest tests/unit/core/actions/ --cov=nexify.actions --cov-report=term

Test Coverage Standards

  • Minimum coverage: 75% (enforced in pyproject.toml)
  • New actions should have >80% coverage
  • Critical paths (orchestration, registry) should have >90% coverage

Performance Considerations

Large Dataset Handling

For large datasets (e.g., more than ~10,000 rows), consider chunking:

import pandas as pd

def process_large_dataset(df: pd.DataFrame, chunk_size: int = 10_000):
    """Yield processed chunks so memory use stays bounded."""
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i + chunk_size]
        yield process_chunk(chunk)  # process_chunk: your per-chunk logic

Algorithm Complexity

Avoid O(n²) operations on large datasets. Prefer:

  • Vectorized pandas operations
  • Set-based lookups instead of nested loops
  • Pre-computed indexes/mappings
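The set-based lookup point can be shown with a self-contained example (identifiers are synthetic):

```python
# Membership test: O(n*m) nested scan vs O(n+m) set-based lookup
left = [f"P{i:05d}" for i in range(5_000)]
right = [f"P{i:05d}" for i in range(2_500, 7_500)]

# Slow: `x in right` is a linear scan of the list for every element of left
# matches = [x for x in left if x in right]

# Fast: build the lookup set once, then each membership check is O(1) on average
right_set = set(right)
matches = [x for x in left if x in right_set]
print(len(matches))  # 2500
```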

Caching

Use @lru_cache for expensive computations called repeatedly:

from functools import lru_cache

@lru_cache(maxsize=128)
def expensive_lookup(identifier: str) -> str:
    # Expensive, deterministic computation here; lru_cache memoizes by argument
    result = ...  # replace with the real lookup
    return result

Code Style and Conventions

Naming Conventions

  • Action classes: PascalCase (e.g., LoadDatasetIdentifiersAction)
  • Action registration names: UPPER_SNAKE_CASE (e.g., LOAD_DATASET_IDENTIFIERS)
  • Functions/methods: snake_case (e.g., process_identifiers)
  • Constants: UPPER_SNAKE_CASE (e.g., DEFAULT_TIMEOUT)

Import Order (Ruff/isort enforced)

  1. Standard library imports
  2. Third-party imports
  3. Local application imports
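A concrete module header following this order (third-party and local imports commented out so the snippet stands alone):

```python
# 1. Standard library
import asyncio
from pathlib import Path

# 2. Third-party
# import pandas as pd

# 3. Local application
# from nexify.actions.registry import ACTION_REGISTRY
```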

Type Hints

  • Always include type hints for function signatures
  • Use from typing import for generic types
  • Mypy strict mode is enabled for src/nexify/

Docstrings

  • Use Google-style docstrings
  • Required for public functions/classes
  • Include Args, Returns, Raises sections as applicable

Integration with BiOMapper

While Nexify is now independent, it maintains compatibility with BiOMapper patterns:

  • Action naming conventions align with BiOMapper standards
  • Context handling supports both legacy and modern patterns
  • Biological data actions (proteins, metabolites) originated from BiOMapper

When working on biological data features, refer to BiOMapper documentation for domain-specific patterns.