Skip to content

Commit d724dbc

Browse files
committed
feat: add sparkplug b encoder
needs error handling
1 parent d1de077 commit d724dbc

File tree

3 files changed

+36
-2
lines changed

3 files changed

+36
-2
lines changed

app/src/actions/Publish.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { AppState } from '../reducers'
33
import { Base64Message } from '../../../backend/src/Model/Base64Message'
44
import { Dispatch } from 'redux'
55
import { makePublishEvent, rendererEvents } from '../../../events'
6+
import { Decoder } from '../../../backend/src/Model/Decoder'
67

78
export const setTopic = (topic?: string): Action => {
89
return {
@@ -47,6 +48,14 @@ export const publish = (connectionId: string) => (dispatch: Dispatch<Action>, ge
4748
retain: state.publish.retain,
4849
qos: state.publish.qos,
4950
}
51+
52+
if (
53+
mqttMessage.payload &&
54+
mqttMessage.topic.match(/spBv1\.0\/[^/]+\/(DDATA|NDATA|NCMD|DCMD|NBIRTH|DBIRTH|NDEATH|DDEATH\/[^/]+\/)/u)
55+
) {
56+
mqttMessage.payload.decoder = Decoder.SPARKPLUG
57+
}
58+
5059
rendererEvents.emit(publishEvent, mqttMessage)
5160
}
5261

backend/src/DataSource/MqttSource.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { Client, connect as mqttConnect } from 'mqtt'
44
import { DataSource, DataSourceStateMachine } from './'
55
import { MqttMessage } from '../../../events'
66
import { Base64Message } from '../Model/Base64Message'
7+
import { Decoder } from '../Model/Decoder'
8+
import { SparkplugEncoder } from '../Model/sparkplugb'
79

810
export interface MqttOptions {
911
url: string
@@ -47,7 +49,6 @@ export class MqttSource implements DataSource<MqttOptions> {
4749
throw error
4850
}
4951

50-
5152
const client = mqttConnect(url.toString(), {
5253
resubscribe: false,
5354
rejectUnauthorized: options.certValidation,
@@ -99,7 +100,12 @@ export class MqttSource implements DataSource<MqttOptions> {
99100

100101
public publish(msg: MqttMessage) {
101102
if (this.client) {
102-
this.client.publish(msg.topic, msg.payload ? Base64Message.toUnicodeString(msg.payload) : '', {
103+
let payload: string | Buffer = msg.payload ? Base64Message.toUnicodeString(msg.payload) : ''
104+
if (msg.payload?.decoder === Decoder.SPARKPLUG) {
105+
payload = SparkplugEncoder.encode(payload) || payload
106+
}
107+
108+
this.client.publish(msg.topic, payload, {
103109
qos: msg.qos,
104110
retain: msg.retain,
105111
})

backend/src/Model/sparkplugb.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,22 @@ export const SparkplugDecoder = {
2222
}
2323
}
2424
}
25+
26+
export const SparkplugEncoder = {
27+
encode(input: string): Buffer | undefined {
28+
try {
29+
console.log(input)
30+
const payload = JSON.parse(input)
31+
return Buffer.from(
32+
SparkplugPayload.encode(
33+
SparkplugPayload.create({
34+
timestamp: Date.now(),
35+
...payload,
36+
})
37+
).finish()
38+
)
39+
} catch (err) {
40+
// todo ?
41+
}
42+
},
43+
}

0 commit comments

Comments
 (0)