|
| 1 | +# Junjo Library Overview for LLM Agents |
| 2 | + |
| 3 | +This document provides a concise overview of the Junjo library, designed to give LLM agents the necessary context to understand, modify, and extend the codebase. |
| 4 | + |
| 5 | +## High-Level Summary |
| 6 | + |
| 7 | +Junjo is a Python library for building and managing complex, graph-based AI workflows. It is designed to be consumed by Python application developers. The library provides a framework for defining workflows as a series of nodes and edges, managing state in a predictable and concurrency-safe manner, and visualizing the workflow's structure and execution. |
| 8 | + |
| 9 | +**Key Use Cases:** |
| 10 | + |
| 11 | +* Building complex LLM-powered agents and applications. |
| 12 | +* Orchestrating data processing pipelines. |
| 13 | +* Creating workflows with conditional branching and parallel execution. |
| 14 | + |
| 15 | +## Core Concepts |
| 16 | + |
| 17 | +The following are the fundamental building blocks of the Junjo library: |
| 18 | + |
| 19 | +### 1. `State` and `Store` |
| 20 | + |
| 21 | +* **`BaseState` (`src/junjo/state.py`):** A Pydantic `BaseModel` that defines the data structure for a workflow's state. All workflow states must inherit from this class. |
| 22 | +* **`BaseStore` (`src/junjo/store.py`):** A class that manages the state of a workflow. It holds the `BaseState` and provides methods (actions) to update the state in a controlled and predictable manner. It uses an `asyncio.Lock` to ensure that state updates are atomic and concurrency-safe. |
| 23 | + |
| 24 | +**Example:** |
| 25 | + |
| 26 | +```python |
| 27 | +from junjo import BaseState, BaseStore |
| 28 | + |
| 29 | +class MyWorkflowState(BaseState): |
| 30 | + user_input: str |
| 31 | + processed_data: dict | None = None |
| 32 | + |
| 33 | +class MyWorkflowStore(BaseStore[MyWorkflowState]): |
| 34 | + async def set_processed_data(self, data: dict) -> None: |
| 35 | + await self.set_state({"processed_data": data}) |
| 36 | +``` |
| 37 | + |
| 38 | +### 2. `Node` |
| 39 | + |
| 40 | +* **`Node` (`src/junjo/node.py`):** An abstract base class that represents a single unit of work in a workflow. The business logic is implemented in the `service` method. Nodes interact with the `Store` to get and set state. |
| 41 | + |
| 42 | +**Example:** |
| 43 | + |
| 44 | +```python |
| 45 | +from junjo import Node |
| 46 | + |
| 47 | +class ProcessDataNode(Node[MyWorkflowStore]): |
| 48 | + async def service(self, store: MyWorkflowStore) -> None: |
| 49 | + state = await store.get_state() |
| 50 | + processed_data = {"result": "some_value"} |
| 51 | + await store.set_processed_data(processed_data) |
| 52 | +``` |
| 53 | + |
| 54 | +### 3. `Edge` and `Condition` |
| 55 | + |
| 56 | +* **`Edge` (`src/junjo/edge.py`):** Defines a directed connection between two nodes (`tail` and `head`) in a workflow graph. |
| 57 | +* **`Condition` (`src/junjo/condition.py`):** An abstract base class for implementing conditional logic on an `Edge`. The `evaluate` method determines if a transition between nodes should occur based on the current state. |
| 58 | + |
| 59 | +**Example:** |
| 60 | + |
| 61 | +```python |
| 62 | +from junjo import Edge, Condition |
| 63 | + |
| 64 | +class DataIsProcessed(Condition[MyWorkflowState]): |
| 65 | + def evaluate(self, state: MyWorkflowState) -> bool: |
| 66 | + return state.processed_data is not None |
| 67 | + |
| 68 | +edge = Edge(tail=node1, head=node2, condition=DataIsProcessed()) |
| 69 | +``` |
| 70 | + |
| 71 | +### 4. `Graph` |
| 72 | + |
| 73 | +* **`Graph` (`src/junjo/graph.py`):** A collection of nodes and edges that defines the complete structure of a workflow. It has a single entry point (`source`) and a single exit point (`sink`). |
| 74 | + |
| 75 | +**Example:** |
| 76 | + |
| 77 | +```python |
| 78 | +from junjo import Graph |
| 79 | + |
| 80 | +workflow_graph = Graph( |
| 81 | + source=start_node, |
| 82 | + sink=end_node, |
| 83 | + edges=[ |
| 84 | + Edge(tail=start_node, head=process_node), |
| 85 | + Edge(tail=process_node, head=end_node, condition=DataIsProcessed()) |
| 86 | + ] |
| 87 | +) |
| 88 | +``` |
| 89 | + |
| 90 | +### 5. `Workflow` and `Subflow` |
| 91 | + |
| 92 | +* **`Workflow` (`src/junjo/workflow.py`):** The main executable component that takes a `graph_factory` and a `store_factory` and runs the defined process. |
| 93 | +* **`Subflow` (`src/junjo/workflow.py`):** A workflow that can be nested within another workflow, allowing for modular and reusable logic. It has its own isolated state and can interact with its parent's state via `pre_run_actions` and `post_run_actions`. |
| 94 | + |
| 95 | +### 6. `RunConcurrent` |
| 96 | + |
| 97 | +* **`RunConcurrent` (`src/junjo/run_concurrent.py`):** A special type of `Node` that allows for the parallel execution of multiple nodes or subflows using `asyncio.gather`. |
| 98 | + |
| 99 | +## Key Design Patterns |
| 100 | + |
| 101 | +* **Asyncio-native:** The library is built on top of Python's `asyncio`, enabling non-blocking I/O operations and efficient concurrency. |
| 102 | +* **Immutable State Management:** Inspired by Redux, state is treated as immutable. State changes are made by calling methods on the `Store`, which creates a new state object with the updates. This ensures predictable state transitions and concurrency safety. |
| 103 | +* **Dependency Injection via Factories:** `Workflow` and `Subflow` are initialized with `graph_factory` and `store_factory` callables. This ensures that each workflow execution gets a fresh, isolated graph and store, which is critical for concurrency safety. |
| 104 | + |
| 105 | +## How to... |
| 106 | + |
| 107 | +### Create a new Node |
| 108 | + |
| 109 | +1. Create a new class that inherits from `junjo.Node`. |
| 110 | +2. Specify the `Store` type as a generic parameter. |
| 111 | +3. Implement the `async def service(self, store: StoreT) -> None:` method with your business logic. |
| 112 | + |
| 113 | +### Add a Condition to an Edge |
| 114 | + |
| 115 | +1. Create a new class that inherits from `junjo.Condition`. |
| 116 | +2. Specify the `State` type as a generic parameter. |
| 117 | +3. Implement the `def evaluate(self, state: StateT) -> bool:` method. |
| 118 | +4. Instantiate the condition and pass it to the `condition` parameter of the `Edge`. |
| 119 | + |
| 120 | +### Create a Subflow |
| 121 | + |
| 122 | +1. Create a new class that inherits from `junjo.Subflow`. |
| 123 | +2. Specify the `SubflowState`, `SubflowStore`, `ParentState`, and `ParentStore` types as generic parameters. |
| 124 | +3. Implement the `pre_run_actions` and `post_run_actions` methods to interact with the parent store. |
| 125 | +4. Instantiate the subflow with its own `graph_factory` and `store_factory`. |
| 126 | + |
| 127 | +## Important Files |
| 128 | + |
| 129 | +* `src/junjo/workflow.py`: Contains the core logic for workflow and subflow execution. |
| 130 | +* `src/junjo/graph.py`: Defines the structure of a workflow. |
| 131 | +* `src/junjo/node.py`: Defines the base class for all nodes. |
| 132 | +* `src/junjo/store.py`: Contains the base class for state management. |
| 133 | +* `src/junjo/edge.py`: Defines the connection between nodes. |
| 134 | +* `src/junjo/condition.py`: Defines the base class for conditional logic. |
0 commit comments