1010
1111import modelengine .fel .tool .mcp .client .McpClient ;
1212import modelengine .fel .tool .mcp .entity .ClientSchema ;
13+ import modelengine .fel .tool .mcp .entity .Event ;
1314import modelengine .fel .tool .mcp .entity .JsonRpc ;
1415import modelengine .fel .tool .mcp .entity .Method ;
1516import modelengine .fel .tool .mcp .entity .ServerSchema ;
4344import java .util .concurrent .ConcurrentHashMap ;
4445import java .util .concurrent .TimeUnit ;
4546import java .util .concurrent .atomic .AtomicLong ;
47+ import java .util .function .BiConsumer ;
4648import java .util .function .Consumer ;
4749
4850/**
@@ -129,8 +131,8 @@ public void initialize() {
129131 .runnable (this ::pingServer )
130132 .policy (ExecutePolicy .fixedDelay (DELAY_MILLIS ))
131133 .build (), DELAY_MILLIS );
132- while (!this .waitInitialized ()) {
133- ThreadUtils . sleep ( 100 );
134+ if (!this .waitInitialized ()) {
135+ throw new IllegalStateException ( "Failed to initialize." );
134136 }
135137 }
136138
@@ -142,7 +144,7 @@ private void consumeTextEvent(TextEvent textEvent) {
142144 if (StringUtils .isBlank (textEvent .event ()) || StringUtils .isBlank ((String ) textEvent .data ())) {
143145 return ;
144146 }
145- if (Objects .equals (textEvent .event (), "endpoint" )) {
147+ if (Objects .equals (textEvent .event (), Event . ENDPOINT . code () )) {
146148 this .initializeMcpServer (textEvent );
147149 return ;
148150 }
@@ -173,64 +175,20 @@ private void pingServer() {
173175 log .info ("MCP client is not initialized and {} method will be delayed." , Method .PING .code ());
174176 return ;
175177 }
176- HttpClassicClientRequest request =
177- this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this .messageEndpoint );
178- long currentId = this .getNextId ();
179- JsonRpc .Request <Long > rpcRequest = JsonRpc .createRequest (currentId , Method .PING .code ());
180- request .entity (Entity .createObject (request , rpcRequest ));
181- log .info ("Send {} method to MCP server. [sessionId={}, request={}]" ,
182- Method .PING .code (),
183- this .sessionId ,
184- rpcRequest );
185- try (HttpClassicClientResponse <Object > exchange = request .exchange (Object .class )) {
186- if (exchange .statusCode () >= 200 && exchange .statusCode () < 300 ) {
187- log .info ("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]" ,
188- Method .PING .code (),
189- this .sessionId ,
190- exchange .statusCode ());
191- } else {
192- log .error ("Failed to {} MCP server. [sessionId={}, statusCode={}]" ,
193- Method .PING .code (),
194- this .sessionId ,
195- exchange .statusCode ());
196- }
197- } catch (IOException e ) {
198- throw new IllegalStateException (e );
199- }
178+ this .post2McpServer (Method .PING , null , null );
200179 }
201180
202181 private void initializeMcpServer (TextEvent textEvent ) {
203182 this .messageEndpoint = textEvent .data ().toString ();
204- HttpClassicClientRequest request =
205- this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this .messageEndpoint );
206- this .sessionId =
207- request .queries ().first ("session_id" ).orElseThrow (() -> new IllegalStateException ("no session_id" ));
208- long currentId = this .getNextId ();
209- this .responseConsumers .put (currentId , this ::initializedMcpServer );
210183 ClientSchema schema = new ClientSchema ("2024-11-05" ,
211184 new ClientSchema .Capabilities (),
212185 new ClientSchema .Info ("FIT MCP Client" , "3.6.0-SNAPSHOT" ));
213- JsonRpc .Request <Long > rpcRequest = JsonRpc .createRequest (currentId , Method .INITIALIZE .code (), schema );
214- request .entity (Entity .createObject (request , rpcRequest ));
215- log .info ("Send {} method to MCP server. [sessionId={}, request={}]" ,
216- Method .INITIALIZE .code (),
217- this .sessionId ,
218- rpcRequest );
219- try (HttpClassicClientResponse <Object > exchange = request .exchange (Object .class )) {
220- if (exchange .statusCode () >= 200 && exchange .statusCode () < 300 ) {
221- log .info ("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]" ,
222- Method .INITIALIZE .code (),
223- this .sessionId ,
224- exchange .statusCode ());
225- } else {
226- log .error ("Failed to {} MCP server. [sessionId={}, statusCode={}]" ,
227- Method .INITIALIZE .code (),
228- this .sessionId ,
229- exchange .statusCode ());
230- }
231- } catch (IOException e ) {
232- throw new IllegalStateException (e );
233- }
186+ this .post2McpServer (Method .INITIALIZE , schema , (request , currentId ) -> {
187+ this .sessionId = request .queries ()
188+ .first ("session_id" )
189+ .orElseThrow (() -> new IllegalStateException ("The session_id cannot be empty." ));
190+ this .responseConsumers .put (currentId , this ::initializedMcpServer );
191+ });
234192 }
235193
236194 private void initializedMcpServer (JsonRpc .Response <Long > response ) {
@@ -282,33 +240,11 @@ public List<Tool> getTools() {
282240 if (this .isNotInitialized ()) {
283241 throw new IllegalStateException ("MCP client is not initialized. Please wait a moment." );
284242 }
285- HttpClassicClientRequest request =
286- this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this .messageEndpoint );
287- long currentId = this .getNextId ();
288- this .responseConsumers .put (currentId , this ::getTools0 );
289- this .pendingRequests .put (currentId , true );
290- JsonRpc .Request <Long > rpcRequest = JsonRpc .createRequest (currentId , Method .TOOLS_LIST .code ());
291- request .entity (Entity .createObject (request , rpcRequest ));
292- log .info ("Send {} method to MCP server. [sessionId={}, request={}]" ,
293- Method .TOOLS_LIST .code (),
294- this .sessionId ,
295- rpcRequest );
296- try (HttpClassicClientResponse <Object > exchange = request .exchange (Object .class )) {
297- if (exchange .statusCode () >= 200 && exchange .statusCode () < 300 ) {
298- log .info ("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]" ,
299- Method .TOOLS_LIST .code (),
300- this .sessionId ,
301- exchange .statusCode ());
302- } else {
303- log .error ("Failed to {} MCP server. [sessionId={}, statusCode={}]" ,
304- Method .TOOLS_LIST .code (),
305- this .sessionId ,
306- exchange .statusCode ());
307- }
308- } catch (IOException e ) {
309- throw new IllegalStateException (e );
310- }
311- while (this .pendingRequests .get (currentId )) {
243+ long requestId = this .post2McpServer (Method .TOOLS_LIST , null , (request , currentId ) -> {
244+ this .responseConsumers .put (currentId , this ::getTools0 );
245+ this .pendingRequests .put (currentId , true );
246+ });
247+ while (this .pendingRequests .get (requestId )) {
312248 ThreadUtils .sleep (100 );
313249 }
314250 synchronized (this .toolsLock ) {
@@ -340,38 +276,16 @@ public Object callTool(String name, Map<String, Object> arguments) {
340276 if (this .isNotInitialized ()) {
341277 throw new IllegalStateException ("MCP client is not initialized. Please wait a moment." );
342278 }
343- HttpClassicClientRequest request =
344- this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this .messageEndpoint );
345- long currentId = this .getNextId ();
346- this .responseConsumers .put (currentId , this ::callTools0 );
347- this .pendingRequests .put (currentId , true );
348- JsonRpc .Request <Long > rpcRequest = JsonRpc .createRequest (currentId ,
349- Method .TOOLS_CALL .code (),
350- MapBuilder .<String , Object >get ().put ("name" , name ).put ("arguments" , arguments ).build ());
351- request .entity (Entity .createObject (request , rpcRequest ));
352- log .info ("Send {} method to MCP server. [sessionId={}, request={}]" ,
353- Method .TOOLS_CALL .code (),
354- this .sessionId ,
355- rpcRequest );
356- try (HttpClassicClientResponse <Object > exchange = request .exchange (Object .class )) {
357- if (exchange .statusCode () >= 200 && exchange .statusCode () < 300 ) {
358- log .info ("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]" ,
359- Method .TOOLS_CALL .code (),
360- this .sessionId ,
361- exchange .statusCode ());
362- } else {
363- log .error ("Failed to {} MCP server. [sessionId={}, statusCode={}]" ,
364- Method .TOOLS_CALL .code (),
365- this .sessionId ,
366- exchange .statusCode ());
367- }
368- } catch (IOException e ) {
369- throw new IllegalStateException (e );
370- }
371- while (this .pendingRequests .get (currentId )) {
279+ long requestId = this .post2McpServer (Method .TOOLS_CALL ,
280+ MapBuilder .<String , Object >get ().put ("name" , name ).put ("arguments" , arguments ).build (),
281+ (request , currentId ) -> {
282+ this .responseConsumers .put (currentId , this ::callTools0 );
283+ this .pendingRequests .put (currentId , true );
284+ });
285+ while (this .pendingRequests .get (requestId )) {
372286 ThreadUtils .sleep (100 );
373287 }
374- return this .pendingResults .get (currentId );
288+ return this .pendingResults .get (requestId );
375289 }
376290
377291 private void callTools0 (JsonRpc .Response <Long > response ) {
@@ -400,6 +314,37 @@ private void callTools0(JsonRpc.Response<Long> response) {
400314 this .pendingRequests .put (response .id (), false );
401315 }
402316
317+ private long post2McpServer (Method method , Object requestParams ,
318+ BiConsumer <HttpClassicClientRequest , Long > requestConsumer ) {
319+ HttpClassicClientRequest request =
320+ this .client .createRequest (HttpRequestMethod .POST , this .baseUri + this .messageEndpoint );
321+ long currentId = this .getNextId ();
322+ if (requestConsumer != null ) {
323+ requestConsumer .accept (request , currentId );
324+ }
325+ JsonRpc .Request <Long > rpcRequest = JsonRpc .createRequest (currentId , method .code (), requestParams );
326+ request .entity (Entity .createObject (request , rpcRequest ));
327+ log .info ("Send {} method to MCP server. [sessionId={}, request={}]" , method .code (), this .sessionId , rpcRequest );
328+ try (HttpClassicClientResponse <Object > exchange = request .exchange (Object .class )) {
329+ if (exchange .statusCode () >= 200 && exchange .statusCode () < 300 ) {
330+ log .info ("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]" ,
331+ method .code (),
332+ this .sessionId ,
333+ exchange .statusCode ());
334+ } else {
335+ log .error ("Failed to {} MCP server. [sessionId={}, statusCode={}]" ,
336+ method .code (),
337+ this .sessionId ,
338+ exchange .statusCode ());
339+ }
340+ } catch (IOException e ) {
341+ throw new IllegalStateException (StringUtils .format ("Failed to {0} MCP server. [sessionId={1}]" ,
342+ method .code (),
343+ this .sessionId ), e );
344+ }
345+ return currentId ;
346+ }
347+
403348 private long getNextId () {
404349 long tmpId = this .id .getAndIncrement ();
405350 if (tmpId < 0 ) {
@@ -422,10 +367,10 @@ private boolean waitInitialized() {
422367 return true ;
423368 }
424369 try {
425- this .initializedLock .wait ();
370+ this .initializedLock .wait (60_000L );
426371 } catch (InterruptedException e ) {
427372 Thread .currentThread ().interrupt ();
428- throw new IllegalStateException (e );
373+ throw new IllegalStateException ("Failed to initialize." , e );
429374 }
430375 }
431376 return this .initialized ;
0 commit comments