Skip to content

Commit 2b2ccdf

Browse files
authored
feat: add mqtt properties in meta (#4010)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent d2237ad commit 2b2ccdf

File tree

3 files changed

+21
-0
lines changed

3 files changed

+21
-0
lines changed

docs/en_US/guide/sources/builtin/mqtt.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,16 @@ For those who prefer a hands-on approach, the Command Line Interface (CLI) provi
188188

189189
More details can be found at [Streams Management with CLI](../../../api/cli/streams.md).
190190

191+
## Message Properties (MQTT v5)
192+
193+
When `protocolVersion` is set to `5`, MQTT v5 `User Properties` from incoming messages will be added into the message metadata (`meta`) under the key `properties` as a `map[string]string`.
194+
195+
Example:
196+
197+
```sql
198+
SELECT meta(properties) AS props FROM demo
199+
```
200+
191201
## Migration Guide
192202

193203
Starting from version 1.5.0, eKuiper has modified the MQTT source broker configuration, transitioning from `servers` to `server`. As a result, users can now specify only a single MQTT broker address, as opposed to an array of addresses.

docs/zh_CN/guide/sources/builtin/mqtt.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,16 @@ REST API 为 eKuiper 提供了一种可编程的交互方式,适用于自动
181181

182182
详细操作步骤及命令解释,可参考 [通过 CLI 进行流管理](../../../api/cli/streams.md)。
183183

184+
## 消息属性(MQTT v5)
185+
186+
当 `protocolVersion` 设置为 `5` 时,MQTT v5 的 `User Properties` 会被写入消息元数据 `meta` 中,key 为 `properties`,类型为 `map[string]string`。
187+
188+
**示例**
189+
190+
```sql
191+
SELECT meta(properties) AS props FROM demo
192+
```
193+
184194
## 迁移指南
185195

186196
从 eKuiper 1.5.0 开始,eKuiper 将 MQTT 源地址配置从 `servers` 更改为 `server`,即用户只能配置一个 MQTT 源地址而不是一个地址数组。对需要进行版本升级的用户:

internal/io/mqtt/source.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ func (ms *SourceConnector) onMessage(ctx api.StreamContext, msg any, ingest api.
123123
if tid, ok := props["traceparent"]; ok {
124124
meta["traceId"] = tid
125125
}
126+
meta["properties"] = props
126127
}
127128
ingest(ctx, payload, meta, rcvTime)
128129
}

0 commit comments

Comments
 (0)