Esmqtt is a tool that subscribes to MQTT topics and forwards the messages it receives to Elasticsearch.
- Supports MQTT protocol V3 and V4.
- Forwards MQTT messages to Elasticsearch.
- Supports rule-based message routing: map message topics to specific Elasticsearch indexes.
- Supports message persistence.
go install github.com/talkincode/esmqtt@latest
esmqtt -install
On a Linux system, use systemd to manage the service:
systemctl <start | stop | restart> esmqtt
```bash
docker-compose up -d
```
- YAML configuration file: /etc/esmqtt.yml
appid: esmqtt
location: Asia/Shanghai
workdir: /var/esmqtt
debug: true
logger:
  mode: development
  console_enable: true
  loki_enable: false
  file_enable: true
  filename: /var/esmqtt/esmqtt.log
  queue_size: 4096
  loki_api: http://127.0.0.1:3100
  loki_user: esmqtt
  loki_pwd: esmqtt
  loki_job: esmqtt
  metrics_storage: /var/esmqtt/data/metrics
  metrics_history: 168
mqtt:
  server: ""
  username: ""
  password: ""
  debug: false
elastic:
  server: http://127.0.0.1:9200
  api_key: ""
  username: elastic
  password: elastic
  debug: false
- Environment variables: .env
ESMQTT_SYSTEM_WORKER_DIR=/tmp/esmqtt
ESMQTT_SYSTEM_DEBUG=true
ESMQTT_MQTT_SERVER=tcp://127.0.0.1:1883
ESMQTT_MQTT_USERNAME=esmqtt
ESMQTT_MQTT_PASSWORD=
ESMQTT_MQTT_DEBUG=true
ESMQTT_ELASTIC_SERVER=https://localhost:9200
ESMQTT_ELASTIC_APIKEY=
ESMQTT_ELASTIC_USERNAME=elastic
ESMQTT_ELASTIC_PASSWORD=elastic
ESMQTT_ELASTIC_DEBUG=true
ESMQTT_LOGGER_JOB=esmqtt
ESMQTT_LOGGER_SERVER=
ESMQTT_LOGGER_USERNAME=esmqtt
ESMQTT_LOGGER_PASSWORD=
ESMQTT_LOGGER_MODE=development
ESMQTT_LOGGER_LOKI_ENABLE=false
ESMQTT_LOGGER_FILE_ENABLE=true
Message routing rules are defined in /var/esmqtt/rules.json:
[
  {
    "topic": "testnode/elastic/message/create",
    "index": "testnode_message",
    "spliter": "day"
  }
]

- The program loads this file at startup. If the file does not exist, the program automatically subscribes to the topic `elastic/message/create`, and the index is taken from the `index` field of each message.
- If the `index` field is specified neither in the message nor in the rule, the message is ignored.
- When a rule is used and the message does not specify an `index` field, the `index` value of the rule is used.
- The value of `spliter` is one of `year`, `month`, `day`, `hour` (or null), and the index name is generated according to the corresponding date rule, for example `testnode_message_2021-01-01` (day), `testnode_message_2021-01` (month), or `testnode_message_2021` (year).
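The routing rules above can be sketched in Go as follows. This is only an illustration, not esmqtt's actual code: the names `Rule`, `ParseRules`, `ResolveIndex` and `IndexName` are hypothetical, the `hour` suffix layout is an assumption (the text gives no example for it), and the suffix is computed in UTC from a millisecond timestamp because the text does not say which clock or time zone the tool uses. It is also not stated whether the date suffix is applied when the index comes from the message, so the sketch only applies it to the rule's index.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Rule mirrors one entry of rules.json.
type Rule struct {
	Topic   string `json:"topic"`
	Index   string `json:"index"`
	Spliter string `json:"spliter"`
}

// ParseRules decodes the JSON array stored in rules.json.
func ParseRules(data []byte) ([]Rule, error) {
	var rules []Rule
	err := json.Unmarshal(data, &rules)
	return rules, err
}

// ResolveIndex applies the documented precedence: the message's index wins,
// then the rule's index. ok is false when neither is set (message ignored).
func ResolveIndex(msgIndex, ruleIndex string) (index string, ok bool) {
	if msgIndex != "" {
		return msgIndex, true
	}
	if ruleIndex != "" {
		return ruleIndex, true
	}
	return "", false
}

// IndexName appends a date suffix selected by spliter to base.
// tsMillis is a Unix timestamp in milliseconds, interpreted in UTC (assumption).
func IndexName(base, spliter string, tsMillis int64) string {
	layouts := map[string]string{
		"year":  "2006",
		"month": "2006-01",
		"day":   "2006-01-02",
		"hour":  "2006-01-02-15", // assumed layout, not shown in the README
	}
	layout, found := layouts[spliter]
	if !found {
		return base // empty or unknown spliter: no date suffix
	}
	return base + "_" + time.UnixMilli(tsMillis).UTC().Format(layout)
}

func main() {
	rules, err := ParseRules([]byte(`[{"topic":"testnode/elastic/message/create","index":"testnode_message","spliter":"day"}]`))
	if err != nil {
		panic(err)
	}
	// 1609459200000 is 2021-01-01T00:00:00Z in milliseconds.
	if index, ok := ResolveIndex("", rules[0].Index); ok {
		fmt.Println(IndexName(index, rules[0].Spliter, 1609459200000))
	}
}
```

Keeping the precedence check separate from the naming step makes it easy to change either one once the tool's real behavior for message-supplied indexes is confirmed.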
Messages published to a subscribed topic use the following JSON format:
{
  "data": {
    "id": "1312313142",
    "index": "testindex",
    "payload": {
      "name": "test", "value": 100
    },
    "vector": [],
    "timestamp": 1698827090749
  }
}

- `data.id` specifies the ID of the document the message is forwarded to. If it is not specified in the message, an internally generated UUID is used.
- `data.index` specifies the index the message is forwarded to. If it is not specified in the message, the `index` value of the rule is used; if the rule does not specify one either, the message is ignored.
- `data.payload` is a custom object. If it is empty, the message is ignored.
- `data.timestamp` is the message timestamp. If it is empty, the current timestamp is used.
- `data.vector` is the message vector. If it is empty, an empty array is used. This field is designed for specific scenarios (GPT model vectors with dimension 1536) and can be left empty.
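The field defaults described above could be applied roughly as in the following Go sketch. It is an illustration under assumptions, not the tool's implementation: `Data`, `Message`, `Normalize`, `ErrIgnored` and `newUUID` are hypothetical names, the generated ID is assumed to be a random (version 4) UUID, and the timestamp is assumed to be in milliseconds because the example value has 13 digits.

```go
package main

import (
	"crypto/rand"
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Data mirrors the "data" object of the message format.
type Data struct {
	ID        string                 `json:"id"`
	Index     string                 `json:"index"`
	Payload   map[string]interface{} `json:"payload"`
	Vector    []float64              `json:"vector"`
	Timestamp int64                  `json:"timestamp"`
}

// Message is the top-level envelope.
type Message struct {
	Data Data `json:"data"`
}

// ErrIgnored marks messages that should be dropped rather than forwarded.
var ErrIgnored = errors.New("message ignored")

// newUUID builds a random UUID with crypto/rand (version 4 is an assumption).
func newUUID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// Normalize decodes raw and fills in the documented defaults.
// ruleIndex is the index of the matching rule, or "" if there is none.
func Normalize(raw []byte, ruleIndex string) (Data, error) {
	var m Message
	if err := json.Unmarshal(raw, &m); err != nil {
		return Data{}, err
	}
	d := m.Data
	if len(d.Payload) == 0 {
		return Data{}, ErrIgnored // empty payload: ignore
	}
	if d.Index == "" {
		d.Index = ruleIndex // fall back to the rule's index
	}
	if d.Index == "" {
		return Data{}, ErrIgnored // no index in message or rule: ignore
	}
	if d.ID == "" {
		id, err := newUUID()
		if err != nil {
			return Data{}, err
		}
		d.ID = id
	}
	if d.Timestamp == 0 {
		d.Timestamp = time.Now().UnixMilli() // assumed to be milliseconds
	}
	if d.Vector == nil {
		d.Vector = []float64{}
	}
	return d, nil
}

func main() {
	raw := []byte(`{"data":{"index":"testindex","payload":{"name":"test","value":100}}}`)
	d, err := Normalize(raw, "")
	if err != nil {
		fmt.Println("skipped:", err)
		return
	}
	fmt.Println(d.Index, d.ID, d.Timestamp, len(d.Vector))
}
```

Returning a sentinel error for ignored messages lets a caller tell "drop silently" apart from a malformed JSON payload.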