Skip to content

Commit 1e4dfed

Browse files
authored
CrewAI enterprise + CrewAI crews (#122)
1 parent 0e64d95 commit 1e4dfed

File tree

7 files changed

+394
-31
lines changed

7 files changed

+394
-31
lines changed

typescript-sdk/integrations/crewai/python/agui_crewai/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
from .endpoint import add_crewai_fastapi_endpoint
1+
from .endpoint import add_crewai_flow_fastapi_endpoint
22
from .sdk import (
33
CopilotKitState,
44
copilotkit_predict_state,
55
copilotkit_emit_state,
66
copilotkit_stream
77
)
8+
from .enterprise import CrewEnterpriseEventListener
9+
10+
CREW_ENTERPRISE_EVENT_LISTENER = CrewEnterpriseEventListener()
11+
812
__all__ = [
9-
"add_crewai_fastapi_endpoint",
13+
"add_crewai_flow_fastapi_endpoint",
1014
"CopilotKitState",
1115
"copilotkit_predict_state",
1216
"copilotkit_emit_state",
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import uuid
2+
import copy
3+
import json
4+
from typing import Any, cast
5+
from crewai import Crew, Flow
6+
from crewai.flow import start
7+
from crewai.cli.crew_chat import (
8+
initialize_chat_llm as crew_chat_initialize_chat_llm,
9+
generate_crew_chat_inputs as crew_chat_generate_crew_chat_inputs,
10+
generate_crew_tool_schema as crew_chat_generate_crew_tool_schema,
11+
build_system_message as crew_chat_build_system_message,
12+
create_tool_function as crew_chat_create_tool_function
13+
)
14+
from litellm import completion
15+
from .sdk import (
16+
copilotkit_stream,
17+
copilotkit_exit,
18+
)
19+
20+
_CREW_INPUTS_CACHE = {}
21+
22+
23+
CREW_EXIT_TOOL = {
24+
"type": "function",
25+
"function": {
26+
"name": "crew_exit",
27+
"description": "Call this when the user has indicated that they are done with the crew",
28+
"parameters": {
29+
"type": "object",
30+
"properties": {},
31+
"required": [],
32+
},
33+
},
34+
}
35+
36+
37+
class ChatWithCrewFlow(Flow):
38+
"""Chat with crew"""
39+
40+
def __init__(
41+
self, *,
42+
crew: Crew
43+
):
44+
super().__init__()
45+
46+
47+
self.crew = copy.deepcopy(cast(Any, crew).crew())
48+
49+
if self.crew.chat_llm is None:
50+
raise ValueError("Crew chat LLM is not set")
51+
52+
self.crew_name = crew.name
53+
self.chat_llm = crew_chat_initialize_chat_llm(self.crew)
54+
55+
if crew.name not in _CREW_INPUTS_CACHE:
56+
self.crew_chat_inputs = crew_chat_generate_crew_chat_inputs(
57+
self.crew,
58+
self.crew_name,
59+
self.chat_llm
60+
)
61+
_CREW_INPUTS_CACHE[ crew.name] = self.crew_chat_inputs
62+
else:
63+
self.crew_chat_inputs = _CREW_INPUTS_CACHE[ crew.name]
64+
65+
self.crew_tool_schema = crew_chat_generate_crew_tool_schema(self.crew_chat_inputs)
66+
self.system_message = crew_chat_build_system_message(self.crew_chat_inputs)
67+
68+
super().__init__()
69+
70+
@start()
71+
async def chat(self):
72+
"""Chat with the crew"""
73+
74+
system_message = self.system_message
75+
if self.state.get("inputs"):
76+
system_message += "\n\nCurrent inputs: " + json.dumps(self.state["inputs"])
77+
78+
messages = [
79+
{
80+
"role": "system",
81+
"content": system_message,
82+
"id": str(uuid.uuid4()) + "-system"
83+
},
84+
*self.state["messages"]
85+
]
86+
87+
tools = [action for action in self.state["copilotkit"]["actions"]
88+
if action["function"]["name"] != self.crew_name]
89+
90+
tools += [self.crew_tool_schema, CREW_EXIT_TOOL]
91+
92+
response = await copilotkit_stream(
93+
completion(
94+
model=self.crew.chat_llm,
95+
messages=messages,
96+
tools=tools,
97+
parallel_tool_calls=False,
98+
stream=True
99+
)
100+
)
101+
102+
message = cast(Any, response).choices[0]["message"]
103+
self.state["messages"].append(message)
104+
105+
if message.get("tool_calls"):
106+
if message["tool_calls"][0]["function"]["name"] == self.crew_name:
107+
# run the crew
108+
crew_function = crew_chat_create_tool_function(self.crew, messages)
109+
args = json.loads(message["tool_calls"][0]["function"]["arguments"])
110+
result = crew_function(**args)
111+
112+
if isinstance(result, str):
113+
self.state["outputs"] = result
114+
elif hasattr(result, "json_dict"):
115+
self.state["outputs"] = result.json_dict
116+
elif hasattr(result, "raw"):
117+
self.state["outputs"] = result.raw
118+
else:
119+
raise ValueError("Unexpected result type", type(result))
120+
121+
self.state["messages"].append({
122+
"role": "tool",
123+
"content": result,
124+
"tool_call_id": message["tool_calls"][0]["id"]
125+
})
126+
elif message["tool_calls"][0]["function"]["name"] == CREW_EXIT_TOOL["function"]["name"]:
127+
await copilotkit_exit()
128+
self.state["messages"].append({
129+
"role": "tool",
130+
"content": "Crew exited",
131+
"tool_call_id": message["tool_calls"][0]["id"]
132+
})
133+
134+
response = await copilotkit_stream(
135+
completion( # pylint: disable=too-many-arguments
136+
model=self.crew.chat_llm,
137+
messages = [
138+
{
139+
"role": "system",
140+
"content": "Indicate to the user that the crew has exited",
141+
"id": str(uuid.uuid4()) + "-system"
142+
},
143+
*self.state["messages"]
144+
],
145+
tools=tools,
146+
parallel_tool_calls=False,
147+
stream=True,
148+
tool_choice="none"
149+
)
150+
)
151+
message = cast(Any, response).choices[0]["message"]
152+
self.state["messages"].append(message)

typescript-sdk/integrations/crewai/python/agui_crewai/dojo.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import uvicorn
33
from fastapi import FastAPI
44

5-
from .endpoint import add_crewai_fastapi_endpoint
5+
from .endpoint import add_crewai_flow_fastapi_endpoint
66
from .examples.agentic_chat import AgenticChatFlow
77
from .examples.human_in_the_loop import HumanInTheLoopFlow
88
from .examples.tool_based_generative_ui import ToolBasedGenerativeUIFlow
@@ -12,39 +12,39 @@
1212

1313
app = FastAPI(title="CrewAI Dojo Example Server")
1414

15-
add_crewai_fastapi_endpoint(
15+
add_crewai_flow_fastapi_endpoint(
1616
app=app,
17-
flow_class=AgenticChatFlow,
17+
flow=AgenticChatFlow(),
1818
path="/agentic_chat",
1919
)
2020

21-
add_crewai_fastapi_endpoint(
21+
add_crewai_flow_fastapi_endpoint(
2222
app=app,
23-
flow_class=HumanInTheLoopFlow,
23+
flow=HumanInTheLoopFlow(),
2424
path="/human_in_the_loop",
2525
)
2626

27-
add_crewai_fastapi_endpoint(
27+
add_crewai_flow_fastapi_endpoint(
2828
app=app,
29-
flow_class=ToolBasedGenerativeUIFlow,
29+
flow=ToolBasedGenerativeUIFlow(),
3030
path="/tool_based_generative_ui",
3131
)
3232

33-
add_crewai_fastapi_endpoint(
33+
add_crewai_flow_fastapi_endpoint(
3434
app=app,
35-
flow_class=AgenticGenerativeUIFlow,
35+
flow=AgenticGenerativeUIFlow(),
3636
path="/agentic_generative_ui",
3737
)
3838

39-
add_crewai_fastapi_endpoint(
39+
add_crewai_flow_fastapi_endpoint(
4040
app=app,
41-
flow_class=SharedStateFlow,
41+
flow=SharedStateFlow(),
4242
path="/shared_state",
4343
)
4444

45-
add_crewai_fastapi_endpoint(
45+
add_crewai_flow_fastapi_endpoint(
4646
app=app,
47-
flow_class=PredictiveStateUpdatesFlow,
47+
flow=PredictiveStateUpdatesFlow(),
4848
path="/predictive_state_updates",
4949
)
5050

typescript-sdk/integrations/crewai/python/agui_crewai/endpoint.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""
22
AG-UI FastAPI server for CrewAI.
33
"""
4-
4+
import copy
55
import asyncio
66
from typing import List
77
from fastapi import FastAPI, Request
@@ -15,6 +15,7 @@
1515
MethodExecutionFinishedEvent,
1616
)
1717
from crewai.flow.flow import Flow
18+
from crewai import Crew
1819

1920
from ag_ui.core import (
2021
RunAgentInput,
@@ -44,14 +45,19 @@
4445
)
4546
from .context import flow_context
4647
from .sdk import litellm_messages_to_ag_ui_messages
48+
from .crews import ChatWithCrewFlow
4749

4850

49-
def add_crewai_fastapi_endpoint(app: FastAPI, flow_class: type[Flow], path: str = "/"):
51+
def add_crewai_flow_fastapi_endpoint(app: FastAPI, flow: Flow, path: str = "/"):
5052
"""Adds a CrewAI endpoint to the FastAPI app."""
5153

54+
5255
@app.post(path)
5356
async def agentic_chat_endpoint(input_data: RunAgentInput, request: Request):
5457
"""Agentic chat endpoint"""
58+
59+
flow_copy = copy.deepcopy(flow)
60+
5561
# Get the accept header from the request
5662
accept_header = request.headers.get("accept")
5763

@@ -66,14 +72,13 @@ async def agentic_chat_endpoint(input_data: RunAgentInput, request: Request):
6672

6773
async def event_generator():
6874
queue = asyncio.Queue()
69-
flow = flow_class()
70-
token = flow_context.set(flow)
75+
token = flow_context.set(flow_copy)
7176
try:
7277
with crewai_event_bus.scoped_handlers():
7378

7479
@crewai_event_bus.on(FlowStartedEvent)
7580
def _(source, event): # pylint: disable=unused-argument
76-
if source == flow:
81+
if source == flow_copy:
7782
queue.put_nowait(
7883
RunStartedEvent(
7984
type=EventType.RUN_STARTED,
@@ -84,7 +89,7 @@ def _(source, event): # pylint: disable=unused-argument
8489

8590
@crewai_event_bus.on(FlowFinishedEvent)
8691
def _(source, event): # pylint: disable=unused-argument
87-
if source == flow:
92+
if source == flow_copy:
8893
queue.put_nowait(
8994
RunFinishedEvent(
9095
type=EventType.RUN_FINISHED,
@@ -96,7 +101,7 @@ def _(source, event): # pylint: disable=unused-argument
96101

97102
@crewai_event_bus.on(MethodExecutionStartedEvent)
98103
def _(source, event):
99-
if source == flow:
104+
if source == flow_copy:
100105
queue.put_nowait(
101106
StepStartedEvent(
102107
type=EventType.STEP_STARTED,
@@ -106,7 +111,7 @@ def _(source, event):
106111

107112
@crewai_event_bus.on(MethodExecutionFinishedEvent)
108113
def _(source, event):
109-
if source == flow:
114+
if source == flow_copy:
110115
messages = litellm_messages_to_ag_ui_messages(source.state.messages)
111116

112117
queue.put_nowait(
@@ -130,7 +135,7 @@ def _(source, event):
130135

131136
@crewai_event_bus.on(BridgedTextMessageChunkEvent)
132137
def _(source, event):
133-
if source == flow:
138+
if source == flow_copy:
134139
queue.put_nowait(
135140
TextMessageChunkEvent(
136141
type=EventType.TEXT_MESSAGE_CHUNK,
@@ -142,7 +147,7 @@ def _(source, event):
142147

143148
@crewai_event_bus.on(BridgedToolCallChunkEvent)
144149
def _(source, event):
145-
if source == flow:
150+
if source == flow_copy:
146151
queue.put_nowait(
147152
ToolCallChunkEvent(
148153
type=EventType.TOOL_CALL_CHUNK,
@@ -154,7 +159,7 @@ def _(source, event):
154159

155160
@crewai_event_bus.on(BridgedCustomEvent)
156161
def _(source, event):
157-
if source == flow:
162+
if source == flow_copy:
158163
queue.put_nowait(
159164
CustomEvent(
160165
type=EventType.CUSTOM,
@@ -165,15 +170,15 @@ def _(source, event):
165170

166171
@crewai_event_bus.on(BridgedStateSnapshotEvent)
167172
def _(source, event):
168-
if source == flow:
173+
if source == flow_copy:
169174
queue.put_nowait(
170175
StateSnapshotEvent(
171176
type=EventType.STATE_SNAPSHOT,
172177
snapshot=event.snapshot
173178
)
174179
)
175180

176-
asyncio.create_task(flow.kickoff_async(inputs=inputs))
181+
asyncio.create_task(flow_copy.kickoff_async(inputs=inputs))
177182

178183
while True:
179184
item = await queue.get()
@@ -195,6 +200,9 @@ def _(source, event):
195200

196201
return StreamingResponse(event_generator(), media_type=encoder.get_content_type())
197202

203+
def add_crewai_crew_fastapi_endpoint(app: FastAPI, crew: Crew, path: str = "/"):
204+
"""Adds a CrewAI crew endpoint to the FastAPI app."""
205+
add_crewai_flow_fastapi_endpoint(app, ChatWithCrewFlow(crew=crew), path)
198206

199207

200208
def crewai_prepare_inputs( # pylint: disable=unused-argument, too-many-arguments
@@ -226,4 +234,3 @@ def crewai_prepare_inputs( # pylint: disable=unused-argument, too-many-argument
226234
}
227235

228236
return new_state
229-

0 commit comments

Comments
 (0)