Skip to content

Commit 4dfcd9a

Browse files
committed
Ignore any trailing null char from MQTT payloads
Such trailing null char are added by collectd (following C convention) and preventing tedge flow scripts to decode the payloads. Previously this trailing char was remove on read by `Payload.as_bytes()`. However, a bug has been introduced with `MqttMessage.split()` which was failing to clean the payload passed to tedge flows. => Now any trailing null char is removed when the payload is received from MQTT. Signed-off-by: Didier Wenzek <[email protected]>
1 parent f5297bb commit 4dfcd9a

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

crates/common/mqtt_channel/src/messages.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl Debug for DebugPayload {
7474

7575
impl From<String> for DebugPayload {
7676
fn from(value: String) -> Self {
77-
DebugPayload(value.into_bytes())
77+
DebugPayload::new(value)
7878
}
7979
}
8080

@@ -86,7 +86,7 @@ impl From<DebugPayload> for Vec<u8> {
8686

8787
impl From<Vec<u8>> for DebugPayload {
8888
fn from(value: Vec<u8>) -> Self {
89-
DebugPayload(value)
89+
DebugPayload::new(value)
9090
}
9191
}
9292

@@ -132,14 +132,14 @@ impl<'de> Deserialize<'de> for DebugPayload {
132132
where
133133
E: serde::de::Error,
134134
{
135-
Ok(DebugPayload(value.as_bytes().to_vec()))
135+
Ok(DebugPayload::new(value))
136136
}
137137

138138
fn visit_bytes<E>(self, value: &[u8]) -> Result<Self::Value, E>
139139
where
140140
E: serde::de::Error,
141141
{
142-
Ok(DebugPayload(value.to_vec()))
142+
Ok(DebugPayload::new(value))
143143
}
144144
}
145145

@@ -157,15 +157,24 @@ impl Display for DebugPayload {
157157
}
158158

159159
impl DebugPayload {
160+
/// Remove any trailing null char
161+
fn new(payload: impl Into<Vec<u8>>) -> Self {
162+
let mut payload = payload.into();
163+
if payload.ends_with(b"\0") {
164+
payload.pop();
165+
};
166+
DebugPayload(payload)
167+
}
168+
160169
/// The payload string (unless this payload is not UTF8)
161170
pub fn as_str(&self) -> Result<&str, MqttError> {
162171
let bytes = self.as_bytes();
163172
std::str::from_utf8(bytes).map_err(|err| MqttError::new_invalid_utf8_payload(bytes, err))
164173
}
165174

166-
/// The bytes of the payload (except any trailing null char)
175+
/// The bytes of the payload
167176
pub fn as_bytes(&self) -> &[u8] {
168-
self.0.strip_suffix(&[0]).unwrap_or(self.0.as_slice())
177+
self.0.as_slice()
169178
}
170179
}
171180

@@ -179,7 +188,7 @@ impl MqttMessage {
179188
{
180189
MqttMessage {
181190
topic: topic.clone(),
182-
payload: DebugPayload(payload.into()),
191+
payload: DebugPayload::new(payload),
183192
qos: QoS::AtLeastOnce,
184193
retain: false,
185194
}
@@ -241,7 +250,7 @@ impl From<Publish> for MqttMessage {
241250

242251
MqttMessage {
243252
topic: Topic::new_unchecked(&topic),
244-
payload: DebugPayload(payload.to_vec()),
253+
payload: DebugPayload::new(payload),
245254
qos,
246255
retain,
247256
}
@@ -317,7 +326,7 @@ mod tests {
317326
fn message_serialize_deserialize() {
318327
let message = MqttMessage {
319328
topic: Topic::new("test").unwrap(),
320-
payload: DebugPayload("test-payload".as_bytes().to_vec()),
329+
payload: DebugPayload::new("test-payload"),
321330
qos: QoS::AtMostOnce,
322331
retain: true,
323332
};

0 commit comments

Comments
 (0)