Skip to content

Commit 85221a3

Browse files
Merge pull request #79 from codefuse-ai/ekg2_wyp_dev
feat: add two func for process biz datas in httpapis
2 parents a9767c5 + 7942268 commit 85221a3

File tree

6 files changed

+452
-12
lines changed

6 files changed

+452
-12
lines changed

examples/ekg_examples/who_is_spy_game.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from muagent.schemas.ekg.ekg_graph import TYPE2SCHEMA
77
from muagent.schemas.common import GNode, GEdge
8+
from muagent.service.utils import decode_biznodes, encode_biznodes
89

910

1011
import math
@@ -234,7 +235,8 @@ def autofill_nodes(nodes: List[GNode]):
234235

235236

236237
def add_nodes(ekg_service, nodes: list[GNode]):
237-
newnodes = autofill_nodes(nodes)
238+
# newnodes = autofill_nodes(nodes)
239+
newnodes, newedges = decode_biznodes(nodes)
238240
logger.info('尝试查插入节点')
239241
for one_node in newnodes:
240242
one_node.attributes['description'] = one_node.attributes['description']
@@ -250,8 +252,9 @@ def add_nodes(ekg_service, nodes: list[GNode]):
250252
one_node.attributes['enable'] = True
251253

252254
ekg_service.add_nodes([one_node], teamid=teamid)
253-
# ekg_service.gb.add_node(one_node)
254255

256+
# add task-tool edge or task-agent edge
257+
add_edges(ekg_service, newedges)
255258

256259
def add_edges(ekg_service, edges):
257260

muagent/httpapis/ekg_construct/api.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from muagent.service.ekg_construct.ekg_construct_base import EKGConstructService
1212
from muagent.schemas.apis.ekg_api_schema import *
1313
from muagent.schemas.ekg import *
14+
from muagent.service.utils import decode_biznodes, encode_biznodes
1415
from muagent.service.ekg_reasoning.src.graph_search.graph_search_main import main
1516

1617

@@ -221,15 +222,20 @@ async def update_graph(request: EKGFeaturesRequest):
221222
origin_edges = request.features.query.get('originEdges', [])
222223
edges = request.features.query.get('edges', [])
223224

224-
# 构建 nodeid 到 type 的字典,方便后续查找
225-
nodeid2type_dict = {n["id"]: n["type"] for n in origin_nodes + nodes}
226-
227225
# 将 origin_nodes 和 nodes 转换为 GNode 对象
228226
origin_nodes = [GNode(**n) for n in origin_nodes]
229-
origin_nodes = autofill_nodes(origin_nodes)
227+
# origin_nodes = autofill_nodes(origin_nodes)
228+
origin_nodes, origin_new_edges = decode_biznodes(origin_nodes)
230229
nodes = [GNode(**n) for n in nodes]
231-
nodes = autofill_nodes(nodes)
230+
# nodes = autofill_nodes(nodes)
231+
nodes, new_edges = decode_biznodes(nodes)
232+
233+
origin_edges.extend([e.dict() for e in origin_new_edges])
234+
edges.extend([e.dict() for e in new_edges])
232235

236+
# 构建 nodeid 到 type 的字典,方便后续查找
237+
nodeid2type_dict = {n.id: n.type for n in origin_nodes + nodes}
238+
233239
# 处理 origin_edges,给每个 edge 设置 type 字段
234240
origin_edges = [
235241
GEdge(
@@ -293,7 +299,7 @@ def get_node(request: EKGFeaturesRequest):
293299
node = ekg_construct_service.get_node_by_id(
294300
query.nodeid, query.nodeType
295301
)
296-
# node = node.dict()
302+
# might lost agents and tools
297303
except Exception as e:
298304
errorMessage = str(e)
299305
successCode = False
@@ -325,10 +331,11 @@ def get_graph(request: EKGFeaturesRequest):
325331
)
326332
nodes = graph.nodes
327333
edges = graph.edges
334+
nodes, edges = encode_biznodes(nodes, edges)
328335
except Exception as e:
329336
errorMessage = str(e)
330337
successCode = False
331-
nodes, edges = {}, {}
338+
nodes, edges = [], []
332339

333340
result = EKGGraphResponse(
334341
successCode=successCode, errorMessage=errorMessage,
@@ -377,14 +384,13 @@ def get_ancestor(request: EKGFeaturesRequest):
377384
nodeid=query.nodeid, node_type=query.nodeType,
378385
rootid=query.rootid
379386
)
380-
# nodes = graph.nodes.dict()
381-
# edges = graph.edges.dict()
382387
nodes = graph.nodes
383388
edges = graph.edges
389+
nodes, edges = encode_biznodes(nodes, edges)
384390
except Exception as e:
385391
errorMessage = str(e)
386392
successCode = False
387-
nodes, edges = {}, {}
393+
nodes, edges = [], []
388394

389395

390396
result = EKGGraphResponse(

muagent/schemas/ekg/ekg_graph.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,12 @@ class EKGToolSchema(NodeSchema):
185185
class EKGAgentSchema(NodeSchema):
186186
modelname: str = Field("", description='llm model name')
187187

188+
189+
class EKGTaskBizNodeSchema(EKGTaskNodeSchema):
190+
tools: List['EKGToolSchema'] = []
191+
agents: List['EKGAgentSchema'] = []
192+
193+
188194
# SLS / Tbase
189195
class EKGGraphSlsSchema(BaseModel):
190196
# node_{NodeTypesEnum}
@@ -254,6 +260,8 @@ class EKGSlsData(BaseModel):
254260
NodeTypesEnum.EDGE.value: EKGEdgeSchema
255261
}
256262

263+
TYPE2SCHEMA_BIZ = copy.deepcopy(TYPE2SCHEMA)
264+
TYPE2SCHEMA_BIZ[NodeTypesEnum.TASK.value] = EKGTaskBizNodeSchema
257265

258266
#####################
259267
##### yuque dsl #####

muagent/service/ekg_construct/ekg_construct_base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,11 @@ def search_nodes_by_text(
922922
node.id, node.type, f"ekg_team_{teamid}"
923923
).paths) > 0
924924
]
925+
#
926+
nodes = [
927+
node for node in nodes
928+
if node.type not in ["opsgptkg_tool", "opsgptkg_agent"]
929+
]
925930
return nodes
926931

927932
def search_rootpath_by_nodeid(

muagent/service/utils.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import json
2+
import hashlib
3+
from loguru import logger
4+
from typing import List, Tuple, Dict
5+
6+
from muagent.schemas.common import GNode, GEdge
7+
from muagent.schemas.ekg import NodeSchema, TYPE2SCHEMA_BIZ
8+
9+
10+
11+
def decode_biznodes(
12+
nodes: List[GNode]
13+
) -> Tuple[List[GNode], List[GEdge]]:
14+
'''
15+
compatible biz data :
16+
- node's attributes "extra"
17+
- task node with tools and agents
18+
'''
19+
new_nodes = []
20+
new_edges = []
21+
for node in nodes:
22+
schema = TYPE2SCHEMA_BIZ.get(node.type,)
23+
extra = node.attributes.pop("extra", {})
24+
if extra and isinstance(extra, str):
25+
extra = json.loads(extra)
26+
elif extra and isinstance(extra, dict):
27+
pass
28+
else:
29+
extra = {}
30+
31+
node.attributes.update(extra)
32+
node_data = schema(
33+
**{**{"id": node.id, "type": node.type}, **node.attributes}
34+
)
35+
36+
if node.type == "opsgptkg_task":
37+
logger.debug(f"schema:{ schema}")
38+
logger.debug(f"node_data:{ type(node_data)}")
39+
logger.debug(f"node_data:{ node_data}")
40+
41+
node_data = {
42+
k:v
43+
for k, v in node_data.dict().items()
44+
if k not in ["type", "ID", "id", "extra"]
45+
}
46+
47+
if node.type == "opsgptkg_task":
48+
logger.debug(f"node_data:{ node_data}")
49+
50+
# update agent/tool nodes and edges
51+
agents = node_data.pop("agents", [])
52+
tools = node_data.pop("tools", [])
53+
54+
for agent_or_tool in agents + tools:
55+
56+
if agent_or_tool["type"] not in ["opsgptkg_agent", "opsgptkg_tool"]:
57+
continue
58+
schema = TYPE2SCHEMA_BIZ.get(agent_or_tool["type"])
59+
agent_or_tool: NodeSchema = schema(**agent_or_tool)
60+
new_nodes.append(GNode(**{
61+
"id": agent_or_tool.id,
62+
"type": agent_or_tool.type,
63+
"attributes": agent_or_tool.attributes()
64+
}))
65+
# may lost edge attributes
66+
new_edges.append(GEdge(
67+
start_id=node.id,
68+
end_id=agent_or_tool.id,
69+
type='_route_'.join(['opsgptkg_task', agent_or_tool.type]),
70+
attributes={}
71+
))
72+
73+
if node.type == "opsgptkg_task":
74+
logger.debug(f"node_data:{ node_data}")
75+
logger.debug(f"node.attributes:{ node.attributes}")
76+
77+
new_nodes.append(GNode(**{
78+
"id": node.id,
79+
"type": node.type,
80+
"attributes": {**node_data, **node.attributes}
81+
}))
82+
return new_nodes, new_edges
83+
84+
85+
86+
def encode_biznodes(
87+
nodes: List[GNode],
88+
edges: List[GEdge]
89+
) -> Tuple[List[GNode], List[GEdge]]:
90+
'''
91+
compatible biz data:
92+
- node's attributes "extra"
93+
- task node with tools and agents
94+
'''
95+
task_nodes_by_id: Dict[str, GNode] = {
96+
n.id: n for n in nodes if n.type=="opsgptkg_task"
97+
}
98+
agent_nodes_by_id: Dict[str, GNode] = {
99+
n.id: n for n in nodes if n.type in ["opsgptkg_tool"]
100+
}
101+
tool_nodes_by_id: Dict[str, GNode] = {
102+
n.id: n for n in nodes if n.type in ["opsgptkg_agent"]
103+
}
104+
105+
new_edges = []
106+
for edge in edges:
107+
if edge.start_id in task_nodes_by_id:
108+
task_nodes_by_id[edge.start_id].attributes.setdefault("agents", [])
109+
task_nodes_by_id[edge.start_id].attributes.setdefault("tools", [])
110+
111+
if edge.end_id in agent_nodes_by_id:
112+
task_nodes_by_id[edge.end_id].attributes["agents"].append(
113+
agent_nodes_by_id[edge.end_id].dict()
114+
)
115+
continue
116+
117+
if edge.end_id in tool_nodes_by_id:
118+
task_nodes_by_id[edge.end_id].attributes["tools"].append(
119+
tool_nodes_by_id[edge.end_id].dict()
120+
)
121+
continue
122+
new_edges.append(edge)
123+
124+
125+
return nodes, new_edges

0 commit comments

Comments
 (0)