Skip to content

Commit c677f3f

Browse files
committed
Support COMPOSITE_METADATA.
1 parent 55827e8 commit c677f3f

File tree

10 files changed

+422
-176
lines changed

10 files changed

+422
-176
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ matches = "0.1.8"
1414
log = "0.4.8"
1515
bytes = "0.5.1"
1616
futures = "0.3.1"
17+
lazy_static = "1.4.0"
1718
# reactor_rs = {git = "https://github.com/jjeffcaii/reactor-rust", branch = "develop"}
1819

1920
[dependencies.tokio]
@@ -30,6 +31,7 @@ features = ["codec"]
3031
[dev-dependencies]
3132
env_logger = "0.7.1"
3233
hex = "0.4.0"
34+
rand = "0.7.2"
3335

3436
[[example]]
3537
name = "echo"

src/extension.rs

Lines changed: 98 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -1,168 +1,108 @@
1-
use std::fmt;
1+
use super::mime::WellKnownMIME;
2+
use crate::errors::{ErrorKind, RSocketError};
3+
use crate::frame::U24;
4+
use crate::result::RSocketResult;
5+
use bytes::{Buf, BufMut, Bytes, BytesMut};
26

3-
#[derive(PartialEq, Debug)]
4-
pub enum WellKnownMIME {
5-
ApplicationAvro,
6-
ApplicationCbor,
7-
ApplicationGraphql,
8-
ApplicationGzip,
9-
ApplicationJavascript,
10-
ApplicationJson,
11-
ApplicationOctetStream,
12-
ApplicationPdf,
13-
ApplicationVndApacheThriftBinary,
14-
ApplicationVndGoogleProtobuf,
15-
ApplicationXml,
16-
ApplicationZip,
17-
AudioAac,
18-
AudioMp3,
19-
AudioMp4,
20-
AudioMpeg3,
21-
AudioMpeg,
22-
AudioOgg,
23-
AudioOpus,
24-
AudioVorbis,
25-
ImageBmp,
26-
ImageGif,
27-
ImageHeicSequence,
28-
ImageHeic,
29-
ImageHeifSequence,
30-
ImageHeif,
31-
ImageJpeg,
32-
ImagePng,
33-
ImageTiff,
34-
MultipartMixed,
35-
TextCss,
36-
TextCsv,
37-
TextHtml,
38-
TextPlain,
39-
TextXml,
40-
VideoH264,
41-
VideoH265,
42-
VideoVP8,
43-
ApplicationXHessian,
44-
ApplicationXJavaObject,
45-
ApplicationCloudeventsJson,
46-
MessageXRSocketTracingZipkinV0,
47-
MessageXRSocketRoutingV0,
48-
MessageXRsocketCompositeMetadataV0,
49-
Unknown,
7+
const MAX_MIME_LEN: usize = 0x7F;
8+
9+
#[derive(Debug, Clone, Eq, PartialEq)]
10+
pub struct CompositeMetadata {
11+
mime: String,
12+
payload: Bytes,
5013
}
5114

52-
impl From<u8> for WellKnownMIME {
53-
fn from(n: u8) -> WellKnownMIME {
54-
return match n {
55-
0x00 => WellKnownMIME::ApplicationAvro,
56-
0x01 => WellKnownMIME::ApplicationCbor,
57-
0x02 => WellKnownMIME::ApplicationGraphql,
58-
0x03 => WellKnownMIME::ApplicationGzip,
59-
0x04 => WellKnownMIME::ApplicationJavascript,
60-
0x05 => WellKnownMIME::ApplicationJson,
61-
0x06 => WellKnownMIME::ApplicationOctetStream,
62-
0x07 => WellKnownMIME::ApplicationPdf,
63-
0x08 => WellKnownMIME::ApplicationVndApacheThriftBinary,
64-
0x09 => WellKnownMIME::ApplicationVndGoogleProtobuf,
65-
0x0A => WellKnownMIME::ApplicationXml,
66-
0x0B => WellKnownMIME::ApplicationZip,
67-
0x0C => WellKnownMIME::AudioAac,
68-
0x0D => WellKnownMIME::AudioMp3,
69-
0x0E => WellKnownMIME::AudioMp4,
70-
0x0F => WellKnownMIME::AudioMpeg3,
71-
0x10 => WellKnownMIME::AudioMpeg,
72-
0x11 => WellKnownMIME::AudioOgg,
73-
0x12 => WellKnownMIME::AudioOpus,
74-
0x13 => WellKnownMIME::AudioVorbis,
75-
0x14 => WellKnownMIME::ImageBmp,
76-
0x15 => WellKnownMIME::ImageGif,
77-
0x16 => WellKnownMIME::ImageHeicSequence,
78-
0x17 => WellKnownMIME::ImageHeic,
79-
0x18 => WellKnownMIME::ImageHeifSequence,
80-
0x19 => WellKnownMIME::ImageHeif,
81-
0x1A => WellKnownMIME::ImageJpeg,
82-
0x1B => WellKnownMIME::ImagePng,
83-
0x1C => WellKnownMIME::ImageTiff,
84-
0x1D => WellKnownMIME::MultipartMixed,
85-
0x1E => WellKnownMIME::TextCss,
86-
0x1F => WellKnownMIME::TextCsv,
87-
0x20 => WellKnownMIME::TextHtml,
88-
0x21 => WellKnownMIME::TextPlain,
89-
0x22 => WellKnownMIME::TextXml,
90-
0x23 => WellKnownMIME::VideoH264,
91-
0x24 => WellKnownMIME::VideoH265,
92-
0x25 => WellKnownMIME::VideoVP8,
93-
0x26 => WellKnownMIME::ApplicationXHessian,
94-
0x27 => WellKnownMIME::ApplicationXJavaObject,
95-
0x28 => WellKnownMIME::ApplicationCloudeventsJson,
96-
0x7D => WellKnownMIME::MessageXRSocketTracingZipkinV0,
97-
0x7E => WellKnownMIME::MessageXRSocketRoutingV0,
98-
0x7F => WellKnownMIME::MessageXRsocketCompositeMetadataV0,
99-
_ => WellKnownMIME::Unknown,
100-
};
15+
impl CompositeMetadata {
16+
pub fn new(mime: String, payload: Bytes) -> CompositeMetadata {
17+
if mime.len() > MAX_MIME_LEN {
18+
panic!("too large MIME type!");
19+
}
20+
if payload.len() > U24::max() {
21+
panic!("too large Payload!")
22+
}
23+
CompositeMetadata { mime, payload }
10124
}
102-
}
10325

104-
impl fmt::Display for WellKnownMIME {
105-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106-
return match *self {
107-
WellKnownMIME::ApplicationAvro => write!(f, "{}", "application/avro"),
108-
WellKnownMIME::ApplicationCbor => write!(f, "{}", "application/cbor"),
109-
WellKnownMIME::ApplicationGraphql => write!(f, "{}", "application/graphql"),
110-
WellKnownMIME::ApplicationGzip => write!(f, "{}", "application/gzip"),
111-
WellKnownMIME::ApplicationJavascript => write!(f, "{}", "application/javascript"),
112-
WellKnownMIME::ApplicationJson => write!(f, "{}", "application/json"),
113-
WellKnownMIME::ApplicationOctetStream => write!(f, "{}", "application/octet-stream"),
114-
WellKnownMIME::ApplicationPdf => write!(f, "{}", "application/pdf"),
115-
WellKnownMIME::ApplicationVndApacheThriftBinary => {
116-
write!(f, "{}", "application/vnd.apache.thrift.binary")
117-
}
118-
WellKnownMIME::ApplicationVndGoogleProtobuf => {
119-
write!(f, "{}", "application/vnd.google.protobuf")
120-
}
121-
WellKnownMIME::ApplicationXml => write!(f, "{}", "application/xml"),
122-
WellKnownMIME::ApplicationZip => write!(f, "{}", "application/zip"),
123-
WellKnownMIME::AudioAac => write!(f, "{}", "audio/aac"),
124-
WellKnownMIME::AudioMp3 => write!(f, "{}", "audio/mp3"),
125-
WellKnownMIME::AudioMp4 => write!(f, "{}", "audio/mp4"),
126-
WellKnownMIME::AudioMpeg3 => write!(f, "{}", "audio/mpeg3"),
127-
WellKnownMIME::AudioMpeg => write!(f, "{}", "audio/mpeg"),
128-
WellKnownMIME::AudioOgg => write!(f, "{}", "audio/ogg"),
129-
WellKnownMIME::AudioOpus => write!(f, "{}", "audio/opus"),
130-
WellKnownMIME::AudioVorbis => write!(f, "{}", "audio/vorbis"),
131-
WellKnownMIME::ImageBmp => write!(f, "{}", "image/bmp"),
132-
WellKnownMIME::ImageGif => write!(f, "{}", "image/gif"),
133-
WellKnownMIME::ImageHeicSequence => write!(f, "{}", "image/heic-sequence"),
134-
WellKnownMIME::ImageHeic => write!(f, "{}", "image/heic"),
135-
WellKnownMIME::ImageHeifSequence => write!(f, "{}", "image/heif-sequence"),
136-
WellKnownMIME::ImageHeif => write!(f, "{}", "image/heif"),
137-
WellKnownMIME::ImageJpeg => write!(f, "{}", "image/jpeg"),
138-
WellKnownMIME::ImagePng => write!(f, "{}", "image/png"),
139-
WellKnownMIME::ImageTiff => write!(f, "{}", "image/tiff"),
140-
WellKnownMIME::MultipartMixed => write!(f, "{}", "multipart/mixed"),
141-
WellKnownMIME::TextCss => write!(f, "{}", "text/css"),
142-
WellKnownMIME::TextCsv => write!(f, "{}", "text/csv"),
143-
WellKnownMIME::TextHtml => write!(f, "{}", "text/html"),
144-
WellKnownMIME::TextPlain => write!(f, "{}", "text/plain"),
145-
WellKnownMIME::TextXml => write!(f, "{}", "text/xml"),
146-
WellKnownMIME::VideoH264 => write!(f, "{}", "video/H264"),
147-
WellKnownMIME::VideoH265 => write!(f, "{}", "video/H265"),
148-
WellKnownMIME::VideoVP8 => write!(f, "{}", "video/VP8"),
149-
WellKnownMIME::ApplicationXHessian => write!(f, "{}", "application/x-hessian"),
150-
WellKnownMIME::ApplicationXJavaObject => write!(f, "{}", "application/x-java-object"),
151-
WellKnownMIME::ApplicationCloudeventsJson => {
152-
write!(f, "{}", "application/cloudevents+json")
153-
}
154-
WellKnownMIME::MessageXRSocketTracingZipkinV0 => {
155-
write!(f, "{}", "message/x.rsocket.tracing-zipkin.v0")
26+
pub fn decode(b: &mut BytesMut) -> RSocketResult<Vec<CompositeMetadata>> {
27+
let mut metadatas: Vec<CompositeMetadata> = vec![];
28+
loop {
29+
match Self::decode_once(b) {
30+
Ok(op) => match op {
31+
Some(v) => metadatas.push(v),
32+
None => break,
33+
},
34+
Err(e) => return Err(e),
15635
}
157-
WellKnownMIME::MessageXRSocketRoutingV0 => {
158-
write!(f, "{}", "message/x.rsocket.routing.v0")
159-
}
160-
WellKnownMIME::MessageXRsocketCompositeMetadataV0 => {
161-
write!(f, "{}", "message/x.rsocket.composite-metadata.v0")
36+
}
37+
Ok(metadatas)
38+
}
39+
40+
fn decode_once(bs: &mut BytesMut) -> RSocketResult<Option<CompositeMetadata>> {
41+
if bs.is_empty() {
42+
return Ok(None);
43+
}
44+
let first: u8 = bs.get_u8();
45+
let m = if 0x80 & first != 0 {
46+
// Well
47+
let well = WellKnownMIME::from(first & 0x7F);
48+
well.str().to_string()
49+
} else {
50+
// Bad
51+
let mime_len = first as usize;
52+
if bs.len() < mime_len {
53+
return Err(RSocketError::from(ErrorKind::WithDescription(
54+
"bad COMPOSITE_METADATA bytes: missing required bytes!",
55+
)));
16256
}
163-
_ => write!(f, "{}", "unknown"),
57+
let front = bs.split_to(mime_len);
58+
String::from_utf8(front.to_vec()).unwrap()
16459
};
60+
61+
if bs.len() < 3 {
62+
return Err(RSocketError::from(ErrorKind::WithDescription(
63+
"bad COMPOSITE_METADATA bytes: missing required bytes!",
64+
)));
65+
}
66+
let payload_size = U24::read_advance(bs) as usize;
67+
if bs.len() < payload_size {
68+
return Err(RSocketError::from(ErrorKind::WithDescription(
69+
"bad COMPOSITE_METADATA bytes: missing required bytes!",
70+
)));
71+
}
72+
let p = bs.split_to(payload_size).freeze();
73+
Ok(Some(CompositeMetadata::new(m, p)))
16574
}
166-
}
16775

168-
pub struct CompositeMetadata {}
76+
pub fn get_mime(&self) -> &String {
77+
&self.mime
78+
}
79+
80+
pub fn get_payload(&self) -> &Bytes {
81+
&self.payload
82+
}
83+
84+
pub fn write_to(&self, bf: &mut BytesMut) {
85+
let mi = WellKnownMIME::from(self.mime.as_str());
86+
let first_byte: u8 = if mi == WellKnownMIME::Unknown {
87+
// Bad
88+
self.mime.len() as u8
89+
} else {
90+
// Goodmi
91+
0x80 | mi.raw()
92+
};
93+
let payload_size = self.payload.len();
94+
95+
bf.put_u8(first_byte);
96+
if first_byte & 0x80 == 0 {
97+
bf.put_slice(self.mime.as_bytes());
98+
}
99+
U24::write(payload_size as u32, bf);
100+
bf.put(self.payload.bytes());
101+
}
102+
103+
pub fn bytes(&self) -> Bytes {
104+
let mut bf = BytesMut::new();
105+
self.write_to(&mut bf);
106+
bf.freeze()
107+
}
108+
}

src/frame/setup.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::{Body, Frame, PayloadSupport, Version, Writeable, FLAG_METADATA, FLAG_RESUME};
2-
use crate::mime::MIME_BINARY;
2+
use crate::mime::APPLICATION_BINARY;
33
use crate::result::RSocketResult;
44
use bytes::{Buf, BufMut, Bytes, BytesMut};
55
use std::time::Duration;
@@ -133,8 +133,8 @@ impl SetupBuilder {
133133
keepalive: 30_000,
134134
lifetime: 90_000,
135135
token: None,
136-
mime_metadata: String::from(MIME_BINARY),
137-
mime_data: String::from(MIME_BINARY),
136+
mime_metadata: String::from(APPLICATION_BINARY),
137+
mime_data: String::from(APPLICATION_BINARY),
138138
metadata: None,
139139
data: None,
140140
},

src/frame/utils.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
use super::FLAG_METADATA;
22
use bytes::{Buf, BufMut, Bytes, BytesMut};
33

4-
pub struct U24 {}
4+
pub struct U24;
55

66
impl U24 {
7+
pub fn max() -> usize {
8+
0x00FFFFFF
9+
}
10+
711
pub fn write(n: u32, bf: &mut BytesMut) {
812
bf.put_u8((0xFF & (n >> 16)) as u8);
913
bf.put_u8((0xFF & (n >> 8)) as u8);

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#[macro_use]
66
extern crate log;
77
#[macro_use]
8+
extern crate lazy_static;
9+
#[macro_use]
810
extern crate matches;
911

1012
pub mod extension;

0 commit comments

Comments
 (0)