@@ -15,7 +15,7 @@ use std::io::Stdout;
1515use std:: net:: SocketAddr ;
1616use std:: sync:: Arc ;
1717use std:: time:: Duration ;
18- use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
18+ use tokio:: io:: { AsyncBufReadExt , AsyncWriteExt , BufReader } ;
1919use tokio:: net:: tcp:: { OwnedReadHalf , OwnedWriteHalf } ;
2020use tokio:: net:: { TcpListener , TcpStream } ;
2121use tokio:: sync:: { Mutex , mpsc} ;
@@ -24,10 +24,9 @@ use tokio::time::timeout;
2424
2525type DebuggerResult < T > = Result < T , Box < dyn Error + Send > > ;
2626
27- #[ allow( unused) ]
2827#[ derive( Debug ) ]
2928pub struct DebuggerConnection {
30- read_stream : Option < Arc < Mutex < OwnedReadHalf > > > ,
29+ read_stream : Option < OwnedReadHalf > ,
3130 write_stream : Option < Arc < Mutex < OwnedWriteHalf > > > ,
3231 reader_task : Option < JoinHandle < ( ) > > ,
3332 response_senders : Arc < Mutex < HashMap < MessageCMD , mpsc:: Sender < Message > > > > ,
@@ -69,7 +68,7 @@ impl DebuggerConnection {
6968 . map_err ( |e| DebuggerError :: from ( e) ) ?
7069 } ;
7170 let ( read_stream, write_stream) = stream. into_split ( ) ;
72- self . read_stream = Some ( Arc :: new ( Mutex :: new ( read_stream) ) ) ;
71+ self . read_stream = Some ( read_stream) ;
7372 self . write_stream = Some ( Arc :: new ( Mutex :: new ( write_stream) ) ) ;
7473 Ok ( ( ) )
7574 }
@@ -86,13 +85,13 @@ impl DebuggerConnection {
8685 . map_err ( |e| DebuggerError :: from ( e) ) ?;
8786
8887 let ( read_stream, write_stream) = stream. into_split ( ) ;
89- self . read_stream = Some ( Arc :: new ( Mutex :: new ( read_stream) ) ) ;
88+ self . read_stream = Some ( read_stream) ;
9089 self . write_stream = Some ( Arc :: new ( Mutex :: new ( write_stream) ) ) ;
9190 Ok ( ( ) )
9291 }
9392
9493 pub fn is_connected ( & self ) -> bool {
95- self . read_stream . is_some ( ) && self . write_stream . is_some ( )
94+ self . write_stream . is_some ( )
9695 }
9796
9897 pub async fn close ( & mut self ) {
@@ -108,22 +107,20 @@ impl DebuggerConnection {
108107 return ;
109108 }
110109
111- if let Some ( stream ) = & self . read_stream {
112- let stream_clone = stream . clone ( ) ;
110+ let read_stream = self . read_stream . take ( ) ;
111+ if let Some ( stream ) = read_stream {
113112 let senders = self . response_senders . clone ( ) ;
114113 let eval_response = self . eval_response . clone ( ) ;
115114
116115 let handle = tokio:: spawn ( async move {
117- let mut buffer = vec ! [ 0u8 ; 4096 ] ;
118- let mut pos = 0 ;
119-
116+ let mut msg_id_string = String :: new ( ) ;
117+ let mut msg_json_string = String :: new ( ) ;
118+ let mut reader = BufReader :: new ( stream ) ;
120119 loop {
121- let read_result = {
122- let mut stream_guard = stream_clone. lock ( ) . await ;
123- stream_guard. read ( & mut buffer[ pos..] ) . await
124- } ;
120+ msg_id_string. clear ( ) ;
121+ msg_json_string. clear ( ) ;
125122
126- match read_result {
123+ match reader . read_line ( & mut msg_id_string ) . await {
127124 Ok ( 0 ) => {
128125 log:: error!( "Connection closed by peer" ) ;
129126 let mut ide_conn = ide_conn. lock ( ) . unwrap ( ) ;
@@ -137,74 +134,64 @@ impl DebuggerConnection {
137134 break ;
138135 }
139136 Ok ( n) => {
140- pos += n; // 解析消息格式:第一行是整数ID,第二行是JSON内容
141- log:: debug!( "read {} bytes, total bytes {}" , n, pos) ;
142- let mut start = 0 ;
143- let mut id_line = None ;
144- let mut i = 0 ;
145- let mut msg_id = 0 ;
146-
147- while i < pos {
148- // 查找换行符
149- if buffer[ i] == b'\n' {
150- if id_line. is_none ( ) {
151- // 解析第一行作为消息ID
152- if let Ok ( id_str) = std:: str:: from_utf8 ( & buffer[ start..i] ) {
153- if let Ok ( parsed_msg_id) = id_str. parse :: < i32 > ( ) {
154- // 记录ID并继续寻找JSON内容
155- msg_id = parsed_msg_id;
156- id_line = Some ( start) ;
157- start = i + 1 ;
158- }
159- }
160- } else {
161- // 已有ID,这一行是JSON内容
162- if let Ok ( msg_str) = std:: str:: from_utf8 ( & buffer[ start..i] )
163- {
164- if let Ok ( message) = Message :: from_str (
165- msg_str,
166- MessageCMD :: from ( msg_id as i64 ) ,
167- ) {
168- Self :: dispatch_message (
169- message,
170- & senders,
171- & eval_response,
172- )
173- . await ;
174- } else {
175- log:: error!( "parse fail: {}" , msg_str) ;
176- }
177- }
178- // 重置解析状态,准备解析下一条消息
179- id_line = None ;
180- start = i + 1 ;
181- }
182- }
183- i += 1 ;
137+ if n == 0 {
138+ break ;
184139 }
140+ }
141+ Err ( e) => {
142+ log:: error!( "Error reading from stream: {}" , e) ;
143+ break ;
144+ }
145+ }
185146
186- log:: debug!( "parsed {} bytes " , start ) ;
147+ log:: debug!( "read message id {} " , msg_id_string ) ;
187148
188- // 处理完整消息后移动剩余数据到缓冲区开头
189- if start > 0 {
190- buffer. copy_within ( start..pos, 0 ) ;
191- pos -= start;
192- }
149+ match reader. read_line ( & mut msg_json_string) . await {
150+ Ok ( 0 ) => {
151+ log:: error!( "Connection closed by peer" ) ;
152+ let mut ide_conn = ide_conn. lock ( ) . unwrap ( ) ;
153+ ide_conn. send_event ( Event :: Output ( OutputEventBody {
154+ category : Some ( dap:: types:: OutputEventCategory :: Console ) ,
155+ output : "Disconnected\n " . to_string ( ) ,
156+ ..Default :: default ( )
157+ } ) ) ;
193158
194- if pos > buffer. len ( ) - 1024 {
195- log:: debug!(
196- "current buffer used size {} bytes, extend buffer total size to {} bytes" ,
197- pos,
198- buffer. len( ) * 2
199- ) ;
200- buffer. resize ( buffer. len ( ) * 2 , 0 ) ;
159+ ide_conn. send_event ( Event :: Terminated ( None ) ) ;
160+ break ;
161+ }
162+ Ok ( n) => {
163+ if n == 0 {
164+ break ;
201165 }
202166 }
203167 Err ( e) => {
204168 log:: error!( "Error reading from stream: {}" , e) ;
205169 break ;
206170 }
207171 }
172+
173+ log:: debug!( "read message json {}" , msg_json_string) ;
174+
175+ let msg_id = match msg_id_string. trim ( ) . parse :: < i32 > ( ) {
176+ Ok ( id) => id,
177+ Err ( e) => {
178+ log:: error!( "Error parsing message ID: {}" , e) ;
179+ continue ;
180+ }
181+ } ;
182+
183+ let message = match Message :: from_str (
184+ & msg_json_string,
185+ MessageCMD :: from ( msg_id as i64 ) ,
186+ ) {
187+ Ok ( msg) => msg,
188+ Err ( e) => {
189+ log:: error!( "Error parsing message JSON: {}" , e) ;
190+ continue ;
191+ }
192+ } ;
193+
194+ Self :: dispatch_message ( message, & senders, & eval_response) . await ;
208195 }
209196 } ) ;
210197
0 commit comments