Skip to content

Commit 5065f06

Browse files
committed
fix many bugs
1 parent 1105e17 commit 5065f06

File tree

5 files changed

+208
-34
lines changed

5 files changed

+208
-34
lines changed

src/context/debugger/mod.rs

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::net::SocketAddr;
1616
use std::sync::Arc;
1717
use std::time::Duration;
1818
use tokio::io::{AsyncReadExt, AsyncWriteExt};
19+
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
1920
use tokio::net::{TcpListener, TcpStream};
2021
use tokio::sync::{Mutex, mpsc};
2122
use tokio::task::JoinHandle;
@@ -26,7 +27,8 @@ type DebuggerResult<T> = Result<T, Box<dyn Error + Send>>;
2627
#[allow(unused)]
2728
#[derive(Debug)]
2829
pub struct DebuggerConnection {
29-
stream: Option<Arc<Mutex<TcpStream>>>,
30+
read_stream: Option<Arc<Mutex<OwnedReadHalf>>>,
31+
write_stream: Option<Arc<Mutex<OwnedWriteHalf>>>,
3032
reader_task: Option<JoinHandle<()>>,
3133
response_senders: Arc<Mutex<HashMap<MessageCMD, mpsc::Sender<Message>>>>,
3234
eval_seq_id: i64,
@@ -37,7 +39,8 @@ pub struct DebuggerConnection {
3739
impl DebuggerConnection {
3840
pub fn new() -> Self {
3941
DebuggerConnection {
40-
stream: None,
42+
read_stream: None,
43+
write_stream: None,
4144
reader_task: None,
4245
response_senders: Arc::new(Mutex::new(HashMap::new())),
4346
eval_seq_id: 0,
@@ -65,8 +68,9 @@ impl DebuggerConnection {
6568
.await
6669
.map_err(|e| DebuggerError::from(e))?
6770
};
68-
69-
self.stream = Some(Arc::new(Mutex::new(stream)));
71+
let (read_stream, write_stream) = stream.into_split();
72+
self.read_stream = Some(Arc::new(Mutex::new(read_stream)));
73+
self.write_stream = Some(Arc::new(Mutex::new(write_stream)));
7074
Ok(())
7175
}
7276

@@ -81,27 +85,30 @@ impl DebuggerConnection {
8185
.await
8286
.map_err(|e| DebuggerError::from(e))?;
8387

84-
self.stream = Some(Arc::new(Mutex::new(stream)));
88+
let (read_stream, write_stream) = stream.into_split();
89+
self.read_stream = Some(Arc::new(Mutex::new(read_stream)));
90+
self.write_stream = Some(Arc::new(Mutex::new(write_stream)));
8591
Ok(())
8692
}
8793

8894
pub fn is_connected(&self) -> bool {
89-
self.stream.is_some()
95+
self.read_stream.is_some() && self.write_stream.is_some()
9096
}
9197

9298
pub async fn close(&mut self) {
9399
if let Some(handle) = self.reader_task.take() {
94100
handle.abort();
95101
}
96-
self.stream = None;
102+
self.read_stream = None;
103+
self.write_stream = None;
97104
}
98105

99106
pub fn start_reader_task(&mut self, ide_conn: Arc<std::sync::Mutex<ServerOutput<Stdout>>>) {
100107
if self.reader_task.is_some() {
101108
return;
102109
}
103110

104-
if let Some(stream) = &self.stream {
111+
if let Some(stream) = &self.read_stream {
105112
let stream_clone = stream.clone();
106113
let senders = self.response_senders.clone();
107114
let eval_response = self.eval_response.clone();
@@ -130,6 +137,7 @@ impl DebuggerConnection {
130137
break;
131138
}
132139
Ok(n) => {
140+
log::debug!("read {} bytes, total bytes {}", n, pos);
133141
pos += n; // 解析消息格式:第一行是整数ID,第二行是JSON内容
134142
let mut start = 0;
135143
let mut id_line = None;
@@ -172,13 +180,20 @@ impl DebuggerConnection {
172180
i += 1;
173181
}
174182

183+
log::debug!("parsed {} bytes", start);
184+
175185
// 处理完整消息后移动剩余数据到缓冲区开头
176186
if start > 0 {
177187
buffer.copy_within(start..pos, 0);
178188
pos -= start;
179189
}
180190

181191
if pos > buffer.len() - 1024 {
192+
log::debug!(
193+
"current buffer used size {} bytes, extend buffer total size to {} bytes",
194+
pos,
195+
buffer.len() * 2
196+
);
182197
buffer.resize(buffer.len() * 2, 0);
183198
}
184199
}
@@ -247,15 +262,21 @@ impl DebuggerConnection {
247262
}
248263

249264
pub async fn send_message(&self, message: Message) -> DebuggerResult<()> {
250-
if let Some(stream) = &self.stream {
265+
if let Some(stream) = &self.write_stream {
251266
let mut stream_guard = stream.lock().await;
252-
253-
let json = serde_json::to_string(&message)
254-
.map_err(|e| DebuggerError::SerializationError(format!("serde fail: {}", e)))?;
267+
let json = match serde_json::to_string(&message) {
268+
Ok(json) => json,
269+
Err(e) => {
270+
log::error!("serde fail: {}", e);
271+
return Err(
272+
DebuggerError::SerializationError(format!("serde fail: {}", e)).into(),
273+
);
274+
}
275+
};
255276

256277
let msg_id = message.get_cmd() as i32;
257278
let message_text = format!("{}\n{}\n", msg_id, json);
258-
279+
log::debug!("send message: {}", message_text);
259280
match stream_guard
260281
.write_all(message_text.as_bytes())
261282
.await
@@ -267,7 +288,7 @@ impl DebuggerConnection {
267288
return Err(e);
268289
}
269290
}
270-
291+
log::debug!("send message ok");
271292
match stream_guard
272293
.flush()
273294
.await
@@ -279,23 +300,29 @@ impl DebuggerConnection {
279300
return Err(e);
280301
}
281302
}
282-
303+
log::debug!("flush stream ok");
283304
Ok(())
284305
} else {
285306
Err(DebuggerError::ConnectionError("not connected".to_string()).into())
286307
}
287308
}
288309

289310
pub async fn send_request(&self, request: Message) -> DebuggerResult<Message> {
290-
if let Some(stream) = &self.stream {
311+
if let Some(stream) = &self.write_stream {
291312
let mut stream_guard = stream.lock().await;
292-
293-
let json = serde_json::to_string(&request)
294-
.map_err(|e| DebuggerError::SerializationError(format!("serde fail: {}", e)))?;
313+
let json = match serde_json::to_string(&request) {
314+
Ok(json) => json,
315+
Err(e) => {
316+
log::error!("serde fail: {}", e);
317+
return Err(
318+
DebuggerError::SerializationError(format!("serde fail: {}", e)).into(),
319+
);
320+
}
321+
};
295322

296323
let msg_id = request.get_cmd() as i32;
297324
let message_text = format!("{}\n{}\n", msg_id, json);
298-
325+
log::debug!("send message: {}", message_text);
299326
match stream_guard
300327
.write_all(message_text.as_bytes())
301328
.await
@@ -307,6 +334,7 @@ impl DebuggerConnection {
307334
return Err(e);
308335
}
309336
}
337+
log::debug!("send message ok");
310338

311339
match stream_guard
312340
.flush()
@@ -320,6 +348,9 @@ impl DebuggerConnection {
320348
}
321349
}
322350

351+
log::debug!("flush stream ok");
352+
353+
drop(stream_guard);
323354
// 等待响应
324355
let receiver = self
325356
.register_callback(request.get_cmd().get_rsp_cmd())
@@ -341,10 +372,11 @@ impl DebuggerConnection {
341372
depth: i64,
342373
frame_id: i64,
343374
) -> DebuggerResult<EvalRsp> {
344-
if let Some(stream) = &self.stream {
375+
if let Some(stream) = &self.write_stream {
345376
let seq = self.eval_seq_id;
346377
self.eval_seq_id += 1;
347378
let eval_req = EvalReq {
379+
cmd: MessageCMD::EvalReq as i64,
348380
seq: seq as i32,
349381
expr: expression,
350382
stack_level: frame_id as i32,
@@ -355,7 +387,6 @@ impl DebuggerConnection {
355387
};
356388

357389
let mut stream_guard = stream.lock().await;
358-
359390
let json = serde_json::to_string(&eval_req)
360391
.map_err(|e| DebuggerError::SerializationError(format!("serde fail: {}", e)))?;
361392

@@ -388,6 +419,7 @@ impl DebuggerConnection {
388419
}
389420
}
390421

422+
drop(stream_guard);
391423
if let Some(mut rx) = receiver {
392424
if let Some(response) = rx.recv().await {
393425
return Ok(response);

0 commit comments

Comments
 (0)