Skip to content

Commit 00a79fe

Browse files
committed
change request_fnf to fire_and_forget && support basic keepalive respond.
1 parent be85a1a commit 00a79fe

File tree

7 files changed

+58
-33
lines changed

7 files changed

+58
-33
lines changed

src/core/socket.rs

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::collections::HashMap;
1818
use std::net::SocketAddr;
1919
use std::sync::{Arc, Mutex, RwLock};
2020
use tokio::net::TcpStream;
21+
use tokio::runtime::Runtime;
2122

2223
pub enum Acceptor {
2324
Direct(Box<dyn RSocket>),
@@ -74,6 +75,7 @@ impl Runner {
7475
}
7576
}
7677

78+
#[inline]
7779
fn respond_metadata_push(&self, input: Payload) {
7880
let responder = self.responder.clone();
7981
tokio::spawn(lazy(move || {
@@ -82,14 +84,31 @@ impl Runner {
8284
}));
8385
}
8486

87+
#[inline]
8588
fn respond_fnf(&self, input: Payload) {
8689
let responder = self.responder.clone();
8790
tokio::spawn(lazy(move || {
88-
responder.request_fnf(input).wait().unwrap();
91+
responder.fire_and_forget(input).wait().unwrap();
8992
Ok(())
9093
}));
9194
}
9295

96+
#[inline]
97+
fn respond_keepalive(&self, keepalive: frame::Keepalive) {
98+
let tx = self.tx.clone();
99+
tokio::spawn(lazy(move || {
100+
let (d, _) = keepalive.split();
101+
let mut bu = frame::Keepalive::builder(0, 0);
102+
if let Some(b) = d {
103+
bu = bu.set_data(b);
104+
}
105+
let sending = bu.build();
106+
tx.send(sending)
107+
.and_then(move |_it| Ok(()))
108+
.map_err(move |e| warn!("send frame failed: {}", e))
109+
}));
110+
}
111+
93112
#[inline]
94113
fn respond_request_response(&self, sid: u32, _flag: u16, input: Payload) {
95114
let responder = self.responder.clone();
@@ -116,16 +135,13 @@ impl Runner {
116135
.set_data(Bytes::from("TODO: should be error details"))
117136
.build(),
118137
};
119-
120138
tx.send(sending)
121-
.and_then(move |_it| {
122-
debug!("sent");
123-
Ok(())
124-
})
125-
.map_err(move |e| error!("send frame failed: {}", e))
139+
.and_then(move |_it| Ok(()))
140+
.map_err(move |e| warn!("send frame failed: {}", e))
126141
}));
127142
}
128143

144+
#[inline]
129145
fn respond_request_stream(&self, sid: u32, flag: u16, input: Payload) {
130146
let responder = self.responder.clone();
131147
let tx = self.tx.clone();
@@ -155,9 +171,9 @@ impl Runner {
155171
}));
156172
}
157173

174+
#[inline]
158175
fn to_future(self, rx: mpsc::Receiver<Frame>) -> impl Future<Item = (), Error = ()> + Send {
159-
let handlers = self.handlers.clone();
160-
let task = rx.for_each(move |f: frame::Frame| {
176+
let task = rx.for_each(move |f| {
161177
let sid = f.get_stream_id();
162178
let flag = f.get_flag();
163179
debug!("incoming frame#{}", sid);
@@ -176,6 +192,7 @@ impl Runner {
176192
Body::Payload(v) => {
177193
let pa = Payload::from(v);
178194
// pick handler
195+
let handlers = self.handlers.clone();
179196
let mut senders = handlers.map.write().unwrap();
180197
let handler = senders.remove(&sid).unwrap();
181198

@@ -217,9 +234,14 @@ impl Runner {
217234
let pa = Payload::from(v);
218235
self.respond_fnf(pa);
219236
}
237+
Body::MetadataPush(v) => {
238+
let pa = Payload::from(v);
239+
self.respond_metadata_push(pa);
240+
}
220241
Body::Keepalive(v) => {
221242
if flag & frame::FLAG_RESPOND != 0 {
222243
debug!("got keepalive: {:?}", v);
244+
self.respond_keepalive(v);
223245
}
224246
}
225247
_ => unimplemented!(),
@@ -243,6 +265,7 @@ impl DuplexSocket {
243265
DuplexSocketBuilder::new()
244266
}
245267

268+
#[inline]
246269
fn new(
247270
first_stream_id: u32,
248271
ctx: Context,
@@ -253,16 +276,15 @@ impl DuplexSocket {
253276

254277
let handlers = Arc::new(Handlers::new());
255278
let handlers2 = handlers.clone();
256-
let tx1 = tp.tx();
257-
let tx2 = tp.tx();
279+
let (tx, rx) = tp.split();
258280

259281
let sk = DuplexSocket {
260-
tx: tx1,
282+
tx: tx.clone(),
261283
handlers: handlers,
262284
seq: StreamID::from(first_stream_id),
263285
};
264286

265-
let task = Runner::new(tx2, handlers2, responder, sk.clone()).to_future(tp.rx());
287+
let task = Runner::new(tx, handlers2, responder, sk.clone()).to_future(rx);
266288
let fu = lazy(move || {
267289
tokio::spawn(task0);
268290
task
@@ -296,7 +318,7 @@ impl DuplexSocket {
296318
let task = tx
297319
.send(sending)
298320
.map(|_| ())
299-
.map_err(|e| RSocketError::from("send frame failed"));
321+
.map_err(|_e| RSocketError::from("send frame failed"));
300322
Box::new(task)
301323
}
302324

@@ -324,7 +346,7 @@ impl RSocket for DuplexSocket {
324346
Box::new(fu)
325347
}
326348

327-
fn request_fnf(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
349+
fn fire_and_forget(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
328350
let (d, m) = req.split();
329351
let sid = self.seq.next();
330352
let mut bu = frame::RequestFNF::builder(sid, 0);
@@ -458,9 +480,9 @@ impl RSocket for Responder {
458480
(*r).metadata_push(req)
459481
}
460482

461-
fn request_fnf(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
483+
fn fire_and_forget(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
462484
let r = self.inner.read().unwrap();
463-
(*r).request_fnf(req)
485+
(*r).fire_and_forget(req)
464486
}
465487

466488
fn request_response(
@@ -490,7 +512,7 @@ impl RSocket for EmptyRSocket {
490512
Box::new(future::err(self.must_failed()))
491513
}
492514

493-
fn request_fnf(&self, _req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
515+
fn fire_and_forget(&self, _req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
494516
Box::new(future::err(self.must_failed()))
495517
}
496518

src/core/spi.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use futures::{Future, Stream};
1010

1111
pub trait RSocket: Sync + Send {
1212
fn metadata_push(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>>;
13-
fn request_fnf(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>>;
13+
fn fire_and_forget(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>>;
1414
fn request_response(&self, req: Payload)
1515
-> Box<dyn Future<Item = Payload, Error = RSocketError>>;
1616
fn request_stream(&self, req: Payload) -> Box<dyn Stream<Item = Payload, Error = RSocketError>>;
@@ -24,7 +24,7 @@ impl RSocket for MockResponder {
2424
Box::new(ok(()).map_err(|()| RSocketError::from("foobar")))
2525
}
2626

27-
fn request_fnf(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
27+
fn fire_and_forget(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
2828
debug!("receive request_fnf: {:?}", req);
2929
Box::new(ok(()).map_err(|()| RSocketError::from("foobar")))
3030
}

src/frame/keepalive.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ impl Keepalive {
6868
pub fn get_data(&self) -> &Option<Bytes> {
6969
&self.data
7070
}
71+
72+
pub fn split(self) -> (Option<Bytes>,Option<Bytes>){
73+
(self.data,None)
74+
}
75+
7176
}
7277

7378
impl Writeable for Keepalive {

src/transport/spi.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@ impl Transport {
1616
Transport { _tx: tx, _rx: rx }
1717
}
1818

19-
pub fn tx(&self) -> Sender<Frame> {
20-
self._tx.clone()
19+
pub fn split(self) -> (Sender<Frame>,Receiver<Frame>){
20+
(self._tx,self._rx)
2121
}
2222

23-
pub fn rx(self) -> Receiver<Frame> {
24-
self._rx
25-
}
2623
}

src/transport/tcp.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,21 @@ pub fn from_addr(addr: &SocketAddr) -> Context {
1616
}
1717

1818
pub fn from_socket(socket: TcpStream) -> Context {
19-
let (tx, rx) = mpsc::channel(0);
20-
let (rtx, rrx) = mpsc::channel(0);
19+
let (tx_snd, rx_snd) = mpsc::channel::<Frame>(0);
20+
let (tx_rcv, rx_rcv) = mpsc::channel::<Frame>(0);
2121
let (sink, stream) = Framed::new(socket, FrameCodec::new()).split();
2222
let sender: Box<dyn Stream<Item = Frame, Error = io::Error> + Send> =
23-
Box::new(rx.map_err(|_| panic!("errors not possible on rx")));
23+
Box::new(rx_snd.map_err(|_| panic!("errors not possible on rx")));
24+
let tx_rcv_gen = move || tx_rcv.clone();
2425
let task = stream
2526
.for_each(move |it| {
26-
rtx.clone().send(it).wait().unwrap();
27+
tx_rcv_gen().send(it).wait().unwrap();
2728
Ok(())
2829
})
2930
.map_err(|e| println!("error reading: {:?}", e));
3031
let fu = lazy(move || {
3132
tokio::spawn(sender.forward(sink).then(|_| Ok(())));
3233
task
3334
});
34-
(Transport::new(tx, rrx), Box::new(fu))
35+
(Transport::new(tx_snd, rx_rcv), Box::new(fu))
3536
}

src/x/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ impl RSocket for Client {
125125
self.socket.metadata_push(req)
126126
}
127127

128-
fn request_fnf(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
129-
self.socket.request_fnf(req)
128+
fn fire_and_forget(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
129+
self.socket.fire_and_forget(req)
130130
}
131131

132132
fn request_response(

tests/client_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ fn exec(socket: &Client) {
3535

3636
// request fnf
3737
let fnf = Payload::from("Mock FNF");
38-
socket.request_fnf(fnf).wait().unwrap();
38+
socket.fire_and_forget(fnf).wait().unwrap();
3939

4040
// request response
4141
for n in 0..3 {

0 commit comments

Comments
 (0)