Skip to content

Commit 04a8ecf

Browse files
committed
add codec for frame resume.
1 parent 6f804c6 commit 04a8ecf

File tree

5 files changed

+202
-63
lines changed

5 files changed

+202
-63
lines changed

src/frame/mod.rs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
extern crate bytes;
22

3-
use crate::result::{RSocketResult};
4-
use crate::errors::{RSocketError};
3+
use crate::errors::RSocketError;
4+
use crate::result::RSocketResult;
55
use bytes::{BigEndian, BufMut, ByteOrder, Bytes, BytesMut};
66

77
mod cancel;
@@ -19,9 +19,10 @@ mod resume;
1919
mod resume_ok;
2020
mod setup;
2121
mod utils;
22+
mod version;
2223

2324
pub use cancel::Cancel;
24-
pub use error::{Error};
25+
pub use error::Error;
2526
pub use keepalive::Keepalive;
2627
pub use lease::Lease;
2728
pub use metadata_push::MetadataPush;
@@ -35,6 +36,7 @@ pub use resume::Resume;
3536
pub use resume_ok::ResumeOK;
3637
pub use setup::{Setup, SetupBuilder};
3738
pub use utils::*;
39+
pub use version::Version;
3840

3941
pub const FLAG_NEXT: u16 = 0x01 << 5;
4042
pub const FLAG_COMPLETE: u16 = 0x01 << 6;
@@ -122,7 +124,7 @@ impl Writeable for Frame {
122124
Body::Error(v) => v.write_to(bf),
123125
Body::Cancel() => (),
124126
Body::ResumeOK(v) => v.write_to(bf),
125-
_ => unimplemented!(),
127+
Body::Resume(v) => v.write_to(bf),
126128
}
127129
}
128130

@@ -143,7 +145,7 @@ impl Writeable for Frame {
143145
Body::Cancel() => 0,
144146
Body::Error(v) => v.len(),
145147
Body::ResumeOK(v) => v.len(),
146-
_ => unimplemented!(),
148+
Body::Resume(v) => v.len(),
147149
}
148150
}
149151
}
@@ -165,29 +167,30 @@ impl Frame {
165167
b.advance(2);
166168
let (flag, kind) = (n & 0x03FF, (n & 0xFC00) >> 10);
167169
let body = match kind {
168-
TYPE_SETUP => Setup::decode(flag, b).map(|it|Body::Setup(it)),
169-
TYPE_REQUEST_RESPONSE =>RequestResponse::decode(flag, b).map(|it|Body::RequestResponse(it)),
170-
TYPE_REQUEST_STREAM => RequestStream::decode(flag, b).map(|it|Body::RequestStream(it)),
171-
TYPE_REQUEST_CHANNEL => RequestChannel::decode(flag, b).map(|it|Body::RequestChannel(it)),
172-
TYPE_REQUEST_FNF => RequestFNF::decode(flag, b).map(|it|Body::RequestFNF(it)),
173-
TYPE_REQUEST_N => RequestN::decode(flag, b).map(|it|Body::RequestN(it)),
174-
TYPE_METADATA_PUSH => MetadataPush::decode(flag, b).map(|it|Body::MetadataPush(it)),
175-
TYPE_KEEPALIVE => Keepalive::decode(flag, b).map(|it|Body::Keepalive(it)),
176-
TYPE_PAYLOAD => Payload::decode(flag, b).map(|it|Body::Payload(it)),
177-
TYPE_LEASE => Lease::decode(flag, b).map(|it|Body::Lease(it)),
170+
TYPE_SETUP => Setup::decode(flag, b).map(|it| Body::Setup(it)),
171+
TYPE_REQUEST_RESPONSE => RequestResponse::decode(flag, b).map(|it| Body::RequestResponse(it)),
172+
TYPE_REQUEST_STREAM => RequestStream::decode(flag, b).map(|it| Body::RequestStream(it)),
173+
TYPE_REQUEST_CHANNEL => RequestChannel::decode(flag, b).map(|it| Body::RequestChannel(it)),
174+
TYPE_REQUEST_FNF => RequestFNF::decode(flag, b).map(|it| Body::RequestFNF(it)),
175+
TYPE_REQUEST_N => RequestN::decode(flag, b).map(|it| Body::RequestN(it)),
176+
TYPE_METADATA_PUSH => MetadataPush::decode(flag, b).map(|it| Body::MetadataPush(it)),
177+
TYPE_KEEPALIVE => Keepalive::decode(flag, b).map(|it| Body::Keepalive(it)),
178+
TYPE_PAYLOAD => Payload::decode(flag, b).map(|it| Body::Payload(it)),
179+
TYPE_LEASE => Lease::decode(flag, b).map(|it| Body::Lease(it)),
178180
TYPE_CANCEL => Ok(Body::Cancel()),
179-
TYPE_ERROR => Error::decode(flag, b).map(|it|Body::Error(it)),
180-
TYPE_RESUME_OK => ResumeOK::decode(flag, b).map(|it|Body::ResumeOK(it)),
181+
TYPE_ERROR => Error::decode(flag, b).map(|it| Body::Error(it)),
182+
TYPE_RESUME_OK => ResumeOK::decode(flag, b).map(|it| Body::ResumeOK(it)),
183+
TYPE_RESUME => Resume::decode(flag, b).map(|it| Body::Resume(it)),
181184
_ => Err(RSocketError::from("illegal frame type")),
182185
};
183-
body.map(|it|Frame::new(sid, it, flag))
186+
body.map(|it| Frame::new(sid, it, flag))
184187
}
185188

186189
pub fn get_body(&self) -> &Body {
187190
&self.body
188191
}
189192

190-
pub fn get_frame_type(&self) -> u16{
193+
pub fn get_frame_type(&self) -> u16 {
191194
to_frame_type(&self.body)
192195
}
193196

@@ -199,12 +202,12 @@ impl Frame {
199202
self.stream_id.clone()
200203
}
201204

202-
pub fn has_next(&self) -> bool{
205+
pub fn has_next(&self) -> bool {
203206
self.flag & FLAG_NEXT != 0
204207
}
205208

206-
pub fn has_complete(&self) -> bool{
207-
self.flag&FLAG_COMPLETE!=0
209+
pub fn has_complete(&self) -> bool {
210+
self.flag & FLAG_COMPLETE != 0
208211
}
209212
}
210213

src/frame/resume.rs

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,128 @@
1+
extern crate bytes;
2+
3+
use crate::frame::{Body, Frame, Version, Writeable};
4+
use crate::result::RSocketResult;
5+
6+
use bytes::{BigEndian, BufMut, ByteOrder, Bytes, BytesMut};
17

28
#[derive(Debug)]
3-
pub struct Resume {}
9+
pub struct Resume {
10+
version: Version,
11+
token: Option<Bytes>,
12+
last_received_server_position: u64,
13+
first_available_client_position: u64,
14+
}
15+
16+
pub struct ResumeBuilder {
17+
stream_id: u32,
18+
flag: u16,
19+
inner: Resume,
20+
}
21+
22+
impl Resume {
23+
fn new() -> Resume {
24+
Resume {
25+
version: Version::default(),
26+
token: None,
27+
last_received_server_position: 0,
28+
first_available_client_position: 0,
29+
}
30+
}
31+
32+
pub fn decode(flag: u16, b: &mut BytesMut) -> RSocketResult<Resume> {
33+
let major = BigEndian::read_u16(b);
34+
b.advance(2);
35+
let minor = BigEndian::read_u16(b);
36+
b.advance(2);
37+
let token_size = BigEndian::read_u16(b);
38+
b.advance(2);
39+
let token = if token_size > 0 {
40+
Some(Bytes::from(b.split_to(token_size as usize)))
41+
} else {
42+
None
43+
};
44+
let p1 = BigEndian::read_u64(b);
45+
b.advance(8);
46+
let p2 = BigEndian::read_u64(b);
47+
b.advance(8);
48+
Ok(Resume {
49+
version: Version::new(major, minor),
50+
token: token,
51+
last_received_server_position: p1,
52+
first_available_client_position: p2,
53+
})
54+
}
55+
56+
pub fn builder(stream_id: u32, flag: u16) -> ResumeBuilder {
57+
ResumeBuilder::new(stream_id, flag)
58+
}
59+
60+
pub fn get_version(&self) -> Version {
61+
self.version.clone()
62+
}
63+
64+
pub fn get_token(&self) -> Option<Bytes> {
65+
self.token.clone()
66+
}
67+
68+
pub fn get_last_received_server_position(&self) -> u64 {
69+
self.last_received_server_position.clone()
70+
}
71+
72+
pub fn get_first_available_client_position(&self) -> u64 {
73+
self.first_available_client_position.clone()
74+
}
75+
}
76+
77+
impl ResumeBuilder {
78+
fn new(stream_id: u32, flag: u16) -> ResumeBuilder {
79+
ResumeBuilder {
80+
stream_id: stream_id,
81+
flag: flag,
82+
inner: Resume::new(),
83+
}
84+
}
85+
86+
pub fn set_token(mut self, token: Bytes) -> Self {
87+
self.inner.token = Some(token);
88+
self
89+
}
90+
91+
pub fn set_last_received_server_position(mut self, position: u64) -> Self {
92+
self.inner.last_received_server_position = position;
93+
self
94+
}
95+
96+
pub fn set_first_available_client_position(mut self, position: u64) -> Self {
97+
self.inner.first_available_client_position = position;
98+
self
99+
}
100+
101+
pub fn build(self) -> Frame {
102+
Frame {
103+
stream_id: self.stream_id,
104+
flag: self.flag,
105+
body: Body::Resume(self.inner),
106+
}
107+
}
108+
}
109+
110+
impl Writeable for Resume {
111+
fn write_to(&self, bf: &mut BytesMut) {
112+
self.version.write_to(bf);
113+
if let Some(b) = &self.token {
114+
bf.put_u16_be(b.len() as u16);
115+
bf.put(b);
116+
}
117+
bf.put_u64_be(self.get_last_received_server_position());
118+
bf.put_u64_be(self.get_first_available_client_position());
119+
}
120+
121+
fn len(&self) -> u32 {
122+
let mut size: u32 = 22;
123+
if let Some(b) = &self.token {
124+
size += b.len() as u32;
125+
}
126+
size
127+
}
128+
}

src/frame/setup.rs

Lines changed: 8 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,13 @@
11
extern crate bytes;
22

3-
use crate::result::{RSocketResult};
4-
use crate::frame::{Body, Frame, PayloadSupport, Writeable, FLAG_METADATA, FLAG_RESUME, U24};
3+
use crate::frame::{
4+
Body, Frame, PayloadSupport, Version, Writeable, FLAG_METADATA, FLAG_RESUME, U24,
5+
};
56
use crate::mime::MIME_BINARY;
7+
use crate::result::RSocketResult;
68
use bytes::{BigEndian, BufMut, ByteOrder, Bytes, BytesMut};
79
use std::time::Duration;
810

9-
#[derive(Debug, Clone)]
10-
pub struct Version {
11-
major: u16,
12-
minor: u16,
13-
}
14-
15-
impl Version {
16-
pub fn default() -> Version {
17-
Version { major: 1, minor: 0 }
18-
}
19-
20-
fn new(major: u16, minor: u16) -> Version {
21-
Version { major, minor }
22-
}
23-
24-
pub fn get_major(&self) -> u16 {
25-
self.major.clone()
26-
}
27-
28-
pub fn get_minor(&self) -> u16 {
29-
self.minor.clone()
30-
}
31-
32-
pub fn write_to(&self, bf: &mut BytesMut) {
33-
bf.put_u16_be(self.major);
34-
bf.put_u16_be(self.minor);
35-
}
36-
}
37-
3811
#[derive(Debug, Clone)]
3912
pub struct Setup {
4013
version: Version,
@@ -66,12 +39,9 @@ impl Writeable for Setup {
6639
self.version.write_to(bf);
6740
bf.put_u32_be(self.keepalive);
6841
bf.put_u32_be(self.lifetime);
69-
match &self.token {
70-
Some(v) => {
71-
bf.put_u16_be(v.len() as u16);
72-
bf.put(v);
73-
}
74-
None => (),
42+
if let Some(b) = &self.token {
43+
bf.put_u16_be(b.len() as u16);
44+
bf.put(b);
7545
}
7646
bf.put_u8(self.mime_metadata.len() as u8);
7747
bf.put(&self.mime_metadata);
@@ -82,7 +52,6 @@ impl Writeable for Setup {
8252
}
8353

8454
impl Setup {
85-
8655
pub fn decode(flag: u16, b: &mut BytesMut) -> RSocketResult<Setup> {
8756
let major = BigEndian::read_u16(b);
8857
b.advance(2);
@@ -183,7 +152,7 @@ impl SetupBuilder {
183152
Frame::new(self.stream_id, Body::Setup(self.value.clone()), self.flag)
184153
}
185154

186-
pub fn set_data(mut self, bs: Bytes) -> Self{
155+
pub fn set_data(mut self, bs: Bytes) -> Self {
187156
self.value.data = Some(bs);
188157
self
189158
}

src/frame/version.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
extern crate bytes;
2+
3+
use bytes::{BufMut, BytesMut};
4+
5+
#[derive(Debug, Clone)]
6+
pub struct Version {
7+
major: u16,
8+
minor: u16,
9+
}
10+
11+
impl Version {
12+
pub fn default() -> Version {
13+
Version { major: 1, minor: 0 }
14+
}
15+
16+
pub fn new(major: u16, minor: u16) -> Version {
17+
Version { major, minor }
18+
}
19+
20+
pub fn get_major(&self) -> u16 {
21+
self.major.clone()
22+
}
23+
24+
pub fn get_minor(&self) -> u16 {
25+
self.minor.clone()
26+
}
27+
28+
pub fn write_to(&self, bf: &mut BytesMut) {
29+
bf.put_u16_be(self.major);
30+
bf.put_u16_be(self.minor);
31+
}
32+
}

tests/frame_test.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,21 @@ fn test_error() {
103103
}
104104

105105
#[test]
106-
fn test_resume_ok() {
106+
fn resume_ok() {
107107
let f = ResumeOK::builder(1234, 0).set_position(2333).build();
108108
try_codec(&f);
109109
}
110110

111+
#[test]
112+
fn test_resume() {
113+
let f = Resume::builder(0, FLAG_RESUME)
114+
.set_last_received_server_position(123)
115+
.set_first_available_client_position(22)
116+
.set_token(Bytes::from("this is a token"))
117+
.build();
118+
try_codec(&f);
119+
}
120+
111121
fn try_codec(f: &Frame) {
112122
println!("******* codec: {:?}", f);
113123
let mut bf = BytesMut::with_capacity(f.len() as usize);

0 commit comments

Comments
 (0)