77import com .t4a .processor .AIProcessor ;
88import com .t4a .transform .PromptTransformer ;
99
10+ import io .github .vishalmysore .common .Agent ;
1011import io .github .vishalmysore .common .CommonClientResponse ;
1112import lombok .extern .slf4j .Slf4j ;
1213
14+ import java .util .ArrayList ;
15+ import java .util .List ;
16+ import java .util .concurrent .*;
17+
1318/**
1419 * AgenticMesh is a class that implements various AI processing patterns such as Pipeline Mesh, Hub and Spoke, and Blackboard.
1520 * It uses an AgentCatalog to process queries and an AIProcessor to handle AI-related tasks.
@@ -29,65 +34,8 @@ public AgenticMesh(AgentCatalog agentCatalog) {
2934
3035 }
3136
32- /**
33- * Extracts the first valid JSON object from a response that may contain multiple JSONs or extra text.
34- * Handles markdown code blocks and commentary from LLM.
35- * @param response The response string that may contain multiple JSON objects
36- * @return First valid JSON object as string, or original response if no JSON found
37- */
38- private String extractFirstJsonObject (String response ) {
39- if (response == null || response .trim ().isEmpty ()) {
40- return response ;
41- }
42-
43- String cleaned = response .trim ();
44-
45- // Remove markdown code blocks first
46- if (cleaned .contains ("```json" )) {
47- int startIdx = cleaned .indexOf ("```json" ) + 7 ;
48- int endIdx = cleaned .indexOf ("```" , startIdx );
49- if (endIdx > startIdx ) {
50- cleaned = cleaned .substring (startIdx , endIdx ).trim ();
51- }
52- } else if (cleaned .startsWith ("```" )) {
53- int startIdx = cleaned .indexOf ("```" ) + 3 ;
54- int endIdx = cleaned .indexOf ("```" , startIdx );
55- if (endIdx > startIdx ) {
56- cleaned = cleaned .substring (startIdx , endIdx ).trim ();
57- }
58- }
59-
60- // Find first { and matching }
61- int firstBrace = cleaned .indexOf ('{' );
62- if (firstBrace == -1 ) {
63- return response ;
64- }
65-
66- int braceCount = 0 ;
67- int endIdx = -1 ;
68-
69- for (int i = firstBrace ; i < cleaned .length (); i ++) {
70- char c = cleaned .charAt (i );
71- if (c == '{' ) {
72- braceCount ++;
73- } else if (c == '}' ) {
74- braceCount --;
75- if (braceCount == 0 ) {
76- endIdx = i ;
77- break ;
78- }
79- }
80- }
81-
82- if (endIdx > firstBrace ) {
83- return cleaned .substring (firstBrace , endIdx + 1 );
84- }
85-
86- return response ;
87- }
88-
8937 public CommonClientResponse pipeLineMesh (String query ){
90- return pipeLineMesh (query , null );
38+ return pipeLineMesh (query , null , 0 );
9139 }
9240
9341 /**
@@ -96,54 +44,42 @@ public CommonClientResponse pipeLineMesh(String query){
9644 * The method recursively processes the query until a complete response is obtained or no further processing is needed.
9745 * @param query The initial query to be processed.
9846 * @param previousResponse
47+ * @param depth Current recursion depth
9948 * @return
10049 */
101- public CommonClientResponse pipeLineMesh (String query , CommonClientResponse previousResponse ) {
50+ public CommonClientResponse pipeLineMesh (String query , CommonClientResponse previousResponse , int depth ) {
51+ if (depth > 10 ) {
52+ log .warn ("Max recursion depth of 10 reached, returning current response" );
53+ return previousResponse != null ? previousResponse : agentCatalog .processQuery (query );
54+ }
55+
10256 CommonClientResponse response = agentCatalog .processQuery (query );
10357 String jsonYesOrNo = null ;
10458 String yesOrNoField = "is_User_Query_Answered_Fully_Yes_Or_No_Only" ;
105- String pendingQuery = "if_No_Then_What_Is_Pending_Query" ;
59+ String pendingQuery = "if_No_Then_What_Is_Pending_Query" ;
60+ String explain = "explain_why_the_query_is_fully_answered_by_looking_at_each_part_of_the_response_and_query" ;
10661 if (response ==null ) {
107- return previousResponse ;
62+ return previousResponse ;
10863 } else {
10964 try {
110- String prompt = "Original query: " + query + " response: " + response + ". IMPORTANT: Return ONLY a single JSON object, no explanations, no multiple JSONs, no markdown formatting." ;
111- jsonYesOrNo = promptTransformer .transformIntoJson (
112- jsonUtils .createJson (yesOrNoField , pendingQuery ).toString (),
113- prompt
114- );
115-
116- // Extract first valid JSON object
117- jsonYesOrNo = extractFirstJsonObject (jsonYesOrNo );
118- log .debug ("Extracted JSON: {}" , jsonYesOrNo );
119-
65+ jsonYesOrNo = promptTransformer .transformIntoJson (jsonUtils .createJson (yesOrNoField ,pendingQuery ,explain ).toString (),"This is the Original query " +query + " This is the response so far " +response +" " );
66+ log .info (jsonYesOrNo );
12067 String yesOrNo = jsonUtils .getFieldValueFromMultipleFields (jsonYesOrNo , yesOrNoField );
121- if (yesOrNo != null && yesOrNo .contains ("Yes" )) {
68+ if (yesOrNo .contains ("Yes" )) {
12269 return response ;
123- } else if (yesOrNo != null ) {
124- String pendingQueryValue = jsonUtils .getFieldValueFromMultipleFields (jsonYesOrNo , pendingQuery );
125- if (pendingQueryValue != null && !pendingQueryValue .trim ().isEmpty ()) {
126- response = pipeLineMesh ("Pending query: " + pendingQueryValue + " previous response: " + response .getTextResult (), response );
127- } else {
128- return response ;
129- }
13070 } else {
131- log . warn ( "Could not extract yes/no answer, returning current response" );
132- return response ;
71+ String pendingQueryValue = jsonUtils . getFieldValueFromMultipleFields ( jsonYesOrNo , pendingQuery );
72+ response = pipeLineMesh ( "Pending query " + pendingQueryValue + " response so far " + response . getTextResult () , response , depth + 1 ) ;
13373 }
13474
13575 } catch (AIProcessingException e ) {
136- log .warn ("AIProcessingException in pipeLineMesh: {}" , e .getMessage ());
137- return response ;
138- } catch (Exception e ) {
139- log .error ("Unexpected error in pipeLineMesh: {}" , e .getMessage (), e );
140- return response ;
76+ log .warn (e .getMessage ());
14177 }
14278
14379 }
14480
14581 return response ;
146- }
82+ }
14783
14884 /**
14985 * Processes a query using the Hub and Spoke pattern.
@@ -162,13 +98,10 @@ public CommonClientResponse hubAndSpoke(String query) {
16298
16399 try {
164100 // Get list of sub-queries needed
165- String prompt = "Original query: " + query + " Initial response: " + mainResponse .getTextResult () + ". IMPORTANT: Return ONLY a single JSON object, no explanations." ;
166101 jsonQueries = promptTransformer .transformIntoJson (
167102 jsonUtils .createJson (subQueriesField ).toString (),
168- prompt
103+ "Original query: " + query + " Initial response: " + mainResponse . getTextResult ()
169104 );
170-
171- jsonQueries = extractFirstJsonObject (jsonQueries );
172105
173106 String subQueriesString = jsonUtils .getFieldValueFromMultipleFields (jsonQueries , subQueriesField );
174107 if (subQueriesString == null || subQueriesString .isEmpty ()) {
@@ -215,13 +148,10 @@ public CommonClientResponse blackboard(String query) {
215148
216149 try {
217150 // Identify knowledge gaps and required expert agents
218- String prompt = "Analyze knowledge gaps and required experts for: " + initialKnowledge .getTextResult () + ". IMPORTANT: Return ONLY a single JSON object, no explanations." ;
219151 jsonAnalysis = promptTransformer .transformIntoJson (
220152 jsonUtils .createJson (knowledgeGapsField , expertAgentsField ).toString (),
221- prompt
153+ "Analyze knowledge gaps and required experts for: " + initialKnowledge . getTextResult ()
222154 );
223-
224- jsonAnalysis = extractFirstJsonObject (jsonAnalysis );
225155
226156 String gaps = jsonUtils .getFieldValueFromMultipleFields (jsonAnalysis , knowledgeGapsField );
227157 String experts = jsonUtils .getFieldValueFromMultipleFields (jsonAnalysis , expertAgentsField );
@@ -260,4 +190,141 @@ public CommonClientResponse blackboard(String query) {
260190 }
261191 }
262192
193+ /**
194+ * Processes a query using the Parallel Mesh pattern.
195+ * In this method, all agents are queried simultaneously and the best response is selected.
196+ * This pattern provides the fastest response time and allows comparison of different agent capabilities.
197+ * @param query The query to be processed by all agents
198+ * @return The best response selected by AI from all agent responses
199+ */
200+ public CommonClientResponse parallelMesh (String query ) {
201+ if (query == null || query .trim ().isEmpty ()) {
202+ log .warn ("Empty or null query provided to parallelMesh" );
203+ return null ;
204+ }
205+
206+ // Get all agents from catalog
207+ List <Agent > allAgents = new ArrayList <>(agentCatalog .getAgents ().values ());
208+
209+ if (allAgents .isEmpty ()) {
210+ log .warn ("No agents available in catalog" );
211+ return null ;
212+ }
213+
214+ // Create fixed thread pool sized to number of agents
215+ ExecutorService executorService = Executors .newFixedThreadPool (allAgents .size ());
216+ List <Future <CommonClientResponse >> futures = new ArrayList <>();
217+
218+ try {
219+ // Submit query to all agents in parallel
220+ log .info ("Submitting query to {} agents in parallel" , allAgents .size ());
221+ for (Agent agent : allAgents ) {
222+ Future <CommonClientResponse > future = executorService .submit (() -> {
223+ try {
224+ log .debug ("Querying agent: {}" , agent .getType ());
225+ return agent .remoteMethodCall (query );
226+ } catch (Exception e ) {
227+ log .warn ("Error querying agent {}: {}" , agent .getType (), e .getMessage ());
228+ return null ;
229+ }
230+ });
231+ futures .add (future );
232+ }
233+
234+ // Collect results with timeout
235+ List <CommonClientResponse > responses = new ArrayList <>();
236+ for (int i = 0 ; i < futures .size (); i ++) {
237+ try {
238+ CommonClientResponse response = futures .get (i ).get (30 , TimeUnit .SECONDS );
239+ if (response != null && response .getTextResult () != null && !response .getTextResult ().trim ().isEmpty ()) {
240+ responses .add (response );
241+ log .debug ("Received response from agent {}" , i );
242+ }
243+ } catch (TimeoutException e ) {
244+ log .warn ("Agent {} timed out after 30 seconds" , i );
245+ } catch (InterruptedException e ) {
246+ log .warn ("Agent {} was interrupted" , i );
247+ Thread .currentThread ().interrupt ();
248+ } catch (ExecutionException e ) {
249+ log .warn ("Agent {} threw exception: {}" , i , e .getMessage ());
250+ }
251+ }
252+
253+ log .info ("Received {} valid responses out of {} agents" , responses .size (), allAgents .size ());
254+
255+ // Handle cases with no valid responses
256+ if (responses .isEmpty ()) {
257+ log .warn ("No valid responses received from any agent" );
258+ return null ;
259+ }
260+
261+ // If only one response, return it directly
262+ if (responses .size () == 1 ) {
263+ return responses .get (0 );
264+ }
265+
266+ // Use AI to select the best response
267+ return selectBestResponse (query , responses );
268+
269+ } finally {
270+ // Clean up thread pool
271+ executorService .shutdown ();
272+ try {
273+ if (!executorService .awaitTermination (5 , TimeUnit .SECONDS )) {
274+ executorService .shutdownNow ();
275+ }
276+ } catch (InterruptedException e ) {
277+ executorService .shutdownNow ();
278+ Thread .currentThread ().interrupt ();
279+ }
280+ }
281+ }
282+
283+ /**
284+ * Uses AI to select the best response from multiple agent responses.
285+ * @param query The original query
286+ * @param responses List of responses from different agents
287+ * @return The response deemed most appropriate by the AI
288+ */
289+ private CommonClientResponse selectBestResponse (String query , List <CommonClientResponse > responses ) {
290+ try {
291+ // Build context with all responses
292+ StringBuilder allResponses = new StringBuilder ();
293+ for (int i = 0 ; i < responses .size (); i ++) {
294+ allResponses .append ("Response " ).append (i + 1 ).append (": " )
295+ .append (responses .get (i ).getTextResult ())
296+ .append ("\n \n " );
297+ }
298+
299+ // Ask AI to select the best response
300+ String bestIndexField = "best_response_index_1_based" ;
301+ String reasonField = "reason_for_selection" ;
302+ String jsonSelection = promptTransformer .transformIntoJson (
303+ jsonUtils .createJson (bestIndexField , reasonField ).toString (),
304+ "Original query: " + query + "\n \n All responses:\n " + allResponses +
305+ "\n Select the response that best answers the query. Return the 1-based index."
306+ );
307+
308+ String bestIndexStr = jsonUtils .getFieldValueFromMultipleFields (jsonSelection , bestIndexField );
309+ String reason = jsonUtils .getFieldValueFromMultipleFields (jsonSelection , reasonField );
310+
311+ int bestIndex = Integer .parseInt (bestIndexStr .trim ()) - 1 ; // Convert to 0-based
312+
313+ if (bestIndex >= 0 && bestIndex < responses .size ()) {
314+ log .info ("Selected response {} out of {}. Reason: {}" , bestIndex + 1 , responses .size (), reason );
315+ return responses .get (bestIndex );
316+ } else {
317+ log .warn ("Invalid index {} returned by AI, returning first response" , bestIndex );
318+ return responses .get (0 );
319+ }
320+
321+ } catch (AIProcessingException e ) {
322+ log .warn ("Error selecting best response: {}, returning first response" , e .getMessage ());
323+ return responses .get (0 );
324+ } catch (Exception e ) {
325+ log .warn ("Unexpected error selecting best response: {}, returning first response" , e .getMessage ());
326+ return responses .get (0 );
327+ }
328+ }
329+
263330}
0 commit comments