@@ -2,15 +2,13 @@ use crate::command::Command;
22use crate :: log:: MaybeFancy ;
33use base64:: prelude:: * ;
44use camino:: Utf8PathBuf ;
5- use mqtt_channel:: MqttMessage ;
65use mqtt_channel:: QoS ;
76use mqtt_channel:: StreamExt ;
87use mqtt_channel:: TopicFilter ;
98use std:: time:: Duration ;
109use tedge_config:: tedge_toml:: MqttAuthClientConfig ;
1110use tedge_config:: TEdgeConfig ;
1211use tokio:: io:: AsyncWriteExt ;
13- use tracing:: error;
1412use tracing:: info;
1513
1614const DEFAULT_QUEUE_CAPACITY : usize = 10 ;
@@ -91,27 +89,25 @@ async fn subscribe(cmd: &MqttSubscribeCommand) -> Result<(), anyhow::Error> {
9189 break ;
9290 }
9391
94- let message = if cmd. base64 {
95- MqttMessage :: new (
96- & message. topic ,
97- BASE64_STANDARD . encode ( message. payload_bytes ( ) ) ,
98- )
92+ let payload = if cmd. base64 {
93+ BASE64_STANDARD . encode ( message. payload_bytes ( ) )
9994 } else {
100- message
95+ match message. payload_str ( ) {
96+ Ok ( payload_str) => payload_str. to_string ( ) ,
97+ Err ( _) => format ! (
98+ "<ERR=NON-UTF8> {}" ,
99+ BASE64_STANDARD . encode( message. payload_bytes( ) )
100+ ) ,
101+ }
101102 } ;
102103
103- match message. payload_str ( ) {
104- Ok ( payload) => {
105- let line = if cmd. hide_topic {
106- format ! ( "{payload}\n " )
107- } else {
108- format ! ( "[{}] {payload}\n " , & message. topic)
109- } ;
110- let _ = stdout. write_all ( line. as_bytes ( ) ) . await ;
111- let _ = stdout. flush ( ) . await ;
112- }
113- Err ( err) => error ! ( target: "MQTT" , "{err}" ) ,
114- }
104+ let line = if cmd. hide_topic {
105+ format ! ( "{payload}\n " )
106+ } else {
107+ format ! ( "[{}] {payload}\n " , & message. topic)
108+ } ;
109+ let _ = stdout. write_all ( line. as_bytes ( ) ) . await ;
110+ let _ = stdout. flush ( ) . await ;
115111
116112 n_messages += 1 ;
117113 if matches ! ( cmd. count, Some ( count) if count > 0 && n_messages >= count) {
0 commit comments