55from urllib .parse import urljoin
66
77import requests
8+ import tabulate
89from dify_plugin import Tool
910from dify_plugin .entities .tool import ToolInvokeMessage
1011
@@ -22,6 +23,7 @@ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessag
2223 question = tool_parameters .get ("question" )
2324 if not question :
2425 raise ValueError ("Please fill in the question" )
26+ output_format = tool_parameters .get ("output_format" , "text" ).lower ()
2527
2628 url = urljoin (base_url .rstrip ('/' ) + '/' , 'ask/stream' )
2729
@@ -51,10 +53,11 @@ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessag
5153 response .encoding = 'utf-8'
5254
5355 # 处理 SSE 流
56+ buffer = ""
5457 result = {}
5558 events = []
5659 answer = ""
57- buffer = ""
60+ stream = False
5861 for chunk in response .iter_content (chunk_size = 1024 , decode_unicode = True ):
5962 if chunk :
6063 # 确保 chunk 是字符串
@@ -82,42 +85,94 @@ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessag
8285 except json .JSONDecodeError :
8386 data = data_str
8487
85- conversation_id = ""
86- if event_type and data :
87- if not conversation_id :
88- conversation_id = data .get ('conversation_id' )
89-
90- if event_type == "other" :
91- sub_event_type = data .get ('sub_event' )
92- sub_data = {k : v for k , v in data .items () if k != 'sub_event' }
93- events .append ({"event" : sub_event_type , "data" : sub_data })
94- # elif event_type not in {"ping", "agent_answer", "agent_answer_end"}:
95- elif event_type != "ping" :
96- events .append ({"event" : event_type , "data" : data })
97-
98- if event_type == "sql_generate" :
99- result ['semantic_sql' ] = data .get ('semantic_sql' )
100- elif event_type == "semantic_to_sql" :
101- result ['query_sql' ] = data .get ('query_sql' )
102- elif event_type == "sql_execute" :
103- result ['query_data' ] = json .dumps (data .get ('query_data' , []))
104- elif event_type == "agent_answer" :
105- answer += data .get ('answer' )
106- elif event_type == "agent_answer_end" :
107- events .append ({"event" : "complete_agent_answer" , "data" : {"answer" : answer }})
108- answer = ""
109- elif event_type in {"hitl_ai_request" , "hitl_tool_approval" }:
110- raise RuntimeError ("HITL (Human-in-the-loop) is not supported." )
111- elif event_type == "error" :
112- result ['error' ] = data .get ('error' )
113- elif event_type == "finished" :
114- result ['status' ] = data .get ('status' )
115-
116- result ['conversation_id' ] = conversation_id
117- result ['events' ] = events
118-
119- logging .info (f"result: { result } " )
120- yield self .create_json_message (result )
88+ if output_format == "json" :
89+ # json output format
90+ conversation_id = ""
91+ if event_type and data :
92+ if not conversation_id :
93+ conversation_id = data .get ('conversation_id' )
94+
95+ if event_type == "other" :
96+ sub_event_type = data .get ('sub_event' )
97+ sub_data = {k : v for k , v in data .items () if k != 'sub_event' }
98+ events .append ({"event" : sub_event_type , "data" : sub_data })
99+ # elif event_type not in {"ping", "agent_answer", "agent_answer_end"}:
100+ elif event_type != "ping" :
101+ events .append ({"event" : event_type , "data" : data })
102+
103+ if event_type == "sql_generate" :
104+ result ['semantic_sql' ] = data .get ('semantic_sql' )
105+ elif event_type == "semantic_to_sql" :
106+ result ['query_sql' ] = data .get ('query_sql' )
107+ elif event_type == "sql_execute" :
108+ result ['query_data' ] = data .get ('query_data' , [])
109+ elif event_type == "agent_answer" :
110+ answer += data .get ('answer' )
111+ elif event_type == "agent_answer_end" :
112+ events .append ({"event" : "complete_agent_answer" , "data" : {"answer" : answer }})
113+ answer = ""
114+ elif event_type in {"hitl_ai_request" , "hitl_tool_approval" }:
115+ raise RuntimeError ("HITL (Human-in-the-loop) is not supported." )
116+ elif event_type == "error" :
117+ result ['error' ] = data .get ('error' )
118+ elif event_type == "finished" :
119+ result ['status' ] = data .get ('status' )
120+
121+ result ['conversation_id' ] = conversation_id
122+ result ['events' ] = events
123+
124+ else :
125+ # text output format
126+ if event_type and data :
127+ if event_type == "sql_generate" :
128+ yield self .create_text_message (
129+ f"<details open><summary>✨ Semantic SQL</summary>" )
130+ yield self .create_text_message (f"{ data .get ('semantic_sql' )} </details>\n " )
131+ elif event_type == "semantic_to_sql" :
132+ yield self .create_text_message (f"<details open><summary>🎯 Query SQL</summary>" )
133+ yield self .create_text_message (f"{ data .get ('query_sql' )} </details>\n " )
134+ elif event_type == "sql_execute" :
135+ yield self .create_text_message (f"📊 Query Data: \n \n " )
136+ content = "No data to display"
137+ query_data = data .get ('query_data' , [])
138+ if query_data and len (query_data [0 ]) > 0 :
139+ content = tabulate .tabulate (query_data , headers = "keys" , tablefmt = "github" ,
140+ floatfmt = "" )
141+ yield self .create_text_message (f"{ content } \n \n " )
142+ elif event_type == "agent_answer" :
143+ if not stream :
144+ yield self .create_text_message (
145+ f"<details open><summary>🤖 Agent Answer</summary>" )
146+ yield self .create_text_message (data .get ('answer' ))
147+ stream = True
148+ elif event_type == "agent_answer_end" :
149+ yield self .create_text_message (f"</details>\n " )
150+ stream = False
151+ elif event_type in {"hitl_ai_request" , "hitl_tool_approval" }:
152+ raise RuntimeError ("HITL (Human-in-the-loop) is not supported." )
153+ elif event_type == "error" :
154+ yield self .create_text_message (
155+ f"<details open><summary>❌ Error Message</summary>" )
156+ yield self .create_text_message (f"{ data .get ('error' )} </details>\n " )
157+ elif event_type == "other" :
158+ yield self .create_text_message (
159+ f"<details open><summary>💬 { data .get ('sub_event' )} </summary>" )
160+ sub_data = {k : v for k , v in data .items () if
161+ k not in ['sub_event' , 'conversation_id' , 'timestamp' ]}
162+ # content = json.dumps(sub_data, ensure_ascii=False)
163+ content = '\n ' .join ([f"<p>{ k } : { v } </p>" for k , v in sub_data .items ()])
164+ yield self .create_text_message (f"{ content } </details>\n " )
165+ # elif event_type == "finished":
166+ # status = data.get('status')
167+ # if status == 'succeeded':
168+ # yield self.create_text_message(f"✅ Finished Status: `succeeded`")
169+ # else:
170+ # yield self.create_text_message(f"❌ Finished Status: `{status}`")
171+
172+ if output_format == "json" :
173+ logging .info (f"result: { result } " )
174+ yield self .create_json_message (result )
175+
121176 except requests .exceptions .RequestException as e :
122177 error_msg = f"Request exception: { e } "
123178 logging .error (error_msg )
0 commit comments