99import static modelengine .fitframework .util .ObjectUtils .cast ;
1010
1111import modelengine .fel .tool .mcp .client .McpClient ;
12+ import modelengine .fel .tool .mcp .entity .ClientSchema ;
1213import modelengine .fel .tool .mcp .entity .JsonRpc ;
1314import modelengine .fel .tool .mcp .entity .Method ;
14- import modelengine .fel .tool .mcp .entity .Server ;
15+ import modelengine .fel .tool .mcp .entity .ServerSchema ;
1516import modelengine .fel .tool .mcp .entity .Tool ;
1617import modelengine .fit .http .client .HttpClassicClient ;
1718import modelengine .fit .http .client .HttpClassicClientRequest ;
2627import modelengine .fitframework .schedule .ThreadPoolExecutor ;
2728import modelengine .fitframework .schedule .ThreadPoolScheduler ;
2829import modelengine .fitframework .serialization .ObjectSerializer ;
30+ import modelengine .fitframework .util .CollectionUtils ;
2931import modelengine .fitframework .util .LockUtils ;
32+ import modelengine .fitframework .util .MapBuilder ;
3033import modelengine .fitframework .util .ObjectUtils ;
34+ import modelengine .fitframework .util .StringUtils ;
3135import modelengine .fitframework .util .ThreadUtils ;
3236import modelengine .fitframework .util .UuidUtils ;
3337
@@ -54,37 +58,43 @@ public class DefaultMcpClient implements McpClient {
5458
5559 private final ObjectSerializer jsonSerializer ;
5660 private final HttpClassicClient client ;
57- private final String connectionString ;
61+ private final String baseUri ;
62+ private final String sseEndpoint ;
5863 private final String name ;
5964 private final AtomicLong id = new AtomicLong (0 );
6065
61- private volatile String messageUrl ;
66+ private volatile String messageEndpoint ;
6267 private volatile String sessionId ;
63- private volatile Server server ;
68+ private volatile ServerSchema serverSchema ;
6469 private volatile boolean initialized = false ;
6570 private final List <Tool > tools = new ArrayList <>();
6671 private final Object initializedLock = LockUtils .newSynchronizedLock ();
6772 private final Object toolsLock = LockUtils .newSynchronizedLock ();
6873 private final Map <Long , Consumer <JsonRpc .Response <Long >>> responseConsumers = new ConcurrentHashMap <>();
6974 private final Map <Long , Boolean > pendingRequests = new ConcurrentHashMap <>();
75+ private final Map <Long , Object > pendingResults = new ConcurrentHashMap <>();
7076
7177 /**
7278 * Constructs a new instance of the DefaultMcpClient.
7379 *
7480 * @param jsonSerializer The serializer used for JSON serialization and deserialization.
7581 * @param client The HTTP client used for communication with the MCP server.
76- * @param connectionString The connection string used to establish the initial connection.
82+ * @param baseUri The base URI of the MCP server.
83+ * @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection.
7784 */
78- public DefaultMcpClient (ObjectSerializer jsonSerializer , HttpClassicClient client , String connectionString ) {
85+ public DefaultMcpClient (ObjectSerializer jsonSerializer , HttpClassicClient client , String baseUri ,
86+ String sseEndpoint ) {
7987 this .jsonSerializer = jsonSerializer ;
8088 this .client = client ;
81- this .connectionString = connectionString ;
89+ this .baseUri = baseUri ;
90+ this .sseEndpoint = sseEndpoint ;
8291 this .name = UuidUtils .randomUuidString ();
8392 }
8493
8594 @ Override
8695 public void initialize () {
87- HttpClassicClientRequest request = this .client .createRequest (HttpRequestMethod .GET , connectionString );
96+ HttpClassicClientRequest request =
97+ this .client .createRequest (HttpRequestMethod .GET , this .baseUri + this .sseEndpoint );
8898 Choir <TextEvent > messages = this .client .exchangeStream (request , TextEvent .class );
8999 ThreadPoolExecutor threadPool = ThreadPoolExecutor .custom ()
90100 .threadPoolName ("mcp-client-" + this .name )
@@ -125,7 +135,13 @@ public void initialize() {
125135 }
126136
127137 private void consumeTextEvent (TextEvent textEvent ) {
128- log .info ("Receive message from MCP server. [message={}]" , textEvent .data ());
138+ log .info ("Receive message from MCP server. [id={}, event={}, message={}]" ,
139+ textEvent .id (),
140+ textEvent .event (),
141+ textEvent .data ());
142+ if (StringUtils .isBlank (textEvent .event ()) || StringUtils .isBlank ((String ) textEvent .data ())) {
143+ return ;
144+ }
129145 if (Objects .equals (textEvent .event (), "endpoint" )) {
130146 this .initializeMcpServer (textEvent );
131147 return ;
@@ -157,7 +173,8 @@ private void pingServer() {
157173 log .info ("MCP client is not initialized and {} method will be delayed." , Method .PING .code ());
158174 return ;
159175 }
160- HttpClassicClientRequest request = this .client .createRequest (HttpRequestMethod .POST , this .messageUrl );
176+ HttpClassicClientRequest request =
177+ this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this .messageEndpoint );
161178 long currentId = this .getNextId ();
162179 JsonRpc .Request <Long > rpcRequest = JsonRpc .createRequest (currentId , Method .PING .code ());
163180 request .entity (Entity .createObject (request , rpcRequest ));
@@ -183,12 +200,17 @@ private void pingServer() {
183200 }
184201
185202 private void initializeMcpServer (TextEvent textEvent ) {
186- this .messageUrl = textEvent .data ().toString ();
187- this .sessionId = textEvent .id ();
188- HttpClassicClientRequest request = this .client .createRequest (HttpRequestMethod .POST , this .messageUrl );
203+ this .messageEndpoint = textEvent .data ().toString ();
204+ HttpClassicClientRequest request =
205+ this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this .messageEndpoint );
206+ this .sessionId =
207+ request .queries ().first ("session_id" ).orElseThrow (() -> new IllegalStateException ("no session_id" ));
189208 long currentId = this .getNextId ();
190209 this .responseConsumers .put (currentId , this ::initializedMcpServer );
191- JsonRpc .Request <Long > rpcRequest = JsonRpc .createRequest (currentId , Method .INITIALIZE .code ());
210+ ClientSchema schema = new ClientSchema ("2024-11-05" ,
211+ new ClientSchema .Capabilities (),
212+ new ClientSchema .Info ("FIT MCP Client" , "3.6.0-SNAPSHOT" ));
213+ JsonRpc .Request <Long > rpcRequest = JsonRpc .createRequest (currentId , Method .INITIALIZE .code (), schema );
192214 request .entity (Entity .createObject (request , rpcRequest ));
193215 log .info ("Send {} method to MCP server. [sessionId={}, request={}]" ,
194216 Method .INITIALIZE .code (),
@@ -223,9 +245,9 @@ private void initializedMcpServer(JsonRpc.Response<Long> response) {
223245 this .initialized = true ;
224246 this .initializedLock .notifyAll ();
225247 }
226- this .server = ObjectUtils . toCustomObject (response . result (), Server . class );
227- log . info ( "MCP server has initialized. [server={}]" , this . server );
228- HttpClassicClientRequest request = this .client .createRequest (HttpRequestMethod .POST , this .messageUrl );
248+ this .recordServerSchema (response );
249+ HttpClassicClientRequest request =
250+ this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this . messageEndpoint );
229251 JsonRpc .Notification notification = JsonRpc .createNotification (Method .NOTIFICATION_INITIALIZED .code ());
230252 request .entity (Entity .createObject (request , notification ));
231253 log .info ("Send {} method to MCP server. [sessionId={}, notification={}]" ,
@@ -249,12 +271,19 @@ private void initializedMcpServer(JsonRpc.Response<Long> response) {
249271 }
250272 }
251273
274+ private void recordServerSchema (JsonRpc .Response <Long > response ) {
275+ Map <String , Object > mapResult = cast (response .result ());
276+ this .serverSchema = ServerSchema .create (mapResult );
277+ log .info ("MCP server has initialized. [server={}]" , this .serverSchema );
278+ }
279+
252280 @ Override
253281 public List <Tool > getTools () {
254282 if (this .isNotInitialized ()) {
255283 throw new IllegalStateException ("MCP client is not initialized. Please wait a moment." );
256284 }
257- HttpClassicClientRequest request = this .client .createRequest (HttpRequestMethod .POST , this .messageUrl );
285+ HttpClassicClientRequest request =
286+ this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this .messageEndpoint );
258287 long currentId = this .getNextId ();
259288 this .responseConsumers .put (currentId , this ::getTools0 );
260289 this .pendingRequests .put (currentId , true );
@@ -292,6 +321,7 @@ private void getTools0(JsonRpc.Response<Long> response) {
292321 log .error ("Failed to get tools list from MCP server. [sessionId={}, response={}]" ,
293322 this .sessionId ,
294323 response );
324+ this .pendingRequests .put (response .id (), false );
295325 return ;
296326 }
297327 Map <String , Object > result = cast (response .result ());
@@ -301,16 +331,73 @@ private void getTools0(JsonRpc.Response<Long> response) {
301331 this .tools .addAll (rawTools .stream ()
302332 .map (rawTool -> ObjectUtils .<Tool >toCustomObject (rawTool , Tool .class ))
303333 .toList ());
304- this .pendingRequests .put (response .id (), false );
305334 }
335+ this .pendingRequests .put (response .id (), false );
306336 }
307337
308338 @ Override
309339 public Object callTool (String name , Map <String , Object > arguments ) {
310340 if (this .isNotInitialized ()) {
311341 throw new IllegalStateException ("MCP client is not initialized. Please wait a moment." );
312342 }
313- return null ;
343+ HttpClassicClientRequest request =
344+ this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this .messageEndpoint );
345+ long currentId = this .getNextId ();
346+ this .responseConsumers .put (currentId , this ::callTools0 );
347+ this .pendingRequests .put (currentId , true );
348+ JsonRpc .Request <Long > rpcRequest = JsonRpc .createRequest (currentId ,
349+ Method .TOOLS_CALL .code (),
350+ MapBuilder .<String , Object >get ().put ("name" , name ).put ("arguments" , arguments ).build ());
351+ request .entity (Entity .createObject (request , rpcRequest ));
352+ log .info ("Send {} method to MCP server. [sessionId={}, request={}]" ,
353+ Method .TOOLS_CALL .code (),
354+ this .sessionId ,
355+ rpcRequest );
356+ try (HttpClassicClientResponse <Object > exchange = request .exchange (Object .class )) {
357+ if (exchange .statusCode () >= 200 && exchange .statusCode () < 300 ) {
358+ log .info ("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]" ,
359+ Method .TOOLS_CALL .code (),
360+ this .sessionId ,
361+ exchange .statusCode ());
362+ } else {
363+ log .error ("Failed to {} MCP server. [sessionId={}, statusCode={}]" ,
364+ Method .TOOLS_CALL .code (),
365+ this .sessionId ,
366+ exchange .statusCode ());
367+ }
368+ } catch (IOException e ) {
369+ throw new IllegalStateException (e );
370+ }
371+ while (this .pendingRequests .get (currentId )) {
372+ ThreadUtils .sleep (100 );
373+ }
374+ return this .pendingResults .get (currentId );
375+ }
376+
377+ private void callTools0 (JsonRpc .Response <Long > response ) {
378+ if (response .error () != null ) {
379+ log .error ("Failed to call tool from MCP server. [sessionId={}, response={}]" , this .sessionId , response );
380+ this .pendingRequests .put (response .id (), false );
381+ return ;
382+ }
383+ Map <String , Object > result = cast (response .result ());
384+ boolean isError = cast (result .get ("isError" ));
385+ if (isError ) {
386+ log .error ("Failed to call tool from MCP server. [sessionId={}, result={}]" , this .sessionId , result );
387+ this .pendingRequests .put (response .id (), false );
388+ return ;
389+ }
390+ List <Map <String , Object >> rawContents = cast (result .get ("content" ));
391+ if (CollectionUtils .isEmpty (rawContents )) {
392+ log .error ("Failed to call tool from MCP server: no result returned. [sessionId={}, result={}]" ,
393+ this .sessionId ,
394+ result );
395+ this .pendingRequests .put (response .id (), false );
396+ return ;
397+ }
398+ Map <String , Object > rawContent = rawContents .get (0 );
399+ this .pendingResults .put (response .id (), rawContent .get ("text" ));
400+ this .pendingRequests .put (response .id (), false );
314401 }
315402
316403 private long getNextId () {
0 commit comments