66
77package modelengine .fel .tool .mcp .client .support ;
88
9+ import static modelengine .fitframework .inspection .Validation .notBlank ;
10+ import static modelengine .fitframework .inspection .Validation .notNull ;
911import static modelengine .fitframework .util .ObjectUtils .cast ;
1012
1113import modelengine .fel .tool .mcp .client .McpClient ;
5759 */
5860public class DefaultMcpClient implements McpClient {
5961 private static final Logger log = Logger .get (DefaultMcpClient .class );
60- private static final long DELAY_MILLIS = 30_000L ;
6162
6263 private final ObjectSerializer jsonSerializer ;
6364 private final HttpClassicClient client ;
6465 private final String baseUri ;
6566 private final String sseEndpoint ;
6667 private final String name ;
6768 private final AtomicLong id = new AtomicLong (0 );
69+ private final long pingInterval ;
6870
6971 private volatile String messageEndpoint ;
7072 private volatile String sessionId ;
@@ -88,14 +90,16 @@ public class DefaultMcpClient implements McpClient {
8890 * @param client The HTTP client used for communication with the MCP server.
8991 * @param baseUri The base URI of the MCP server.
9092 * @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection.
93+ * @param pingInterval The interval for sending ping messages to the MCP server. Unit: milliseconds.
9194 */
9295 public DefaultMcpClient (ObjectSerializer jsonSerializer , HttpClassicClient client , String baseUri ,
93- String sseEndpoint ) {
94- this .jsonSerializer = jsonSerializer ;
95- this .client = client ;
96- this .baseUri = baseUri ;
97- this .sseEndpoint = sseEndpoint ;
96+ String sseEndpoint , long pingInterval ) {
97+ this .jsonSerializer = notNull ( jsonSerializer , "The json serializer cannot be null." ) ;
98+ this .client = notNull ( client , "The http client cannot be null." ) ;
99+ this .baseUri = notBlank ( baseUri , "The MCP server base URI cannot be blank." ) ;
100+ this .sseEndpoint = notBlank ( sseEndpoint , "The MCP server SSE endpoint cannot be blank." ) ;
98101 this .name = UuidUtils .randomUuidString ();
102+ this .pingInterval = pingInterval > 0 ? pingInterval : 15_000 ;
99103 }
100104
101105 @ Override
@@ -126,20 +130,6 @@ public void initialize() {
126130 (subscription , textEvent ) -> this .consumeTextEvent (textEvent ),
127131 subscription -> log .info ("SSE channel is completed." ),
128132 (subscription , cause ) -> log .error ("SSE channel is failed." , cause ));
129- this .pingScheduler = ThreadPoolScheduler .custom ()
130- .threadPoolName ("mcp-client-ping-" + this .name )
131- .awaitTermination (3 , TimeUnit .SECONDS )
132- .isImmediateShutdown (true )
133- .corePoolSize (1 )
134- .maximumPoolSize (1 )
135- .keepAliveTime (60 , TimeUnit .SECONDS )
136- .workQueueCapacity (Integer .MAX_VALUE )
137- .isDaemonThread (true )
138- .build ();
139- this .pingScheduler .schedule (Task .builder ()
140- .runnable (this ::pingServer )
141- .policy (ExecutePolicy .fixedDelay (DELAY_MILLIS ))
142- .build (), DELAY_MILLIS );
143133 if (!this .waitInitialized ()) {
144134 throw new IllegalStateException ("Failed to initialize." );
145135 }
@@ -236,6 +226,20 @@ private void initializedMcpServer(JsonRpc.Response<Long> response) {
236226 } catch (IOException e ) {
237227 throw new IllegalStateException (e );
238228 }
229+ this .pingScheduler = ThreadPoolScheduler .custom ()
230+ .threadPoolName ("mcp-client-ping-" + this .name )
231+ .awaitTermination (3 , TimeUnit .SECONDS )
232+ .isImmediateShutdown (true )
233+ .corePoolSize (1 )
234+ .maximumPoolSize (1 )
235+ .keepAliveTime (60 , TimeUnit .SECONDS )
236+ .workQueueCapacity (Integer .MAX_VALUE )
237+ .isDaemonThread (true )
238+ .build ();
239+ this .pingScheduler .schedule (Task .builder ()
240+ .runnable (this ::pingServer )
241+ .policy (ExecutePolicy .fixedDelay (this .pingInterval ))
242+ .build (), this .pingInterval );
239243 }
240244
241245 private void recordServerSchema (JsonRpc .Response <Long > response ) {
0 commit comments