Skip to content

Commit be85a1a

Browse files
committed
optimize.
1 parent 73ce973 commit be85a1a

24 files changed

+315
-308
lines changed

src/core/callers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub struct RequestCaller {
1313
impl RequestCaller {
1414
pub fn new() -> (oneshot::Sender<Payload>, RequestCaller) {
1515
let (tx, rx) = oneshot::channel();
16-
(tx, RequestCaller { rx: rx })
16+
(tx, RequestCaller { rx })
1717
}
1818
}
1919

@@ -33,7 +33,7 @@ pub struct StreamCaller {
3333
impl StreamCaller {
3434
pub fn new() -> (mpsc::Sender<Payload>, StreamCaller) {
3535
let (tx, rx) = mpsc::channel(0);
36-
(tx, StreamCaller { rx: rx })
36+
(tx, StreamCaller { rx })
3737
}
3838
}
3939

src/core/misc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub struct StreamID {
99
impl StreamID {
1010
fn new(value: u32) -> StreamID {
1111
let inner = Arc::new(AtomicU32::new(value));
12-
StreamID { inner: inner }
12+
StreamID { inner }
1313
}
1414

1515
pub fn next(&self) -> u32 {

src/core/socket.rs

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -90,19 +90,20 @@ impl Runner {
9090
}));
9191
}
9292

93+
#[inline]
9394
fn respond_request_response(&self, sid: u32, _flag: u16, input: Payload) {
9495
let responder = self.responder.clone();
9596
let tx = self.tx.clone();
9697
tokio::spawn(lazy(move || {
9798
let result = responder
9899
.request_response(input)
99-
.map(|res| {
100+
.map(move |res| {
101+
let (d, m) = res.split();
100102
let mut bu = frame::Payload::builder(sid, frame::FLAG_COMPLETE);
101-
let (data, metadata) = res.split();
102-
if let Some(b) = data {
103+
if let Some(b) = d {
103104
bu = bu.set_data(b);
104105
}
105-
if let Some(b) = metadata {
106+
if let Some(b) = m {
106107
bu = bu.set_metadata(b);
107108
}
108109
bu.build()
@@ -115,9 +116,13 @@ impl Runner {
115116
.set_data(Bytes::from("TODO: should be error details"))
116117
.build(),
117118
};
119+
118120
tx.send(sending)
119-
.map(move |_| ())
120-
.map_err(|e| error!("send frame failed: {}", e))
121+
.and_then(move |_it| {
122+
debug!("sent");
123+
Ok(())
124+
})
125+
.map_err(move |e| error!("send frame failed: {}", e))
121126
}));
122127
}
123128

@@ -129,11 +134,12 @@ impl Runner {
129134
responder
130135
.request_stream(input)
131136
.map(|elem| {
137+
let (d, m) = elem.split();
132138
let mut bu = frame::Payload::builder(sid, frame::FLAG_NEXT);
133-
if let Some(b) = elem.data() {
139+
if let Some(b) = d {
134140
bu = bu.set_data(b);
135141
}
136-
if let Some(b) = elem.metadata() {
142+
if let Some(b) = m {
137143
bu = bu.set_metadata(b);
138144
}
139145
bu.build()
@@ -157,7 +163,7 @@ impl Runner {
157163
debug!("incoming frame#{}", sid);
158164
match f.get_body() {
159165
Body::Setup(v) => {
160-
let pa = SetupPayload::from(&v);
166+
let pa = SetupPayload::from(v);
161167
match &self.acceptor {
162168
Acceptor::Generate(f) => {
163169
let rs = Box::new(self.socket.clone());
@@ -168,7 +174,7 @@ impl Runner {
168174
};
169175
}
170176
Body::Payload(v) => {
171-
let pa = Payload::from(&v);
177+
let pa = Payload::from(v);
172178
// pick handler
173179
let mut senders = handlers.map.write().unwrap();
174180
let handler = senders.remove(&sid).unwrap();
@@ -204,9 +210,13 @@ impl Runner {
204210
self.respond_request_response(sid, flag, pa);
205211
}
206212
Body::RequestStream(v) => {
207-
let pa = Payload::from(&v);
213+
let pa = Payload::from(v);
208214
self.respond_request_stream(sid, flag, pa);
209215
}
216+
Body::RequestFNF(v) => {
217+
let pa = Payload::from(v);
218+
self.respond_fnf(pa);
219+
}
210220
Body::Keepalive(v) => {
211221
if flag & frame::FLAG_RESPOND != 0 {
212222
debug!("got keepalive: {:?}", v);
@@ -262,24 +272,23 @@ impl DuplexSocket {
262272

263273
pub fn setup(&self, setup: SetupPayload) -> impl Future<Item = (), Error = RSocketError> {
264274
let mut bu = frame::Setup::builder(0, 0);
265-
266-
if let Some(b) = setup.data() {
267-
bu = bu.set_data(b);
268-
}
269-
if let Some(b) = setup.metadata() {
270-
bu = bu.set_metadata(b);
271-
}
272275
if let Some(s) = setup.data_mime_type() {
273276
bu = bu.set_mime_data(&s);
274277
}
275278
if let Some(s) = setup.metadata_mime_type() {
276279
bu = bu.set_mime_metadata(&s);
277280
}
278-
let sending = bu
281+
bu = bu
279282
.set_keepalive(setup.keepalive_interval())
280-
.set_lifetime(setup.keepalive_lifetime())
281-
.build();
282-
self.send_frame(sending)
283+
.set_lifetime(setup.keepalive_lifetime());
284+
let (d, m) = setup.split();
285+
if let Some(b) = d {
286+
bu = bu.set_data(b);
287+
}
288+
if let Some(b) = m {
289+
bu = bu.set_metadata(b);
290+
}
291+
self.send_frame(bu.build())
283292
}
284293

285294
fn send_frame(&self, sending: Frame) -> Box<dyn Future<Item = (), Error = RSocketError>> {
@@ -300,9 +309,10 @@ impl DuplexSocket {
300309

301310
impl RSocket for DuplexSocket {
302311
fn metadata_push(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
312+
let (_d, m) = req.split();
303313
let sid = self.seq.next();
304314
let mut bu = frame::MetadataPush::builder(sid, 0);
305-
if let Some(b) = req.metadata() {
315+
if let Some(b) = m {
306316
bu = bu.set_metadata(b);
307317
}
308318
let sending = bu.build();
@@ -315,12 +325,13 @@ impl RSocket for DuplexSocket {
315325
}
316326

317327
fn request_fnf(&self, req: Payload) -> Box<dyn Future<Item = (), Error = RSocketError>> {
328+
let (d, m) = req.split();
318329
let sid = self.seq.next();
319330
let mut bu = frame::RequestFNF::builder(sid, 0);
320-
if let Some(b) = req.data() {
331+
if let Some(b) = d {
321332
bu = bu.set_data(b);
322333
}
323-
if let Some(b) = req.metadata() {
334+
if let Some(b) = m {
324335
bu = bu.set_metadata(b);
325336
}
326337
let sending = bu.build();
@@ -336,16 +347,17 @@ impl RSocket for DuplexSocket {
336347
&self,
337348
input: Payload,
338349
) -> Box<dyn Future<Item = Payload, Error = RSocketError>> {
350+
let (d, m) = input.split();
339351
let sid = self.seq.next();
340352
let (tx, caller) = RequestCaller::new();
341353
// register handler
342354
self.register_handler(sid, Handler::Request(tx));
343355
// crate request frame
344356
let mut bu = frame::RequestResponse::builder(sid, 0);
345-
if let Some(b) = input.data() {
357+
if let Some(b) = d {
346358
bu = bu.set_data(b);
347359
}
348-
if let Some(b) = input.metadata() {
360+
if let Some(b) = m {
349361
bu = bu.set_metadata(b);
350362
}
351363
let sending = bu.build();
@@ -359,16 +371,17 @@ impl RSocket for DuplexSocket {
359371
&self,
360372
input: Payload,
361373
) -> Box<dyn Stream<Item = Payload, Error = RSocketError>> {
374+
let (d, m) = input.split();
362375
let sid = self.seq.next();
363376
// register handler
364377
let (tx, caller) = StreamCaller::new();
365378
self.register_handler(sid, Handler::Stream(tx));
366379
// crate stream frame
367380
let mut bu = frame::RequestStream::builder(sid, 0);
368-
if let Some(b) = input.data() {
381+
if let Some(b) = d {
369382
bu = bu.set_data(b);
370383
}
371-
if let Some(b) = input.metadata() {
384+
if let Some(b) = m {
372385
bu = bu.set_metadata(b);
373386
}
374387
let sending = bu.build();

src/frame/cancel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{Body, Frame};
22

3-
#[derive(Debug, Clone)]
3+
#[derive(Debug)]
44
pub struct Cancel {}
55

66
impl Cancel {

src/frame/error.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use super::{Body, Frame, Writeable};
44
use crate::result::RSocketResult;
55
use bytes::{BigEndian, BufMut, ByteOrder, Bytes, BytesMut};
66

7-
#[derive(Debug, Clone)]
7+
#[derive(Debug)]
88
pub struct Error {
99
code: u32,
1010
data: Option<Bytes>,
@@ -19,8 +19,8 @@ pub struct ErrorBuilder {
1919
impl ErrorBuilder {
2020
fn new(stream_id: u32, flag: u16) -> ErrorBuilder {
2121
ErrorBuilder {
22-
stream_id: stream_id,
23-
flag: flag,
22+
stream_id,
23+
flag,
2424
value: Error {
2525
code: 0,
2626
data: None,
@@ -39,7 +39,7 @@ impl ErrorBuilder {
3939
}
4040

4141
pub fn build(self) -> Frame {
42-
Frame::new(self.stream_id, Body::Error(self.value.clone()), self.flag)
42+
Frame::new(self.stream_id, Body::Error(self.value), self.flag)
4343
}
4444
}
4545

@@ -52,18 +52,15 @@ impl Error {
5252
} else {
5353
Some(Bytes::from(bf.to_vec()))
5454
};
55-
Ok(Error {
56-
code: code,
57-
data: d,
58-
})
55+
Ok(Error { code, data: d })
5956
}
6057

6158
pub fn builder(stream_id: u32, flag: u16) -> ErrorBuilder {
6259
ErrorBuilder::new(stream_id, flag)
6360
}
6461

65-
pub fn get_data(&self) -> Option<Bytes> {
66-
self.data.clone()
62+
pub fn get_data(&self) -> &Option<Bytes> {
63+
&self.data
6764
}
6865

6966
pub fn get_code(&self) -> u32 {

src/frame/keepalive.rs

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
extern crate bytes;
22

3-
use crate::result::RSocketResult;
43
use super::{Body, Frame, Writeable};
4+
use crate::result::RSocketResult;
55
use bytes::{BigEndian, BufMut, ByteOrder, Bytes, BytesMut};
66

7-
#[derive(Debug, Clone)]
7+
#[derive(Debug)]
88
pub struct Keepalive {
99
last_received_position: u64,
1010
data: Option<Bytes>,
@@ -19,48 +19,27 @@ pub struct KeepaliveBuilder {
1919
impl KeepaliveBuilder {
2020
fn new(stream_id: u32, flag: u16) -> KeepaliveBuilder {
2121
KeepaliveBuilder {
22-
stream_id: stream_id,
23-
flag: flag,
22+
stream_id,
23+
flag,
2424
keepalive: Keepalive {
2525
last_received_position: 0,
2626
data: None,
2727
},
2828
}
2929
}
3030

31-
pub fn set_data(&mut self, data: Bytes) -> &mut KeepaliveBuilder {
31+
pub fn set_data(mut self, data: Bytes) -> Self {
3232
self.keepalive.data = Some(data);
3333
self
3434
}
3535

36-
pub fn set_last_received_position(&mut self, position: u64) -> &mut KeepaliveBuilder {
36+
pub fn set_last_received_position(mut self, position: u64) -> Self {
3737
self.keepalive.last_received_position = position;
3838
self
3939
}
4040

41-
pub fn build(&mut self) -> Frame {
42-
Frame::new(
43-
self.stream_id,
44-
Body::Keepalive(self.keepalive.clone()),
45-
self.flag,
46-
)
47-
}
48-
}
49-
50-
impl Writeable for Keepalive {
51-
fn write_to(&self, bf: &mut BytesMut) {
52-
bf.put_u64_be(self.last_received_position);
53-
match &self.data {
54-
Some(v) => bf.put(v),
55-
None => (),
56-
}
57-
}
58-
59-
fn len(&self) -> u32 {
60-
8 + match &self.data {
61-
Some(v) => v.len() as u32,
62-
None => 0,
63-
}
41+
pub fn build(self) -> Frame {
42+
Frame::new(self.stream_id, Body::Keepalive(self.keepalive), self.flag)
6443
}
6544
}
6645

@@ -83,10 +62,27 @@ impl Keepalive {
8362
}
8463

8564
pub fn get_last_received_position(&self) -> u64 {
86-
self.last_received_position.clone()
65+
self.last_received_position
66+
}
67+
68+
pub fn get_data(&self) -> &Option<Bytes> {
69+
&self.data
70+
}
71+
}
72+
73+
impl Writeable for Keepalive {
74+
fn write_to(&self, bf: &mut BytesMut) {
75+
bf.put_u64_be(self.last_received_position);
76+
match &self.data {
77+
Some(v) => bf.put(v),
78+
None => (),
79+
}
8780
}
8881

89-
pub fn get_data(&self) -> Option<Bytes> {
90-
self.data.clone()
82+
fn len(&self) -> u32 {
83+
8 + match &self.data {
84+
Some(v) => v.len() as u32,
85+
None => 0,
86+
}
9187
}
9288
}

0 commit comments

Comments
 (0)