@@ -20,19 +20,20 @@ use std::os::unix::io::RawFd;
20
20
use std:: collections:: HashMap ;
21
21
use std:: sync:: mpsc;
22
22
use std:: sync:: { Arc , Mutex } ;
23
- use std:: { thread} ;
23
+ use std:: thread;
24
24
use std:: time:: Duration ;
25
25
26
26
use crate :: error:: { Error , Result } ;
27
- use crate :: sync:: sys:: { ClientConnection } ;
28
27
use crate :: proto:: { Code , Codec , MessageHeader , Request , Response , MESSAGE_TYPE_RESPONSE } ;
29
28
use crate :: sync:: channel:: { read_message, write_message} ;
29
+ use crate :: sync:: sys:: ClientConnection ;
30
30
31
31
#[ cfg( windows) ]
32
32
use super :: sys:: PipeConnection ;
33
33
34
34
type Sender = mpsc:: Sender < ( Vec < u8 > , mpsc:: SyncSender < Result < Vec < u8 > > > ) > ;
35
35
type Receiver = mpsc:: Receiver < ( Vec < u8 > , mpsc:: SyncSender < Result < Vec < u8 > > > ) > ;
36
+ type ReciverMap = Arc < Mutex < HashMap < u32 , mpsc:: SyncSender < Result < Vec < u8 > > > > > > ;
36
37
37
38
/// A ttrpc Client (sync).
38
39
#[ derive( Clone ) ]
@@ -44,7 +45,7 @@ pub struct Client {
44
45
impl Client {
45
46
pub fn connect ( sockaddr : & str ) -> Result < Client > {
46
47
let conn = ClientConnection :: client_connect ( sockaddr) ?;
47
-
48
+
48
49
Self :: new_client ( conn)
49
50
}
50
51
@@ -58,11 +59,10 @@ impl Client {
58
59
59
60
fn new_client ( pipe_client : ClientConnection ) -> Result < Client > {
60
61
let client = Arc :: new ( pipe_client) ;
61
-
62
+
62
63
let ( sender_tx, rx) : ( Sender , Receiver ) = mpsc:: channel ( ) ;
63
64
let recver_map_orig = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
64
65
65
-
66
66
let receiver_map = recver_map_orig. clone ( ) ;
67
67
let connection = Arc :: new ( client. get_pipe_connection ( ) ?) ;
68
68
let sender_client = connection. clone ( ) ;
@@ -138,35 +138,13 @@ impl Client {
138
138
}
139
139
} ,
140
140
} ;
141
- let mut map = recver_map_orig. lock ( ) . unwrap ( ) ;
142
- let recver_tx = match map. get ( & mh. stream_id ) {
143
- Some ( tx) => tx,
144
- None => {
145
- debug ! ( "Receiver got unknown packet {:?} {:?}" , mh, buf) ;
146
- continue ;
147
- }
148
- } ;
149
- if mh. type_ != MESSAGE_TYPE_RESPONSE {
150
- recver_tx
151
- . send ( Err ( Error :: Others ( format ! (
152
- "Receiver got malformed packet {mh:?} {buf:?}"
153
- ) ) ) )
154
- . unwrap_or_else ( |_e| error ! ( "The request has returned" ) ) ;
155
- continue ;
156
- }
157
141
158
- recver_tx
159
- . send ( Ok ( buf) )
160
- . unwrap_or_else ( |_e| error ! ( "The request has returned" ) ) ;
161
-
162
- map. remove ( & mh. stream_id ) ;
142
+ trans_resp ( recver_map_orig. clone ( ) , mh, Ok ( buf) ) ;
163
143
}
164
144
165
- let _ = receiver_client. close_receiver ( ) . map_err ( |e| {
166
- warn ! (
167
- "failed to close with error: {:?}" , e
168
- )
169
- } ) ;
145
+ let _ = receiver_client
146
+ . close_receiver ( )
147
+ . map_err ( |e| warn ! ( "failed to close with error: {:?}" , e) ) ;
170
148
171
149
trace ! ( "Receiver quit" ) ;
172
150
} ) ;
@@ -186,8 +164,10 @@ impl Client {
186
164
. map_err ( err_to_others_err ! ( e, "Send packet to sender error " ) ) ?;
187
165
188
166
let result = if req. timeout_nano == 0 {
189
- rx. recv ( )
190
- . map_err ( err_to_others_err ! ( e, "Receive packet from Receiver error: " ) ) ?
167
+ rx. recv ( ) . map_err ( err_to_others_err ! (
168
+ e,
169
+ "Receive packet from Receiver error: "
170
+ ) ) ?
191
171
} else {
192
172
rx. recv_timeout ( Duration :: from_nanos ( req. timeout_nano as u64 ) )
193
173
. map_err ( err_to_others_err ! (
@@ -197,8 +177,7 @@ impl Client {
197
177
} ;
198
178
199
179
let buf = result?;
200
- let res =
201
- Response :: decode ( buf) . map_err ( err_to_others_err ! ( e, "Unpack response error " ) ) ?;
180
+ let res = Response :: decode ( buf) . map_err ( err_to_others_err ! ( e, "Unpack response error " ) ) ?;
202
181
203
182
let status = res. status ( ) ;
204
183
if status. code ( ) != Code :: OK {
@@ -220,11 +199,35 @@ impl Drop for ClientConnection {
220
199
#[ cfg( windows) ]
221
200
impl Drop for PipeConnection {
222
201
fn drop ( & mut self ) {
223
- self . close ( ) . unwrap_or_else ( |e| {
224
- trace ! (
225
- "connection may already be closed: {}" , e
226
- )
227
- } ) ;
202
+ self . close ( )
203
+ . unwrap_or_else ( |e| trace ! ( "connection may already be closed: {}" , e) ) ;
228
204
trace ! ( "pipe connection is dropped" ) ;
229
205
}
230
206
}
207
+
208
+ /// Transfer the response
209
+ fn trans_resp ( recver_map_orig : ReciverMap , mh : MessageHeader , buf : Result < Vec < u8 > > ) {
210
+ let mut map = recver_map_orig. lock ( ) . unwrap ( ) ;
211
+ let recver_tx = match map. get ( & mh. stream_id ) {
212
+ Some ( tx) => tx,
213
+ None => {
214
+ debug ! ( "Recver got unknown packet {:?} {:?}" , mh, buf) ;
215
+ return ;
216
+ }
217
+ } ;
218
+ if mh. type_ != MESSAGE_TYPE_RESPONSE {
219
+ recver_tx
220
+ . send ( Err ( Error :: Others ( format ! (
221
+ "Recver got malformed packet {:?} {:?}" ,
222
+ mh, buf
223
+ ) ) ) )
224
+ . unwrap_or_else ( |_e| error ! ( "The request has returned" ) ) ;
225
+ return ;
226
+ }
227
+
228
+ recver_tx
229
+ . send ( buf)
230
+ . unwrap_or_else ( |_e| error ! ( "The request has returned" ) ) ;
231
+
232
+ map. remove ( & mh. stream_id ) ;
233
+ }
0 commit comments