Skip to content

Commit 5101526

Browse files
committed
disallow any control characters in messages
rework later on as part of #2 Signed-off-by: Lance-Drane <[email protected]>
1 parent 700830d commit 5101526

File tree

2 files changed

+85
-28
lines changed

2 files changed

+85
-28
lines changed

shared-deps/src/intersect_messaging.rs

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,29 +38,50 @@ pub fn should_message_passthrough(
3838
// The values are just the channel concatenated with the message string, separated by a non-printable byte (1)
3939
// since channels always follow a specific format, but messages can have many arbitrary characters in them, list the channel first.
4040

41-
/// build the event source data string
42-
pub fn make_eventsource_data(channel: &str, msg_str: &str) -> String {
43-
format!("{}{}{}", channel, DELIMITER, msg_str)
44-
}
45-
4641
#[derive(Debug)]
47-
pub struct ExtractEventSourceErr;
42+
pub struct IntersectEventSourceErr;
4843

49-
impl std::fmt::Display for ExtractEventSourceErr {
44+
impl std::fmt::Display for IntersectEventSourceErr {
5045
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
5146
write!(f, "event source data was not properly formatted")
5247
}
5348
}
5449

55-
impl std::error::Error for ExtractEventSourceErr {}
50+
impl std::error::Error for IntersectEventSourceErr {}
51+
52+
fn contains_control_characters(channel: &str, msg_str: &str) -> bool {
53+
channel.as_bytes().iter().any(|byte| *byte < b' ')
54+
|| msg_str.as_bytes().iter().any(|byte| *byte < b' ')
55+
}
56+
57+
/// build the event source data string
58+
pub fn make_eventsource_data(
59+
channel: &str,
60+
msg_str: &str,
61+
) -> Result<String, IntersectEventSourceErr> {
62+
// TODO rework this when we switch to strictly binary characters
63+
if contains_control_characters(channel, msg_str) {
64+
tracing::warn!("Data from SSE should not have any control characters");
65+
return Err(IntersectEventSourceErr);
66+
}
67+
Ok(format!("{}{}{}", channel, DELIMITER, msg_str))
68+
}
5669

5770
/// returns a tuple of the channel string and the event source string
58-
pub fn extract_eventsource_data(data: &str) -> Result<(String, String), ExtractEventSourceErr> {
71+
pub fn extract_eventsource_data(data: &str) -> Result<(String, String), IntersectEventSourceErr> {
5972
match data.split_once(DELIMITER) {
60-
Some((channel, msg_str)) => Ok((channel.to_owned(), msg_str.to_owned())),
73+
Some((channel, msg_str)) => {
74+
// TODO rework this when we switch to strictly binary payloads
75+
// we technically could allow for control characters which aren't '\n' or '\r', but it's best to prohibit this
76+
if contains_control_characters(channel, msg_str) {
77+
tracing::warn!("Data from SSE should not have any control characters");
78+
return Err(IntersectEventSourceErr);
79+
}
80+
Ok((channel.to_owned(), msg_str.to_owned()))
81+
}
6182
None => {
6283
tracing::warn!("Data from SSE does not match expected format: {}", data);
63-
Err(ExtractEventSourceErr)
84+
Err(IntersectEventSourceErr)
6485
}
6586
}
6687
}
@@ -104,14 +125,46 @@ mod tests {
104125
#[test]
105126
fn encode_decode_eventsource_msg_idempotent() {
106127
let channel = "channel";
107-
let message = "mess\x01age\x01"; // message also uses delimiter but can still be restored exactly as-is
128+
let message = "message";
108129

109-
let encoded = make_eventsource_data(channel, message);
130+
let encoded = make_eventsource_data(channel, message).unwrap();
110131
// our delimiter only adds one byte to all the data we want to push through
111132
assert!(encoded.len() == channel.len() + message.len() + 1);
112133

113134
let (decoded_channel, decoded_message) = extract_eventsource_data(&encoded).unwrap();
114135
assert_eq!(channel, decoded_channel);
115136
assert_eq!(message, decoded_message);
116137
}
138+
139+
#[test]
140+
fn disallow_control_characters_in_eventsource_parts() {
141+
let channel = "channel\n";
142+
let msg_str = "message";
143+
let result = make_eventsource_data(channel, msg_str);
144+
assert!(result.is_err());
145+
146+
let channel = "channel";
147+
let msg_str = "message\n";
148+
let result = make_eventsource_data(channel, msg_str);
149+
assert!(result.is_err());
150+
151+
let channel = "channel";
152+
let msg_str = "message";
153+
let result = make_eventsource_data(channel, msg_str);
154+
assert!(result.is_ok());
155+
}
156+
157+
#[test]
158+
fn disallow_control_characters_in_eventsource() {
159+
let encoded = "channel\x01message\n";
160+
let result = extract_eventsource_data(&encoded);
161+
assert!(result.is_err());
162+
}
163+
164+
#[test]
165+
fn require_delimiter_in_eventsource() {
166+
let encoded = "channelmessage";
167+
let result = extract_eventsource_data(&encoded);
168+
assert!(result.is_err());
169+
}
117170
}

shared-deps/src/protocols/amqp/subscribe.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -159,22 +159,26 @@ async fn consume_message(
159159
}
160160
Ok(true) => {
161161
let topic = deliver.routing_key();
162-
let event = make_eventsource_data(topic, &utf8_data);
163-
tracing::debug!("consume delivery {} , data: {}", deliver, event,);
164-
// TODO handle this better later, see broadcast() documentation for details.
165-
tokio::select! {
166-
_ = killswitch => {
167-
// WARNING: in the client implementation, this may happen while waiting on a response, resulting in us rejecting a message we actually passed through successfully
168-
// this would only happen if we actually call publish_event_to_http(), if the killswitch was toggled before reaching here we will always do the killswitch branch.
169-
tracing::warn!("Got message from broker but did not send it over HTTP, the message will be rejected.");
170-
should_ack = false;
171-
},
172-
http_result = broadcaster.publish_event_to_http(event) => {
173-
if !http_result {
174-
tracing::warn!("Some clients may not have gotten a message, the message will be rejected.");
175-
should_ack = false;
162+
match make_eventsource_data(topic, &utf8_data) {
163+
Err(_) => {}
164+
Ok(event) => {
165+
tracing::debug!("consume delivery {} , data: {}", deliver, event,);
166+
// TODO handle this better later, see broadcast() documentation for details.
167+
tokio::select! {
168+
_ = killswitch => {
169+
// WARNING: in the client implementation, this may happen while waiting on a response, resulting in us rejecting a message we actually passed through successfully
170+
// this would only happen if we actually call publish_event_to_http(), if the killswitch was toggled before reaching here we will always do the killswitch branch.
171+
tracing::warn!("Got message from broker but did not send it over HTTP, the message will be rejected.");
172+
should_ack = false;
173+
},
174+
http_result = broadcaster.publish_event_to_http(event) => {
175+
if !http_result {
176+
tracing::warn!("Some clients may not have gotten a message, the message will be rejected.");
177+
should_ack = false;
178+
}
179+
},
176180
}
177-
},
181+
}
178182
}
179183
}
180184
}

0 commit comments

Comments
 (0)