Skip to content

Commit 8a066c3

Browse files
committed
fix(windows/lib.rs): use synchronous Mutex for STREAM_CHANNELS to avoid Tokio reactor context issues
1 parent d65dcdc commit 8a066c3

File tree

1 file changed

+33
-44
lines changed

1 file changed

+33
-44
lines changed

src/lib.rs

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -579,13 +579,10 @@ pub extern "C" fn mcp_disconnect() -> *mut c_char {
579579

580580
// Also clear any active stream channels
581581
{
582-
let runtime = tokio::runtime::Runtime::new().unwrap();
583-
runtime.block_on(async {
584-
let mut channels = STREAM_CHANNELS.lock().await;
585-
channels.clear();
586-
});
582+
let mut channels = STREAM_CHANNELS.lock().unwrap();
583+
channels.clear();
587584
}
588-
585+
589586
// Reset stream counter
590587
*STREAM_COUNTER.lock().unwrap() = 0;
591588

@@ -740,8 +737,8 @@ use std::collections::HashMap;
740737
use tokio::sync::Mutex as TokioMutex;
741738

742739
lazy_static::lazy_static! {
743-
static ref STREAM_CHANNELS: Arc<TokioMutex<HashMap<usize, tokio::sync::mpsc::UnboundedReceiver<StreamChunk>>>> =
744-
Arc::new(TokioMutex::new(HashMap::new()));
740+
static ref STREAM_CHANNELS: Arc<Mutex<HashMap<usize, tokio::sync::mpsc::UnboundedReceiver<StreamChunk>>>> =
741+
Arc::new(Mutex::new(HashMap::new()));
745742
static ref STREAM_COUNTER: Mutex<usize> = Mutex::new(0);
746743
}
747744

@@ -825,12 +822,9 @@ pub extern "C" fn mcp_list_tools_init() -> usize {
825822
} // Release the lock here
826823

827824
// Store the receiver in global storage (now safe to acquire lock again)
828-
let client_opt = client_mutex.lock().unwrap();
829-
if let Some(client) = client_opt.as_ref() {
830-
client.runtime.block_on(async {
831-
let mut channels = STREAM_CHANNELS.lock().await;
832-
channels.insert(stream_id, rx);
833-
});
825+
{
826+
let mut channels = STREAM_CHANNELS.lock().unwrap();
827+
channels.insert(stream_id, rx);
834828
}
835829

836830
stream_id
@@ -936,12 +930,9 @@ pub extern "C" fn mcp_call_tool_init(tool_name: *const c_char, arguments: *const
936930
}
937931

938932
// Store the receiver
939-
let client_opt = client_mutex.lock().unwrap();
940-
if let Some(client) = client_opt.as_ref() {
941-
client.runtime.block_on(async {
942-
let mut channels = STREAM_CHANNELS.lock().await;
943-
channels.insert(stream_id, rx);
944-
});
933+
{
934+
let mut channels = STREAM_CHANNELS.lock().unwrap();
935+
channels.insert(stream_id, rx);
945936
}
946937

947938
stream_id
@@ -954,20 +945,18 @@ pub extern "C" fn mcp_stream_next(stream_id: usize) -> *mut StreamResult {
954945
let client_mutex = GLOBAL_CLIENT.get_or_init(|| Mutex::new(None));
955946
let client_opt = client_mutex.lock().unwrap();
956947

957-
if let Some(client) = client_opt.as_ref() {
958-
client.runtime.block_on(async {
959-
let mut channels = STREAM_CHANNELS.lock().await;
960-
if let Some(rx) = channels.get_mut(&stream_id) {
961-
match rx.try_recv() {
962-
Ok(chunk) => {
963-
Box::into_raw(Box::new(chunk_to_stream_result(chunk)))
964-
}
965-
Err(_) => ptr::null_mut(),
948+
if let Some(_client) = client_opt.as_ref() {
949+
let mut channels = STREAM_CHANNELS.lock().unwrap();
950+
if let Some(rx) = channels.get_mut(&stream_id) {
951+
match rx.try_recv() {
952+
Ok(chunk) => {
953+
Box::into_raw(Box::new(chunk_to_stream_result(chunk)))
966954
}
967-
} else {
968-
ptr::null_mut()
955+
Err(_) => ptr::null_mut(),
969956
}
970-
})
957+
} else {
958+
ptr::null_mut()
959+
}
971960
} else {
972961
ptr::null_mut()
973962
}
@@ -982,20 +971,22 @@ pub extern "C" fn mcp_stream_wait(stream_id: usize, timeout_ms: u64) -> *mut Str
982971
let client_opt = client_mutex.lock().unwrap();
983972

984973
if let Some(client) = client_opt.as_ref() {
985-
client.runtime.block_on(async {
986-
let mut channels = STREAM_CHANNELS.lock().await;
987-
if let Some(rx) = channels.get_mut(&stream_id) {
974+
// Get mutable reference to receiver outside of async block
975+
let mut channels = STREAM_CHANNELS.lock().unwrap();
976+
if let Some(rx) = channels.get_mut(&stream_id) {
977+
// We need to use block_on for the async recv operation
978+
client.runtime.block_on(async {
988979
let timeout = tokio::time::Duration::from_millis(timeout_ms);
989980
match tokio::time::timeout(timeout, rx.recv()).await {
990981
Ok(Some(chunk)) => {
991982
Box::into_raw(Box::new(chunk_to_stream_result(chunk)))
992983
}
993984
_ => ptr::null_mut(),
994985
}
995-
} else {
996-
ptr::null_mut()
997-
}
998-
})
986+
})
987+
} else {
988+
ptr::null_mut()
989+
}
999990
} else {
1000991
ptr::null_mut()
1001992
}
@@ -1007,11 +998,9 @@ pub extern "C" fn mcp_stream_cleanup(stream_id: usize) {
1007998
let client_mutex = GLOBAL_CLIENT.get_or_init(|| Mutex::new(None));
1008999
let client_opt = client_mutex.lock().unwrap();
10091000

1010-
if let Some(client) = client_opt.as_ref() {
1011-
client.runtime.block_on(async {
1012-
let mut channels = STREAM_CHANNELS.lock().await;
1013-
channels.remove(&stream_id);
1014-
});
1001+
if let Some(_client) = client_opt.as_ref() {
1002+
let mut channels = STREAM_CHANNELS.lock().unwrap();
1003+
channels.remove(&stream_id);
10151004
}
10161005
}
10171006

0 commit comments

Comments
 (0)