Skip to content

Commit 0503cae

Browse files
committed
fix(lib): update task spawning to use spawn_blocking and block_on for Windows compatibility
1 parent 339b719 commit 0503cae

File tree

1 file changed

+70
-64
lines changed

1 file changed

+70
-64
lines changed

src/lib.rs

Lines changed: 70 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -781,38 +781,41 @@ pub extern "C" fn mcp_list_tools_init() -> usize {
781781
// Get the global client
782782
let client_mutex = GLOBAL_CLIENT.get_or_init(|| Mutex::new(None));
783783

784-
// Spawn the async task
784+
// Spawn the async task using spawn_blocking + block_on for Windows compatibility
785785
{
786786
let client_opt = client_mutex.lock().unwrap();
787787
if let Some(client) = client_opt.as_ref() {
788788
// Clone the Arc to share the service across async boundaries
789789
let service_arc = client.service.clone();
790-
791-
// Enter the runtime context explicitly before spawning (Windows compatible)
792-
let _guard = client.runtime.enter();
793-
client.runtime.spawn(async move {
794-
let service_guard = service_arc.lock().await;
795-
if let Some(service) = service_guard.as_ref() {
796-
match service.list_tools(None).await {
797-
Ok(response) => {
798-
// Send each tool as a separate chunk
799-
for tool in response.tools {
800-
if let Ok(tool_json) = serde_json::to_value(&tool) {
801-
let _ = tx.send(StreamChunk::Tool(tool_json));
790+
let runtime_handle = client.runtime.handle().clone();
791+
792+
// Use spawn_blocking to run a blocking task that calls block_on
793+
// This ensures the reactor is running when async code executes (Windows fix)
794+
std::thread::spawn(move || {
795+
runtime_handle.block_on(async move {
796+
let service_guard = service_arc.lock().await;
797+
if let Some(service) = service_guard.as_ref() {
798+
match service.list_tools(None).await {
799+
Ok(response) => {
800+
// Send each tool as a separate chunk
801+
for tool in response.tools {
802+
if let Ok(tool_json) = serde_json::to_value(&tool) {
803+
let _ = tx.send(StreamChunk::Tool(tool_json));
804+
}
802805
}
806+
let _ = tx.send(StreamChunk::Done);
807+
}
808+
Err(e) => {
809+
let _ = tx.send(StreamChunk::Error(format!("Failed to list tools: {}", e)));
810+
let _ = tx.send(StreamChunk::Done);
803811
}
804-
let _ = tx.send(StreamChunk::Done);
805-
}
806-
Err(e) => {
807-
let _ = tx.send(StreamChunk::Error(format!("Failed to list tools: {}", e)));
808-
let _ = tx.send(StreamChunk::Done);
809812
}
813+
} else {
814+
// No service connected
815+
let _ = tx.send(StreamChunk::Error("Not connected. Call mcp_connect() first".to_string()));
816+
let _ = tx.send(StreamChunk::Done);
810817
}
811-
} else {
812-
// No service connected
813-
let _ = tx.send(StreamChunk::Error("Not connected. Call mcp_connect() first".to_string()));
814-
let _ = tx.send(StreamChunk::Done);
815-
}
818+
});
816819
});
817820
} else {
818821
// No client initialized
@@ -867,61 +870,64 @@ pub extern "C" fn mcp_call_tool_init(tool_name: *const c_char, arguments: *const
867870
// Get the global client
868871
let client_mutex = GLOBAL_CLIENT.get_or_init(|| Mutex::new(None));
869872

870-
// Spawn the async task
873+
// Spawn the async task using spawn_blocking + block_on for Windows compatibility
871874
{
872875
let client_opt = client_mutex.lock().unwrap();
873876
if let Some(client) = client_opt.as_ref() {
874877
let service_arc = client.service.clone();
878+
let runtime_handle = client.runtime.handle().clone();
879+
880+
// Use spawn_blocking to run a blocking task that calls block_on
881+
// This ensures the reactor is running when async code executes (Windows fix)
882+
std::thread::spawn(move || {
883+
runtime_handle.block_on(async move {
884+
let service_guard = service_arc.lock().await;
885+
if let Some(service) = service_guard.as_ref() {
886+
// Parse arguments
887+
let arguments_json: serde_json::Value = match serde_json::from_str(&arguments_str) {
888+
Ok(v) => v,
889+
Err(e) => {
890+
let _ = tx.send(StreamChunk::Error(format!("Invalid JSON arguments: {}", e)));
891+
let _ = tx.send(StreamChunk::Done);
892+
return;
893+
}
894+
};
875895

876-
// Enter the runtime context explicitly before spawning (Windows compatible)
877-
let _guard = client.runtime.enter();
878-
client.runtime.spawn(async move {
879-
let service_guard = service_arc.lock().await;
880-
if let Some(service) = service_guard.as_ref() {
881-
// Parse arguments
882-
let arguments_json: serde_json::Value = match serde_json::from_str(&arguments_str) {
883-
Ok(v) => v,
884-
Err(e) => {
885-
let _ = tx.send(StreamChunk::Error(format!("Invalid JSON arguments: {}", e)));
886-
let _ = tx.send(StreamChunk::Done);
887-
return;
888-
}
889-
};
890-
891-
// Create the call tool parameter
892-
let call_param = rmcp::model::CallToolRequestParam {
893-
name: std::borrow::Cow::Owned(tool_name_str),
894-
arguments: arguments_json.as_object().cloned(),
895-
};
896+
// Create the call tool parameter
897+
let call_param = rmcp::model::CallToolRequestParam {
898+
name: std::borrow::Cow::Owned(tool_name_str),
899+
arguments: arguments_json.as_object().cloned(),
900+
};
896901

897-
// Call the tool
898-
match service.call_tool(call_param).await {
899-
Ok(result) => {
900-
// Serialize the result to JSON and extract text content
901-
if let Ok(result_json) = serde_json::to_value(&result) {
902-
if let Some(content_array) = result_json.get("content").and_then(|v| v.as_array()) {
903-
for item in content_array {
904-
if let Some(item_type) = item.get("type").and_then(|v| v.as_str()) {
905-
if item_type == "text" {
906-
if let Some(text) = item.get("text").and_then(|v| v.as_str()) {
907-
let _ = tx.send(StreamChunk::Text(text.to_string()));
902+
// Call the tool
903+
match service.call_tool(call_param).await {
904+
Ok(result) => {
905+
// Serialize the result to JSON and extract text content
906+
if let Ok(result_json) = serde_json::to_value(&result) {
907+
if let Some(content_array) = result_json.get("content").and_then(|v| v.as_array()) {
908+
for item in content_array {
909+
if let Some(item_type) = item.get("type").and_then(|v| v.as_str()) {
910+
if item_type == "text" {
911+
if let Some(text) = item.get("text").and_then(|v| v.as_str()) {
912+
let _ = tx.send(StreamChunk::Text(text.to_string()));
913+
}
908914
}
909915
}
910916
}
911917
}
912918
}
919+
let _ = tx.send(StreamChunk::Done);
920+
}
921+
Err(e) => {
922+
let _ = tx.send(StreamChunk::Error(format!("Failed to call tool: {}", e)));
923+
let _ = tx.send(StreamChunk::Done);
913924
}
914-
let _ = tx.send(StreamChunk::Done);
915-
}
916-
Err(e) => {
917-
let _ = tx.send(StreamChunk::Error(format!("Failed to call tool: {}", e)));
918-
let _ = tx.send(StreamChunk::Done);
919925
}
926+
} else {
927+
let _ = tx.send(StreamChunk::Error("Not connected. Call mcp_connect() first".to_string()));
928+
let _ = tx.send(StreamChunk::Done);
920929
}
921-
} else {
922-
let _ = tx.send(StreamChunk::Error("Not connected. Call mcp_connect() first".to_string()));
923-
let _ = tx.send(StreamChunk::Done);
924-
}
930+
});
925931
});
926932
} else {
927933
let _ = tx.send(StreamChunk::Error("Client not initialized".to_string()));

0 commit comments

Comments
 (0)