Commit f6a1412

Merge pull request #1 from mdrideout/enforce-graph-builder-pattern
Enforce graph builder pattern
2 parents 9e3a9be + 8433391 commit f6a1412

29 files changed: +837 -405 lines changed

.github/workflows/publish.yml

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+name: Publish to PyPI
+
+on:
+  release:
+    types: [created]
+
+jobs:
+  publish:
+    runs-on: ubuntu-latest
+    environment:
+      name: pypi
+      url: https://pypi.org/project/junjo/
+    permissions:
+      id-token: write
+      contents: read
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.x'
+
+      - name: Build package
+        run: |
+          pip install build
+          python -m build
+
+      - name: Publish package distributions to PyPI
+        uses: pypa/gh-action-pypi-publish@release/v1

README.md

Lines changed: 9 additions & 2 deletions
@@ -101,12 +101,12 @@ $ brew install graphviz
 
 ```python
 # visualize.py
-from base.sample_workflow.workflow import sample_workflow_graph
+from base.sample_workflow.graph import create_sample_workflow_graph
 
 def main():
     # Every graph can execute .export_graphviz_assets() to generate all graphs and subflow graphs in a workflow
     # Creates .svg renderings, .dot notation files, and an HTML template to render the graphs
-    sample_workflow_graph.export_graphviz_assets()
+    create_sample_workflow_graph().export_graphviz_assets()
 
 if __name__ == "__main__":
     main()
@@ -141,6 +141,13 @@ This project utilizes [ruff](https://astral.sh/ruff) for linting and auto format
 $ sphinx-build -b html docs docs/_build
 ```
 
+### Tests
+
+```bash
+# Run the tests with uv
+$ uv run pytest
+```
+
 ## Code Generation
 
 ### Protobuf schema generation

docs/.DS_Store

0 Bytes
Binary file not shown.

docs/core_concepts.rst

Lines changed: 162 additions & 0 deletions
@@ -0,0 +1,162 @@
+.. _core_concepts:
+
+##############################################################
+Core Concepts
+##############################################################
+
+.. meta::
+    :description: Understand the core concepts of Junjo, including State, Store, Node, Edge, Condition, Graph, and Workflow. Learn how these components work together to build powerful and scalable Python workflows.
+    :keywords: junjo, python, workflow, state management, node, edge, graph, core concepts
+
+This page breaks down the fundamental building blocks of the Junjo library. Understanding these concepts is key to effectively designing, building, and debugging your workflows.
+
+State
+=====
+
+**What is it?**
+A `BaseState` is a Pydantic model that defines the data structure for your workflow's state. It acts as a centralized, type-safe container for all the data that your workflow will operate on.
+
+**Key Characteristics:**
+- **Pydantic-Based:** Leverages Pydantic for data validation and type hinting.
+- **Immutable in Practice:** While the state object itself can be replaced, it is treated as immutable within the workflow. Nodes do not modify the state directly; they request changes through the store.
+
+.. code-block:: python
+
+    from junjo import BaseState
+
+    class MyWorkflowState(BaseState):
+        user_input: str
+        processed_data: dict | None = None
+        is_complete: bool = False
+
+Store
+=====
+
+**What is it?**
+A `BaseStore` is a class that manages the state of a workflow. It holds the `BaseState` and provides methods (often called "actions") to update the state in a controlled and predictable manner.
+
+**Key Characteristics:**
+- **State Management:** The single source of truth for the workflow's state.
+- **Redux-Inspired:** Follows a pattern where state is updated by dispatching actions, ensuring that state changes are explicit and traceable.
+- **Concurrency Safe:** Uses an `asyncio.Lock` to ensure that state updates are atomic, preventing race conditions.
+
+.. code-block:: python
+
+    from junjo import BaseStore
+
+    class MyWorkflowStore(BaseStore[MyWorkflowState]):
+        async def set_processed_data(self, data: dict) -> None:
+            await self.set_state({"processed_data": data})
+
+        async def mark_as_complete(self) -> None:
+            await self.set_state({"is_complete": True})
+
+Node
+====
+
+**What is it?**
+A `Node` represents a single unit of work in your workflow. It's where your business logic, API calls, or any other operations are executed.
+
+**Key Characteristics:**
+- **Atomic Unit of Work:** Each node should have a single, well-defined responsibility.
+- **Interacts with the Store:** Nodes receive the workflow's store as an argument to their `service` method, allowing them to read the current state and dispatch actions to update it.
+- **Asynchronous:** The `service` method is an `async` function, allowing for non-blocking I/O operations.
+
+.. code-block:: python
+
+    from junjo import Node
+
+    class ProcessDataNode(Node[MyWorkflowStore]):
+        async def service(self, store: MyWorkflowStore) -> None:
+            state = await store.get_state()
+            # Perform some processing on state.user_input
+            processed_data = {"result": "some_value"}
+            await store.set_processed_data(processed_data)
+
+Edge
+====
+
+**What is it?**
+An `Edge` defines a directed connection between two nodes in a workflow graph. It represents a potential path of execution.
+
+**Key Characteristics:**
+- **Defines Flow:** Edges connect a `tail` node to a `head` node, establishing the sequence of operations.
+- **Can be Conditional:** An edge can have an associated `Condition` that determines whether the transition from the tail to the head should occur.
+
+.. code-block:: python
+
+    from junjo import Edge
+
+    edge = Edge(tail=node1, head=node2)
+
+Condition
+=========
+
+**What is it?**
+A `Condition` is a class that contains logic to determine whether an `Edge` should be traversed.
+
+**Key Characteristics:**
+- **Pure Function of State:** A condition's `evaluate` method should only depend on the current state of the workflow. It should not have any side effects.
+- **Enables Branching:** Conditions are the primary mechanism for creating branching logic in your workflows.
+
+.. code-block:: python
+
+    from junjo import Condition
+
+    class DataIsProcessed(Condition[MyWorkflowState]):
+        def evaluate(self, state: MyWorkflowState) -> bool:
+            return state.processed_data is not None
+
+    edge = Edge(tail=node1, head=node2, condition=DataIsProcessed())
+
+Graph
+=====
+
+**What is it?**
+A `Graph` is a collection of nodes and edges that defines the complete structure of your workflow.
+
+**Key Characteristics:**
+- **Source and Sink:** A graph has a single entry point (`source`) and a single exit point (`sink`).
+- **Defines the Workflow Structure:** The graph is a complete representation of all possible paths of execution in your workflow.
+
+.. code-block:: python
+
+    from junjo import Graph
+
+    workflow_graph = Graph(
+        source=start_node,
+        sink=end_node,
+        edges=[
+            Edge(tail=start_node, head=process_node),
+            Edge(tail=process_node, head=end_node, condition=DataIsProcessed())
+        ]
+    )
+
+Workflow
+========
+
+**What is it?**
+A `Workflow` is the main executable component that takes a `graph_factory` and a `store_factory` and runs the defined process.
+
+**Key Characteristics:**
+- **Executable:** The `Workflow` class has an `execute` method that starts the workflow.
+- **Manages Execution:** It traverses the graph, executing nodes and evaluating conditions, until the `sink` node is reached.
+- **Isolated Execution:** Each call to `execute` uses the provided factories to create a fresh `Graph` and `Store`, ensuring that each execution is isolated and concurrency-safe.
+
+.. code-block:: python
+
+    from junjo import Workflow
+
+    def create_graph() -> Graph:
+        # ... (graph creation logic)
+        return workflow_graph
+
+    sample_workflow = Workflow[MyWorkflowState, MyWorkflowStore](
+        name="My First Workflow",
+        graph_factory=create_graph,
+        store_factory=lambda: MyWorkflowStore(
+            initial_state=MyWorkflowState(user_input="hello")
+        )
+    )
+
+    await sample_workflow.execute()
docs/getting_started.rst

Lines changed: 30 additions & 24 deletions
@@ -87,34 +87,40 @@ More advanced examples can be found in the `examples directory <https://github.c
                 return False
             return count % 2 == 0
 
-    # Instantiate the nodes
-    first_node = FirstNode()
-    count_items_node = CountItemsNode()
-    even_items_node = EvenItemsNode()
-    odd_items_node = OddItemsNode()
-    final_node = FinalNode()
-
-    # Create the workflow graph
-    workflow_graph = Graph(
-        source=first_node,
-        sink=final_node,
-        edges=[
-            Edge(tail=first_node, head=count_items_node),
-
-            # Branching based on the count of items
-            Edge(tail=count_items_node, head=even_items_node, condition=CountIsEven()), # Only transitions if count is even
-            Edge(tail=count_items_node, head=odd_items_node), # Fallback if first condition is not met
-
-            # Branched paths converge to the final node
-            Edge(tail=even_items_node, head=final_node),
-            Edge(tail=odd_items_node, head=final_node),
-        ]
-    )
+    def create_graph() -> Graph:
+        """
+        Factory function to create a new instance of the sample workflow graph.
+        This ensures that each workflow execution gets a fresh, isolated graph,
+        preventing state conflicts in concurrent environments.
+        """
+        # Instantiate the nodes
+        first_node = FirstNode()
+        count_items_node = CountItemsNode()
+        even_items_node = EvenItemsNode()
+        odd_items_node = OddItemsNode()
+        final_node = FinalNode()
+
+        # Create the workflow graph
+        return Graph(
+            source=first_node,
+            sink=final_node,
+            edges=[
+                Edge(tail=first_node, head=count_items_node),
+
+                # Branching based on the count of items
+                Edge(tail=count_items_node, head=even_items_node, condition=CountIsEven()), # Only transitions if count is even
+                Edge(tail=count_items_node, head=odd_items_node), # Fallback if first condition is not met
+
+                # Branched paths converge to the final node
+                Edge(tail=even_items_node, head=final_node),
+                Edge(tail=odd_items_node, head=final_node),
+            ]
+        )
 
     # Create the workflow
     sample_workflow = Workflow[SampleWorkflowState, SampleWorkflowStore](
         name="Getting Started Example Workflow",
-        graph=workflow_graph,
+        graph_factory=create_graph,
         store_factory=lambda: SampleWorkflowStore(
             initial_state=SampleWorkflowState(
                 items=["laser", "coffee", "horse"]
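
Reviewer note (not part of the commit): the inline comments on the two `count_items_node` edges ("Only transitions if count is even" and "Fallback if first condition is not met") imply that edge order matters when several edges leave the same node. The sketch below shows one way such a rule could work, assuming the first outgoing edge whose condition is absent or true is taken. This is an assumption about the traversal, not Junjo's confirmed implementation; `ToyEdge`, `next_node`, and `walk` are hypothetical names, and nodes are represented as plain strings.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class ToyEdge:
    """Hypothetical edge: tail -> head, with an optional predicate on state."""
    tail: str
    head: str
    condition: Optional[Callable[[dict], bool]] = None


def next_node(current: str, edges: list[ToyEdge], state: dict) -> Optional[str]:
    # Assumed rule: scan edges in declaration order and take the first
    # outgoing edge that is unconditional or whose condition is true.
    for edge in edges:
        if edge.tail != current:
            continue
        if edge.condition is None or edge.condition(state):
            return edge.head
    return None  # no outgoing edge matched: treat the node as terminal


def count_is_even(state: dict) -> bool:
    # Mirrors the CountIsEven logic visible in the hunk context.
    count = state.get("count")
    if count is None:
        return False
    return count % 2 == 0


edges = [
    ToyEdge("first", "count_items"),
    ToyEdge("count_items", "even_items", condition=count_is_even),
    ToyEdge("count_items", "odd_items"),  # fallback when the count is not even
    ToyEdge("even_items", "final"),
    ToyEdge("odd_items", "final"),
]


def walk(state: dict) -> list[str]:
    # Follow edges from the source until no edge applies.
    path = ["first"]
    while (nxt := next_node(path[-1], edges, state)) is not None:
        path.append(nxt)
    return path


print(walk({"count": 3}))  # ['first', 'count_items', 'odd_items', 'final']
print(walk({"count": 4}))  # ['first', 'count_items', 'even_items', 'final']
```

Under this rule, moving the unconditional edge ahead of the conditional one would make the even branch unreachable, which is why the fallback is listed last.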

docs/index.rst

Lines changed: 3 additions & 0 deletions
@@ -4,6 +4,9 @@
    :hidden:
 
    getting_started
+   tutorial
+   core_concepts
+   state_management
    concurrency
    subflows
    visualizing_workflows

docs/state_management.rst

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
+.. _state_management:
+
+##############################################################
+State Management
+##############################################################
+
+.. meta::
+    :description: A deep dive into Junjo's Redux-inspired, immutable state management. Learn how to use BaseState and BaseStore to build predictable and concurrency-safe Python workflows.
+    :keywords: junjo, python, state management, redux, immutable state, pydantic, workflow, BaseStore, BaseState
+
+Junjo's state management is designed to be predictable, traceable, and safe for concurrent operations. It is heavily inspired by the principles of Redux, a popular state management library in the JavaScript ecosystem. This page provides a deep dive into how to effectively manage state in your Junjo workflows.
+
+The Core Principles
+===================
+
+1.  **Single Source of Truth:** The state of your entire workflow is stored in a single object tree within a single **Store**.
+2.  **State is Read-Only:** The only way to change the state is to emit an "action," an object describing what happened. This prevents nodes from directly modifying the state, which could lead to unpredictable behavior.
+3.  **Changes are Made with Store Methods:** State modifications are encapsulated within methods in your **Store**. Similar to "reducers" in Redux, these methods are the only place where `set_state` should be called, ensuring that all state changes are predictable and centralized.
+
+BaseState: Defining Your State's Shape
+=======================================
+
+The `BaseState` class, which is a Pydantic `BaseModel`, is used to define the structure of your workflow's state. Because it's a Pydantic model, you get all the benefits of type hinting and data validation out of the box.
+
+.. code-block:: python
+
+    from junjo import BaseState
+
+    class ChatWorkflowState(BaseState):
+        messages: list[dict] = []
+        current_user: str
+        is_typing: bool = False
+        error_message: str | None = None
+
+In this example, we've defined a state for a chat application. Any workflow that uses this state will have access to these fields, and Pydantic will ensure that the data conforms to the specified types.
+
+BaseStore: Managing Your State
+==============================
+
+The `BaseStore` is the heart of Junjo's state management. It holds the state and provides methods for updating it. You will create a custom store for each workflow that inherits from `BaseStore` and is typed with your custom `BaseState`.
+
+.. code-block:: python
+
+    from junjo import BaseStore
+
+    class ChatWorkflowStore(BaseStore[ChatWorkflowState]):
+        async def add_message(self, message: dict) -> None:
+            # Get the current messages and append the new one
+            new_messages = self._state.messages + [message]
+            await self.set_state({"messages": new_messages})
+
+        async def set_is_typing(self, is_typing: bool) -> None:
+            await self.set_state({"is_typing": is_typing})
+
+        async def set_error(self, error: str) -> None:
+            await self.set_state({"error_message": error})
+
+### The `set_state` Method
+
+The `set_state` method is the **only** way to update the state in the store. It takes a dictionary of the fields you want to update and their new values.
+
+**Key Behaviors of `set_state`:**
+- **Immutable Updates:** `set_state` creates a *copy* of the state with the updates applied. It does not mutate the original state object. This is crucial for preventing side effects and ensuring predictable state transitions.
+- **Concurrency-Safe:** All calls to `set_state` are protected by an `asyncio.Lock`, so you can safely call actions from multiple concurrent nodes without worrying about race conditions.
+- **Validation:** Before applying the update, `set_state` validates the new state against your Pydantic model. If the update is invalid, it will raise a `ValueError`.
+
+Using the Store in a Node
+=========================
+
+Nodes receive an instance of the store in their `service` method. This allows them to read the current state and dispatch actions to update it.
+
+.. code-block:: python
+
+    from junjo import Node
+
+    class SendMessageNode(Node[ChatWorkflowStore]):
+        async def service(self, store: ChatWorkflowStore) -> None:
+            state = await store.get_state()
+            user = state.current_user
+
+            # In a real app, you would get the message from an external source
+            new_message = {"user": user, "text": "Hello, Junjo!"}
+
+            # Dispatch an action to add the message to the state
+            await store.add_message(new_message)
+
+By following this pattern, you create a clear and predictable data flow in your application. Nodes don't need to know how the state is updated; they just need to know which actions to call on the store. This separation of concerns makes your code easier to test, debug, and reason about.
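
Reviewer note (not part of the commit): the "Key Behaviors of `set_state`" list in docs/state_management.rst describes copy-on-update state guarded by an `asyncio.Lock`. The sketch below shows that general pattern using only the standard library, with a frozen dataclass in place of a Pydantic model. `ToyState` and `ToyStore` are hypothetical names; this is an illustration of the technique under those assumptions, not Junjo's `BaseStore` implementation, and it performs no Pydantic-style validation.

```python
import asyncio
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyState:
    """Hypothetical immutable state (stand-in for a Pydantic model)."""
    messages: tuple = ()
    is_typing: bool = False


class ToyStore:
    """Hypothetical store: every update swaps in a new state object."""

    def __init__(self, initial_state: ToyState) -> None:
        self._state = initial_state
        self._lock = asyncio.Lock()

    async def get_state(self) -> ToyState:
        async with self._lock:
            return self._state

    async def set_state(self, update: dict) -> None:
        async with self._lock:
            # replace() builds a new ToyState; the old object is untouched.
            self._state = replace(self._state, **update)

    async def add_message(self, message: str) -> None:
        # Read and write inside one critical section so that concurrent
        # appends cannot overwrite each other. asyncio.Lock is not
        # reentrant, so this method does not call set_state() here.
        async with self._lock:
            self._state = replace(
                self._state, messages=self._state.messages + (message,)
            )


async def main() -> tuple[ToyState, ToyState]:
    store = ToyStore(ToyState())
    before = await store.get_state()
    # Five "nodes" dispatch the same action concurrently.
    await asyncio.gather(*(store.add_message(f"msg-{i}") for i in range(5)))
    await store.set_state({"is_typing": True})
    after = await store.get_state()
    return before, after


before, after = asyncio.run(main())
print(before)                                # ToyState(messages=(), is_typing=False)
print(len(after.messages), after.is_typing)  # 5 True
```

The snapshot taken before the updates is unchanged afterwards, which is the property the "Immutable Updates" bullet describes: readers holding an old state object never see it mutate.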
