Skip to content

Commit 97be17a

Browse files
authored
Merge pull request #48 from Tim-Zhang/port-from-dragonball
Port from dragonball
2 parents b47eb52 + 735bbd7 commit 97be17a

File tree

5 files changed

+36
-8
lines changed

5 files changed

+36
-8
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ libc = { version = "0.2.59", features = [ "extra_traits" ] }
1717
nix = "0.16.1"
1818
log = "0.4"
1919
byteorder = "1.3.2"
20+
thiserror = "1.0"
2021

2122
async-trait = { version = "0.1.31", optional = true }
2223
tokio = { version = "0.2", features = ["rt-threaded", "sync", "uds", "stream", "macros", "io-util"], optional = true }

src/common.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ pub fn do_bind(host: &str) -> Result<(RawFd, Domain)> {
9696
}
9797
};
9898

99+
setsockopt(fd, sockopt::ReusePort, &true).ok();
99100
bind(fd, &sockaddr).map_err(err_to_Others!(e, ""))?;
100101

101102
Ok((fd, domain))

src/error.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,18 @@
1616
1717
use crate::ttrpc::{Code, Status};
1818
use std::result;
19+
use thiserror::Error;
1920

2021
/// The error type for ttrpc.
21-
#[derive(Debug)]
22+
#[derive(Error, Debug)]
2223
pub enum Error {
24+
#[error("socket err: {0}")]
2325
Socket(String),
26+
27+
#[error("rpc status: {0:?}")]
2428
RpcStatus(Status),
29+
30+
#[error("ttrpc err: {0}")]
2531
Others(String),
2632
}
2733

src/sync/client.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::error::{Error, Result};
2929
use crate::sync::channel::{read_message, write_message};
3030
use crate::ttrpc::{Code, Request, Response};
3131
use crate::MessageHeader;
32+
use std::time::Duration;
3233

3334
type Sender = mpsc::Sender<(Vec<u8>, mpsc::SyncSender<Result<Vec<u8>>>)>;
3435
type Receiver = mpsc::Receiver<(Vec<u8>, mpsc::SyncSender<Result<Vec<u8>>>)>;
@@ -176,9 +177,17 @@ impl Client {
176177
self.sender_tx
177178
.send((buf, tx))
178179
.map_err(err_to_Others!(e, "Send packet to sender error "))?;
179-
let result = rx
180-
.recv()
181-
.map_err(err_to_Others!(e, "Recive packet from recver error "))?;
180+
181+
let result: Result<Vec<u8>>;
182+
if req.timeout_nano == 0 {
183+
result = rx
184+
.recv()
185+
.map_err(err_to_Others!(e, "Receive packet from recver error: "))?;
186+
} else {
187+
result = rx
188+
.recv_timeout(Duration::from_nanos(req.timeout_nano as u64))
189+
.map_err(err_to_Others!(e, "Receive packet from recver timeout: "))?;
190+
}
182191

183192
let buf = result?;
184193
let mut s = CodedInputStream::from_bytes(&buf);

src/sync/server.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use nix::unistd::close;
2121
use nix::unistd::pipe2;
2222
use protobuf::{CodedInputStream, Message};
2323
use std::collections::HashMap;
24-
use std::os::unix::io::RawFd;
24+
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
2525
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2626
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
2727
use std::sync::{Arc, Mutex};
@@ -279,8 +279,9 @@ impl Server {
279279
}
280280

281281
let (fd, _) = common::do_bind(host)?;
282-
self.listeners.push(fd);
282+
common::do_listen(fd)?;
283283

284+
self.listeners.push(fd);
284285
Ok(self)
285286
}
286287

@@ -341,8 +342,6 @@ impl Server {
341342
let service_quit = self.quit.clone();
342343
let monitor_fd = self.monitor_fd.0;
343344

344-
common::do_listen(listener)?;
345-
346345
let handler = thread::Builder::new()
347346
.name("listener_loop".into())
348347
.spawn(move || {
@@ -508,3 +507,15 @@ impl Server {
508507
}
509508
}
510509
}
510+
511+
impl FromRawFd for Server {
512+
unsafe fn from_raw_fd(fd: RawFd) -> Self {
513+
Self::default().add_listener(fd).unwrap()
514+
}
515+
}
516+
517+
impl AsRawFd for Server {
518+
fn as_raw_fd(&self) -> RawFd {
519+
self.listeners[0]
520+
}
521+
}

0 commit comments

Comments
 (0)