Skip to content

Commit e735bb3

Browse files
committed
Refactor to support other OSes
This moves unix specific calls out of the main server and client functions. It does this by introducing several new types: PipeListener, PipeConnection, and ClientConnection. These types are contain the unix specific functionality to communitcate with Unix Domain sockets and are hidden behind a conditional compilation flag. Signed-off-by: James Sturtevant <[email protected]>
1 parent cb4d18b commit e735bb3

File tree

7 files changed

+459
-224
lines changed

7 files changed

+459
-224
lines changed

src/sync/channel.rs

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use nix::sys::socket::*;
16-
use std::os::unix::io::RawFd;
1715

1816
use crate::error::{get_rpc_status, sock_error_msg, Error, Result};
17+
use crate::sync::sys::{PipeConnection};
1918
use crate::proto::{Code, MessageHeader, MESSAGE_HEADER_LENGTH, MESSAGE_LENGTH_MAX};
2019

21-
fn retryable(e: nix::Error) -> bool {
22-
use ::nix::Error;
23-
e == Error::EINTR || e == Error::EAGAIN
24-
}
25-
26-
fn read_count(fd: RawFd, count: usize) -> Result<Vec<u8>> {
20+
fn read_count (fd: &PipeConnection, count: usize) -> Result<Vec<u8>> {
2721
let mut v: Vec<u8> = vec![0; count];
2822
let mut len = 0;
2923

@@ -32,19 +26,14 @@ fn read_count(fd: RawFd, count: usize) -> Result<Vec<u8>> {
3226
}
3327

3428
loop {
35-
match recv(fd, &mut v[len..], MsgFlags::empty()) {
29+
match fd.read(&mut v[len..]) {
3630
Ok(l) => {
3731
len += l;
3832
// when socket peer closed, it would return 0.
3933
if len == count || l == 0 {
4034
break;
4135
}
4236
}
43-
44-
Err(e) if retryable(e) => {
45-
// Should retry
46-
}
47-
4837
Err(e) => {
4938
return Err(Error::Socket(e.to_string()));
5039
}
@@ -54,26 +43,21 @@ fn read_count(fd: RawFd, count: usize) -> Result<Vec<u8>> {
5443
Ok(v[0..len].to_vec())
5544
}
5645

57-
fn write_count(fd: RawFd, buf: &[u8], count: usize) -> Result<usize> {
46+
fn write_count(fd: &PipeConnection, buf: &[u8], count: usize) -> Result<usize> {
5847
let mut len = 0;
5948

6049
if count == 0 {
6150
return Ok(0);
6251
}
6352

6453
loop {
65-
match send(fd, &buf[len..], MsgFlags::empty()) {
54+
match fd.write(&buf[len..]){
6655
Ok(l) => {
6756
len += l;
6857
if len == count {
6958
break;
7059
}
7160
}
72-
73-
Err(e) if retryable(e) => {
74-
// Should retry
75-
}
76-
7761
Err(e) => {
7862
return Err(Error::Socket(e.to_string()));
7963
}
@@ -83,7 +67,7 @@ fn write_count(fd: RawFd, buf: &[u8], count: usize) -> Result<usize> {
8367
Ok(len)
8468
}
8569

86-
fn read_message_header(fd: RawFd) -> Result<MessageHeader> {
70+
fn read_message_header(fd: &PipeConnection) -> Result<MessageHeader> {
8771
let buf = read_count(fd, MESSAGE_HEADER_LENGTH)?;
8872
let size = buf.len();
8973
if size != MESSAGE_HEADER_LENGTH {
@@ -98,7 +82,7 @@ fn read_message_header(fd: RawFd) -> Result<MessageHeader> {
9882
Ok(mh)
9983
}
10084

101-
pub fn read_message(fd: RawFd) -> Result<(MessageHeader, Vec<u8>)> {
85+
pub fn read_message(fd: &PipeConnection) -> Result<(MessageHeader, Vec<u8>)> {
10286
let mh = read_message_header(fd)?;
10387
trace!("Got Message header {:?}", mh);
10488

@@ -125,7 +109,7 @@ pub fn read_message(fd: RawFd) -> Result<(MessageHeader, Vec<u8>)> {
125109
Ok((mh, buf))
126110
}
127111

128-
fn write_message_header(fd: RawFd, mh: MessageHeader) -> Result<()> {
112+
fn write_message_header(fd: &PipeConnection, mh: MessageHeader) -> Result<()> {
129113
let buf: Vec<u8> = mh.into();
130114

131115
let size = write_count(fd, &buf, MESSAGE_HEADER_LENGTH)?;
@@ -139,7 +123,7 @@ fn write_message_header(fd: RawFd, mh: MessageHeader) -> Result<()> {
139123
Ok(())
140124
}
141125

142-
pub fn write_message(fd: RawFd, mh: MessageHeader, buf: Vec<u8>) -> Result<()> {
126+
pub fn write_message(fd: &PipeConnection, mh: MessageHeader, buf: Vec<u8>) -> Result<()> {
143127
write_message_header(fd, mh)?;
144128

145129
let size = write_count(fd, &buf, buf.len())?;

src/sync/client.rs

Lines changed: 38 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,15 @@
1414

1515
//! Sync client of ttrpc.
1616
17-
use nix::sys::socket::*;
18-
use nix::unistd::close;
17+
1918
use std::collections::HashMap;
2019
use std::os::unix::io::RawFd;
2120
use std::sync::mpsc;
2221
use std::sync::{Arc, Mutex};
23-
use std::{io, thread};
22+
use std::{thread};
2423

25-
#[cfg(target_os = "macos")]
26-
use crate::common::set_fd_close_exec;
27-
use crate::common::{client_connect, SOCK_CLOEXEC};
2824
use crate::error::{Error, Result};
25+
use crate::sync::sys::{ClientConnection};
2926
use crate::proto::{Code, Codec, MessageHeader, Request, Response, MESSAGE_TYPE_RESPONSE};
3027
use crate::sync::channel::{read_message, write_message};
3128
use std::time::Duration;
@@ -36,38 +33,36 @@ type Receiver = mpsc::Receiver<(Vec<u8>, mpsc::SyncSender<Result<Vec<u8>>>)>;
3633
/// A ttrpc Client (sync).
3734
#[derive(Clone)]
3835
pub struct Client {
39-
_fd: RawFd,
36+
_fd: Arc<ClientConnection>,
4037
sender_tx: Sender,
41-
_client_close: Arc<ClientClose>,
4238
}
4339

4440
impl Client {
4541
pub fn connect(sockaddr: &str) -> Result<Client> {
46-
let fd = unsafe { client_connect(sockaddr)? };
47-
Ok(Self::new(fd))
42+
let conn = ClientConnection::client_connect(sockaddr)?;
43+
44+
Ok(Self::new_client(conn))
4845
}
4946

5047
/// Initialize a new [`Client`] from raw file descriptor.
5148
pub fn new(fd: RawFd) -> Client {
52-
let (sender_tx, rx): (Sender, Receiver) = mpsc::channel();
49+
let conn = ClientConnection::new(fd);
5350

54-
let (recver_fd, close_fd) =
55-
socketpair(AddressFamily::Unix, SockType::Stream, None, SOCK_CLOEXEC).unwrap();
51+
Self::new_client(conn)
52+
}
5653

57-
// MacOS doesn't support descriptor creation with SOCK_CLOEXEC automically,
58-
// so there is a chance of leak if fork + exec happens in between of these calls.
59-
#[cfg(target_os = "macos")]
60-
{
61-
set_fd_close_exec(recver_fd).unwrap();
62-
set_fd_close_exec(close_fd).unwrap();
63-
}
54+
fn new_client(pipe_client: ClientConnection) -> Client {
55+
let client = Arc::new(pipe_client);
56+
6457

65-
let client_close = Arc::new(ClientClose { fd, close_fd });
58+
let (sender_tx, rx): (Sender, Receiver) = mpsc::channel();
6659

60+
6761
let recver_map_orig = Arc::new(Mutex::new(HashMap::new()));
6862

6963
//Sender
7064
let recver_map = recver_map_orig.clone();
65+
let sender_client = client.clone();
7166
thread::spawn(move || {
7267
let mut stream_id: u32 = 1;
7368
for (buf, recver_tx) in rx.iter() {
@@ -80,7 +75,8 @@ impl Client {
8075
}
8176
let mut mh = MessageHeader::new_request(0, buf.len() as u32);
8277
mh.set_stream_id(current_stream_id);
83-
if let Err(e) = write_message(fd, mh, buf) {
78+
let c = sender_client.get_pipe_connection();
79+
if let Err(e) = write_message(&c, mh, buf) {
8480
//Remove current_stream_id and recver_tx to recver_map
8581
{
8682
let mut map = recver_map.lock().unwrap();
@@ -95,53 +91,28 @@ impl Client {
9591
});
9692

9793
//Recver
94+
let reciever_client = client.clone();
9895
thread::spawn(move || {
99-
let mut pollers = vec![
100-
libc::pollfd {
101-
fd: recver_fd,
102-
events: libc::POLLIN,
103-
revents: 0,
104-
},
105-
libc::pollfd {
106-
fd,
107-
events: libc::POLLIN,
108-
revents: 0,
109-
},
110-
];
96+
11197

11298
loop {
113-
let returned = unsafe {
114-
let pollers: &mut [libc::pollfd] = &mut pollers;
115-
libc::poll(
116-
pollers as *mut _ as *mut libc::pollfd,
117-
pollers.len() as _,
118-
-1,
119-
)
120-
};
121-
122-
if returned == -1 {
123-
let err = io::Error::last_os_error();
124-
if err.raw_os_error() == Some(libc::EINTR) {
99+
100+
match reciever_client.ready() {
101+
Ok(None) => {
125102
continue;
126103
}
127-
128-
error!("fatal error in process reaper:{}", err);
129-
break;
130-
} else if returned < 1 {
131-
continue;
132-
}
133-
134-
if pollers[0].revents != 0 {
135-
break;
136-
}
137-
138-
if pollers[pollers.len() - 1].revents == 0 {
139-
continue;
104+
Ok(_) => {}
105+
Err(e) => {
106+
error!("pipeConnection ready error {:?}", e);
107+
break;
108+
}
140109
}
141-
142110
let mh;
143111
let buf;
144-
match read_message(fd) {
112+
113+
let pipe_connection = reciever_client.get_pipe_connection();
114+
115+
match read_message(&pipe_connection) {
145116
Ok((x, y)) => {
146117
mh = x;
147118
buf = y;
@@ -190,20 +161,18 @@ impl Client {
190161
map.remove(&mh.stream_id);
191162
}
192163

193-
let _ = close(recver_fd).map_err(|e| {
164+
let _ = reciever_client.close_receiver().map_err(|e| {
194165
warn!(
195-
"failed to close recver_fd: {} with error: {:?}",
196-
recver_fd, e
166+
"failed to close with error: {:?}", e
197167
)
198168
});
199169

200170
trace!("Recver quit");
201171
});
202172

203173
Client {
204-
_fd: fd,
174+
_fd: client,
205175
sender_tx,
206-
_client_close: client_close,
207176
}
208177
}
209178
pub fn request(&self, req: Request) -> Result<Response> {
@@ -239,15 +208,9 @@ impl Client {
239208
}
240209
}
241210

242-
struct ClientClose {
243-
fd: RawFd,
244-
close_fd: RawFd,
245-
}
246-
247-
impl Drop for ClientClose {
211+
impl Drop for ClientConnection {
248212
fn drop(&mut self) {
249-
close(self.close_fd).unwrap();
250-
close(self.fd).unwrap();
251-
trace!("All client is droped");
213+
self.close().unwrap();
214+
trace!("All client is dropped");
252215
}
253216
}

src/sync/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
mod channel;
99
mod client;
1010
mod server;
11+
mod sys;
1112

1213
#[macro_use]
1314
mod utils;

0 commit comments

Comments
 (0)