Skip to content

Commit e31bc5a

Browse files
authored
Merge pull request #484 from joerggollnick/main
new service `mqtt_filter` and add mqttwarn.openrc script
2 parents 4594a75 + fcc055a commit e31bc5a

File tree

3 files changed

+148
-0
lines changed

3 files changed

+148
-0
lines changed

HANDBOOK.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ _mqttwarn_ supports a number of services (listed alphabetically below):
326326
* mastodon (see [tootpaste](#tootpaste))
327327
* [mattermost](#mattermost)
328328
* [mqtt](#mqtt)
329+
* [mqtt_filter](#mqtt_filter)
329330
* [mqttpub](#mqttpub)
330331
* [mysql](#mysql)
331332
* [mysql_dynamic](#mysql_dynamic)
@@ -1423,6 +1424,74 @@ This shows the currently full configuration possible. Global values from the
14231424
authentication (`auth`) or (`tls`) you may omit those sections. (The `defaults`
14241425
section must exist.)
14251426
1427+
### `mqtt_filter`
1428+
1429+
The `mqtt_filter` target executes the specified program and its arguments. It is similar
1430+
to `pipe` but it doesn't open a pipe to the program. It provides stdout as response
1431+
to a configured queue.
1432+
Example use cases are e.g. IoT buttons which publish a message when they are pushed
1433+
and they execute an external program. It is also a clone of [mqtt-launcher](https://github.com/jpmens/mqtt-launcher).
1434+
With no response configured it acts like `execute` with multiple arguments.
1435+
1436+
To pass the published data (json args array) to the command, use `{args[0]}` and `{args[1]}` which then gets replaced. Message looks like `'{ "args" : ["' + temp + '","' + room + '"] }'` for `fr
1437+
itzctl`.
1438+
1439+
outgoing_topic is constructed by parts of incoming topic or as full_incoming topic.
1440+
1441+
```ini
1442+
[config:mqtt_filter]
1443+
targets = {
1444+
# full_topic, topic[0], topic[1], args[0], .....
1445+
# touch file /tmp/executed
1446+
'touch' : [ None,0,0,'touch', '/tmp/executed' ],
1447+
# uses firtzctl to set temperature of a room
1448+
'fritzctl' : [ None,0,0,'/usr/bin/fritzctl','--loglevel=ERROR','temperature', "{args[0]}", "{args[1]}" ]
1449+
# performs a dirvish backup and writes stdout as a new messages to response topic
1450+
'backup' : ["response/{topic[1]}/{topic[2]}",0,0,'/usr/bin/sudo','/usr/sbin/dirvish','--vault', "{args[0]}" ],
1451+
}
1452+
```
1453+
1454+
Use case for fritzctl is to change the requested temperature for a connected thermostat.
1455+
Topic is constructed as /home/{room}/temperature/{action}.
1456+
1457+
```
1458+
def TemperatureConvert( data=None, srv=None):
1459+
1460+
# optional debug logger
1461+
if srv is not None:
1462+
srv.logging.debug('data={data}, srv={srv}'.format(**locals()))
1463+
1464+
topic = str( data.get('topic','') )
1465+
1466+
# init
1467+
room = ''
1468+
action = 'status'
1469+
1470+
# /home/{room}/temperature/{action}
1471+
parts = topic.split('/')
1472+
1473+
for idx, part in enumerate( parts ):
1474+
if idx == 1:
1475+
room = part
1476+
1477+
if idx == 3:
1478+
action = part
1479+
1480+
temp = str( data.get('payload','sav') )
1481+
if temp == '':
1482+
temp = 'sav'
1483+
1484+
if action == 'set':
1485+
cmd = '{ "args" : ["' + temp + '","' + room + '"] }'
1486+
1487+
return cmd
1488+
```
1489+
1490+
Use case for backup is to run a dirvish backup triggered by a simple mqtt message.
1491+
1492+
Note, that for each message targeted to the `mqtt_filter` service, a new process is
1493+
spawned (fork/exec), so it is quite "expensive".
1494+
14261495
### `mqttpub`
14271496
14281497
This service publishes a message to the broker _mqttwarn_ is connected to. (To

etc/mqttwarn.openrc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/sbin/openrc-run
2+
3+
command="/usr/bin/python /opt/mqttwarn/mqttwarn.py"
4+
command_args="${MQTTWARN_OPTIONS}"
5+
command_background=yes
6+
pidfile=/run/mqttwarn.pid
7+
directory=/opt/mqttwarn
8+
9+
name="mqttwarn"
10+
description="mqttwarn pluggable mqtt notification service"
11+
12+
depend() {
13+
need net
14+
}

mqttwarn/services/mqtt_filter.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
__author__ = 'Joerg Gollnick <github+mqttwarn()wurzelbenutzer.de>'
5+
__copyright__ = 'Copyright 2016 Tobias Brunner / 2021 Joerg Gollnick'
6+
__license__ = """Eclipse Public License - v 1.0 (http://www.eclipse.org/legal/epl-v10.html)"""
7+
8+
import subprocess
9+
import json
10+
from pipes import quote
11+
from six import string_types
12+
13+
def plugin(srv, item):
14+
15+
srv.logging.debug("*** MODULE=%s: service=%s, target=%s", __file__, item.service, item.target)
16+
17+
# same as for ssh
18+
json_message=json.loads(item.message)
19+
20+
args = None
21+
if json_message is not None:
22+
args = json_message["args"]
23+
24+
if type(args) is list and len(args) == 1:
25+
args=args[0]
26+
27+
if type(args) is list:
28+
args=tuple([ quote(v) for v in args ]) #escape the shell args
29+
elif type(args) is str or type(args) is unicode:
30+
args=(quote(args),)
31+
32+
# parse topic
33+
topic=list(map( lambda x: quote(x), item.topic.split('/') ))
34+
35+
outgoing_topic = None
36+
# replace palceholders args[0], args[1] ..., full_topic, topic[0],
37+
if item.addrs[0] is not None:
38+
outgoing_topic = item.addrs[0].format(args=args,full_topic=quote(item.topic),topic=topic)
39+
qos = item.addrs[1]
40+
retain = item.addrs[2]
41+
addrs = item.addrs[3:]
42+
43+
cmd = None
44+
if addrs is not None:
45+
cmd = [i.format(args=args, full_topic=quote(item.topic),topic=topic) for i in addrs]
46+
47+
srv.logging.debug("*** MODULE=%s: service=%s, command=%s outgoing_topic=%s", __file__, item.service, str( cmd ),outgoing_topic)
48+
49+
try:
50+
res = subprocess.check_output(cmd, stdin=None, stderr=subprocess.STDOUT, shell=False, universal_newlines=True, cwd='/tmp')
51+
except Exception as e:
52+
srv.logging.warning("Cannot execute %s because %s" % (cmd, e))
53+
return False
54+
55+
if outgoing_topic is not None:
56+
outgoing_payload = res.rstrip('\n')
57+
if isinstance(outgoing_payload, string_types):
58+
outgoing_payload = bytearray(outgoing_payload, encoding='utf-8')
59+
try:
60+
srv.mqttc.publish(outgoing_topic, outgoing_payload, qos=qos, retain=retain)
61+
except Exception as e:
62+
srv.logging.warning("Cannot PUBlish response %s: %s" % (outgoing_topic, e))
63+
return False
64+
65+
return True

0 commit comments

Comments
 (0)