|
26 | 26 | import io.modelcontextprotocol.client.McpSyncClient; |
27 | 27 | import io.modelcontextprotocol.client.transport.ServerParameters; |
28 | 28 | import io.modelcontextprotocol.spec.McpSchema.ListToolsResult; |
29 | | -import io.reactivex.rxjava3.core.Single; |
30 | | -import java.util.List; |
| 29 | +import io.reactivex.rxjava3.core.Flowable; |
31 | 30 | import java.util.Objects; |
32 | 31 | import java.util.Optional; |
33 | 32 | import org.slf4j.Logger; |
@@ -151,69 +150,74 @@ public McpToolset(ServerParameters connectionParams) { |
151 | 150 | } |
152 | 151 |
|
153 | 152 | @Override |
154 | | - public Single<List<BaseTool>> getTools(ReadonlyContext readonlyContext) { |
155 | | - return Single.fromCallable( |
156 | | - () -> { |
157 | | - for (int i = 0; i < MAX_RETRIES; i++) { |
158 | | - try { |
159 | | - if (this.mcpSession == null) { |
160 | | - logger.info("MCP session is null or closed, initializing (attempt {}).", i + 1); |
161 | | - this.mcpSession = this.mcpSessionManager.createSession(); |
162 | | - } |
163 | | - |
164 | | - ListToolsResult toolsResponse = this.mcpSession.listTools(); |
165 | | - return toolsResponse.tools().stream() |
166 | | - .map( |
167 | | - tool -> |
168 | | - new McpTool( |
169 | | - tool, this.mcpSession, this.mcpSessionManager, this.objectMapper)) |
170 | | - .filter( |
171 | | - tool -> |
172 | | - isToolSelected(tool, toolFilter, Optional.ofNullable(readonlyContext))) |
173 | | - .collect(toImmutableList()); |
174 | | - } catch (IllegalArgumentException e) { |
175 | | - // This could happen if parameters for tool loading are somehow invalid. |
176 | | - // This is likely a fatal error and should not be retried. |
177 | | - logger.error("Invalid argument encountered during tool loading.", e); |
178 | | - throw new McpToolLoadingException( |
179 | | - "Invalid argument encountered during tool loading.", e); |
180 | | - } catch (RuntimeException e) { // Catch any other unexpected runtime exceptions |
181 | | - logger.error("Unexpected error during tool loading, retry attempt " + (i + 1), e); |
182 | | - if (i < MAX_RETRIES - 1) { |
183 | | - // For other general exceptions, we might still want to retry if they are |
184 | | - // potentially transient, or if we don't have more specific handling. But it's |
185 | | - // better to be specific. For now, we'll treat them as potentially retryable but log |
186 | | - // them at a higher level. |
| 153 | + public Flowable<BaseTool> getTools(ReadonlyContext readonlyContext) { |
| 154 | + return Flowable.fromCallable( |
| 155 | + () -> { |
| 156 | + for (int i = 0; i < MAX_RETRIES; i++) { |
187 | 157 | try { |
188 | | - logger.info("Reinitializing MCP session before next retry for unexpected error."); |
189 | | - this.mcpSession = this.mcpSessionManager.createSession(); |
190 | | - Thread.sleep(RETRY_DELAY_MILLIS); |
191 | | - } catch (InterruptedException ie) { |
192 | | - Thread.currentThread().interrupt(); |
193 | | - logger.error( |
194 | | - "Interrupted during retry delay for loadTools (unexpected error).", ie); |
| 158 | + if (this.mcpSession == null) { |
| 159 | + logger.info("MCP session is null or closed, initializing (attempt {}).", i + 1); |
| 160 | + this.mcpSession = this.mcpSessionManager.createSession(); |
| 161 | + } |
| 162 | + |
| 163 | + ListToolsResult toolsResponse = this.mcpSession.listTools(); |
| 164 | + return toolsResponse.tools().stream() |
| 165 | + .map( |
| 166 | + tool -> |
| 167 | + new McpTool( |
| 168 | + tool, this.mcpSession, this.mcpSessionManager, this.objectMapper)) |
| 169 | + .filter( |
| 170 | + tool -> |
| 171 | + isToolSelected( |
| 172 | + tool, toolFilter, Optional.ofNullable(readonlyContext))) |
| 173 | + .collect(toImmutableList()); |
| 174 | + } catch (IllegalArgumentException e) { |
| 175 | + // This could happen if parameters for tool loading are somehow invalid. |
| 176 | + // This is likely a fatal error and should not be retried. |
| 177 | + logger.error("Invalid argument encountered during tool loading.", e); |
195 | 178 | throw new McpToolLoadingException( |
196 | | - "Interrupted during retry delay (unexpected error)", ie); |
197 | | - } catch (RuntimeException reinitE) { |
198 | | - logger.error( |
199 | | - "Failed to reinitialize session during retry (unexpected error).", reinitE); |
200 | | - throw new McpInitializationException( |
201 | | - "Failed to reinitialize session during tool loading retry (unexpected" |
202 | | - + " error).", |
203 | | - reinitE); |
| 179 | + "Invalid argument encountered during tool loading.", e); |
| 180 | + } catch (RuntimeException e) { // Catch any other unexpected runtime exceptions |
| 181 | + logger.error("Unexpected error during tool loading, retry attempt " + (i + 1), e); |
| 182 | + if (i < MAX_RETRIES - 1) { |
| 183 | + // For other general exceptions, we might still want to retry if they are |
| 184 | + // potentially transient, or if we don't have more specific handling. But it's |
| 185 | + // better to be specific. For now, we'll treat them as potentially retryable but |
| 186 | + // log |
| 187 | + // them at a higher level. |
| 188 | + try { |
| 189 | + logger.info( |
| 190 | + "Reinitializing MCP session before next retry for unexpected error."); |
| 191 | + this.mcpSession = this.mcpSessionManager.createSession(); |
| 192 | + Thread.sleep(RETRY_DELAY_MILLIS); |
| 193 | + } catch (InterruptedException ie) { |
| 194 | + Thread.currentThread().interrupt(); |
| 195 | + logger.error( |
| 196 | + "Interrupted during retry delay for loadTools (unexpected error).", ie); |
| 197 | + throw new McpToolLoadingException( |
| 198 | + "Interrupted during retry delay (unexpected error)", ie); |
| 199 | + } catch (RuntimeException reinitE) { |
| 200 | + logger.error( |
| 201 | + "Failed to reinitialize session during retry (unexpected error).", |
| 202 | + reinitE); |
| 203 | + throw new McpInitializationException( |
| 204 | + "Failed to reinitialize session during tool loading retry (unexpected" |
| 205 | + + " error).", |
| 206 | + reinitE); |
| 207 | + } |
| 208 | + } else { |
| 209 | + logger.error( |
| 210 | + "Failed to load tools after multiple retries due to unexpected error.", e); |
| 211 | + throw new McpToolLoadingException( |
| 212 | + "Failed to load tools after multiple retries due to unexpected error.", e); |
| 213 | + } |
204 | 214 | } |
205 | | - } else { |
206 | | - logger.error( |
207 | | - "Failed to load tools after multiple retries due to unexpected error.", e); |
208 | | - throw new McpToolLoadingException( |
209 | | - "Failed to load tools after multiple retries due to unexpected error.", e); |
210 | 215 | } |
211 | | - } |
212 | | - } |
213 | | - // This line should ideally not be reached if retries are handled correctly or an |
214 | | - // exception is always thrown. |
215 | | - throw new IllegalStateException("Unexpected state in getTools retry loop"); |
216 | | - }); |
| 216 | + // This line should ideally not be reached if retries are handled correctly or an |
| 217 | + // exception is always thrown. |
| 218 | + throw new IllegalStateException("Unexpected state in getTools retry loop"); |
| 219 | + }) |
| 220 | + .flatMapIterable(tools -> tools); |
217 | 221 | } |
218 | 222 |
|
219 | 223 | @Override |
|
0 commit comments