Skip to content

Commit 0b606b8

Browse files
authored
[fit] fix thread leaks from deactivated MCP server channels (#214)
* [fel] ensure thread release during channel cleanup by triggering completion * [fit] improve the naming.
1 parent 2a0f828 commit 0b606b8

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerController.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,22 @@ public McpServerController(@Fit(alias = "json") ObjectSerializer serializer, Mcp
8888
if (MapUtils.isEmpty(this.responses)) {
8989
return;
9090
}
91-
List<String> toRemoved = new ArrayList<>();
91+
List<String> obsoleteSessionIds = new ArrayList<>();
9292
for (Map.Entry<String, HttpClassicServerResponse> entry : this.responses.entrySet()) {
9393
if (entry.getValue().isActive()) {
9494
continue;
9595
}
96-
toRemoved.add(entry.getKey());
96+
obsoleteSessionIds.add(entry.getKey());
9797
}
98-
if (CollectionUtils.isEmpty(toRemoved)) {
98+
if (CollectionUtils.isEmpty(obsoleteSessionIds)) {
9999
return;
100100
}
101-
toRemoved.forEach(this.responses::remove);
102-
toRemoved.forEach(this.emitters::remove);
103-
log.info("Channels are inactive, remove emitters and responses. [sessionIds={}]", toRemoved);
101+
obsoleteSessionIds.forEach(this.responses::remove);
102+
for (String obsoleteSessionId : obsoleteSessionIds) {
103+
Emitter<TextEvent> removed = this.emitters.remove(obsoleteSessionId);
104+
removed.complete();
105+
}
106+
log.info("Channels are inactive, remove emitters and responses. [sessionIds={}]", obsoleteSessionIds);
104107
}).build());
105108
}
106109

0 commit comments

Comments
 (0)