Skip to content

Commit 4eac036

Browse files
committed
Configure ignored and unauthenticated message type which are not disconnected
1 parent 80e6eb0 commit 4eac036

File tree

6 files changed

+66
-21
lines changed

6 files changed

+66
-21
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,16 @@ prerequisites
6464
watch -c 'curl -s localhost:9090/metrics | grep mqtt | egrep -v '^#''
6565
```
6666
67+
## Configuration
68+
### Examples
69+
70+
- Ignore subscribe / unsubscribe requests
71+
72+
```
73+
mqtt-proxy server --mqtt.publisher.name=noop --mqtt.handler.ignore-unsupported SUBSCRIBE --mqtt.handler.ignore-unsupported UNSUBSCRIBE
74+
```
75+
76+
6777
## Metrics
6878
6979

cmd/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ func registerServer(m map[string]setupFunc, app *kingpin.Application) {
4242
cmd.Flag("mqtt.server-tls-key", "TLS Key for the MQTT server, leave blank to disable TLS").Default("").StringVar(&cfg.MQTT.TLSSrv.Key)
4343
cmd.Flag("mqtt.server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").StringVar(&cfg.MQTT.TLSSrv.ClientCA)
4444

45-
cmd.Flag("mqtt.handler.allow-unauthenticated", "Allow unauthenticated MQTT requests.").Default("false").BoolVar(&cfg.MQTT.Handler.AllowUnauthenticated)
45+
cmd.Flag("mqtt.handler.ignore-unsupported", "List of unsupported messages which are ignored. One of: [SUBSCRIBE, UNSUBSCRIBE]").PlaceHolder("MSG").EnumsVar(&cfg.MQTT.Handler.IgnoreUnsupported, "SUBSCRIBE", "UNSUBSCRIBE")
46+
cmd.Flag("mqtt.handler.allow-unauthenticated", "List of messages for which connection is not disconnected if unauthenticated request is received. One of: [PUBLISH, PUBREL, PINGREQ]").PlaceHolder("MSG").EnumsVar(&cfg.MQTT.Handler.AllowUnauthenticated, "PUBLISH", "PUBREL", "PINGREQ")
4647
cmd.Flag("mqtt.handler.publish.timeout", "Maximum duration of sending publish request to broker.").Default("0s").DurationVar(&cfg.MQTT.Handler.Publish.Timeout)
4748
cmd.Flag("mqtt.handler.publish.async.at-most-once", "Async publish for AT_MOST_ONCE QoS.").Default("false").BoolVar(&cfg.MQTT.Handler.Publish.Async.AtMostOnce)
4849
cmd.Flag("mqtt.handler.publish.async.at-least-once", "Async publish for AT_LEAST_ONCE QoS.").Default("false").BoolVar(&cfg.MQTT.Handler.Publish.Async.AtLeastOnce)
@@ -128,6 +129,7 @@ func runServer(
128129
}
129130

130131
handler := mqtthandler.New(logger, registry, publisher,
132+
mqtthandler.WithIgnoreUnsupported(cfg.MQTT.Handler.IgnoreUnsupported),
131133
mqtthandler.WithAllowUnauthenticated(cfg.MQTT.Handler.AllowUnauthenticated),
132134
mqtthandler.WithPublishTimeout(cfg.MQTT.Handler.Publish.Timeout),
133135
mqtthandler.WithPublishAsyncAtMostOnce(cfg.MQTT.Handler.Publish.Async.AtMostOnce),

pkg/config/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ type Server struct {
3535
ClientCA string
3636
}
3737
Handler struct {
38-
AllowUnauthenticated bool
38+
IgnoreUnsupported []string
39+
AllowUnauthenticated []string
3940
Publish struct {
4041
Timeout time.Duration
4142
Async struct {

pkg/mqtt/handler/handler.go

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,25 @@ func (h *MQTTHandler) HandleFunc(messageType byte, handlerFunc mqttserver.Handle
3636
h.mux.Handle(messageType, handlerFunc)
3737
}
3838

39-
func (h *MQTTHandler) isAuthenticated(conn mqttserver.Conn, messageName string) bool {
40-
if h.opts.allowUnauthenticated {
41-
return true
42-
}
43-
if !conn.Properties().Authenticated() {
44-
h.logger.Warnf("Unauthenticated '%s' from /%v", messageName, conn.RemoteAddr())
45-
_ = conn.Close()
39+
func (h *MQTTHandler) disconnectUnauthenticated(conn mqttserver.Conn, packet mqttcodec.ControlPacket) bool {
40+
if conn.Properties().Authenticated() {
4641
return false
4742
}
43+
name := packet.Name()
44+
for _, v := range h.opts.allowUnauthenticated {
45+
if v == name {
46+
return false
47+
}
48+
}
49+
h.logger.Warnf("Unauthenticated '%s' from /%v", name, conn.RemoteAddr())
50+
_ = conn.Close()
4851
return true
4952
}
5053

5154
func (h *MQTTHandler) handleConnect(conn mqttserver.Conn, packet mqttcodec.ControlPacket) {
5255
req := packet.(*mqttcodec.ConnectPacket)
5356

54-
h.logger.Infof("Handling MQTT message '%s' from /%v", req.MessageName(), conn.RemoteAddr())
57+
h.logger.Infof("Handling MQTT message '%s' from /%v", req.Name(), conn.RemoteAddr())
5558

5659
//TODO: login with user password
5760

@@ -72,12 +75,12 @@ func (h *MQTTHandler) handleConnect(conn mqttserver.Conn, packet mqttcodec.Contr
7275
func (h *MQTTHandler) handlePublish(conn mqttserver.Conn, packet mqttcodec.ControlPacket) {
7376
req := packet.(*mqttcodec.PublishPacket)
7477

75-
h.logger.Debugf("Handling MQTT message '%s' from /%v", req.MessageName(), conn.RemoteAddr())
76-
77-
if !h.isAuthenticated(conn, req.MessageName()) {
78+
if h.disconnectUnauthenticated(conn, packet) {
7879
return
7980
}
8081

82+
h.logger.Debugf("Handling MQTT message '%s' from /%v", req.Name(), conn.RemoteAddr())
83+
8184
var publishCallback apis.PublishCallbackFunc
8285

8386
switch req.Qos {
@@ -180,11 +183,11 @@ func newPublishRequest(req *mqttcodec.PublishPacket) *apis.PublishRequest {
180183
func (h *MQTTHandler) handlePublishRelease(conn mqttserver.Conn, packet mqttcodec.ControlPacket) {
181184
req := packet.(*mqttcodec.PubrelPacket)
182185

183-
if !h.isAuthenticated(conn, req.MessageName()) {
186+
if h.disconnectUnauthenticated(conn, packet) {
184187
return
185188
}
186189

187-
h.logger.Debugf("Handling MQTT message '%s' from /%v", req.MessageName(), conn.RemoteAddr())
190+
h.logger.Debugf("Handling MQTT message '%s' from /%v", req.Name(), conn.RemoteAddr())
188191
res := mqttcodec.NewControlPacket(mqttcodec.PUBCOMP).(*mqttcodec.PubcompPacket)
189192
res.MessageID = req.MessageID
190193
err := res.Write(conn)
@@ -196,7 +199,11 @@ func (h *MQTTHandler) handlePublishRelease(conn mqttserver.Conn, packet mqttcode
196199
func (h *MQTTHandler) handlePing(conn mqttserver.Conn, packet mqttcodec.ControlPacket) {
197200
req := packet.(*mqttcodec.PingreqPacket)
198201

199-
h.logger.Debugf("Handling MQTT message '%s' from /%v", req.MessageName(), conn.RemoteAddr())
202+
if h.disconnectUnauthenticated(conn, packet) {
203+
return
204+
}
205+
206+
h.logger.Debugf("Handling MQTT message '%s' from /%v", req.Name(), conn.RemoteAddr())
200207
res := mqttcodec.NewControlPacket(mqttcodec.PINGRESP)
201208
err := res.Write(conn)
202209
if err != nil {
@@ -206,13 +213,17 @@ func (h *MQTTHandler) handlePing(conn mqttserver.Conn, packet mqttcodec.ControlP
206213

207214
func (h *MQTTHandler) handleDisconnect(conn mqttserver.Conn, packet mqttcodec.ControlPacket) {
208215
req := packet.(*mqttcodec.DisconnectPacket)
209-
h.logger.Infof("Handling MQTT message '%s' from /%v", req.MessageName(), conn.RemoteAddr())
216+
h.logger.Infof("Handling MQTT message '%s' from /%v", req.Name(), conn.RemoteAddr())
210217
err := conn.Close()
211218
if err != nil {
212219
h.logger.WithError(err).Warnf("Closing connection on 'DISCONNECT' failed")
213220
}
214221
}
215222

223+
func (h *MQTTHandler) ignore(conn mqttserver.Conn, packet mqttcodec.ControlPacket) {
224+
h.logger.Debugf("No handler available for MQTT message '%s' from /%v. Ignoring", packet.Name(), conn.RemoteAddr())
225+
}
226+
216227
func New(logger log.Logger, registry *prometheus.Registry, publisher apis.Publisher, opts ...Option) *MQTTHandler {
217228
options := options{}
218229
for _, o := range opts {
@@ -230,6 +241,19 @@ func New(logger log.Logger, registry *prometheus.Registry, publisher apis.Publis
230241
h.HandleFunc(mqttcodec.DISCONNECT, h.handleDisconnect)
231242
h.HandleFunc(mqttcodec.PUBREL, h.handlePublishRelease)
232243
h.HandleFunc(mqttcodec.PINGREQ, h.handlePing)
244+
245+
for _, name := range options.ignoreUnsupported {
246+
for t, n := range mqttcodec.MqttMessageTypeNames {
247+
if n == name {
248+
logger.Infof("%s requests will be ignored", name)
249+
h.HandleFunc(t, h.ignore)
250+
continue
251+
}
252+
}
253+
}
254+
for _, name := range options.allowUnauthenticated {
255+
logger.Infof("%s requests will be allow unauthenticated", name)
256+
}
233257
return h
234258

235259
}

pkg/mqtt/handler/option.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ package mqtthandler
33
import "time"
44

55
type options struct {
6-
allowUnauthenticated bool
6+
ignoreUnsupported []string
7+
allowUnauthenticated []string
78
publishTimeout time.Duration
89
publishAsyncAtMostOnce bool
910
publishAsyncAtLeastOnce bool
@@ -20,9 +21,15 @@ func (f optionFunc) apply(o *options) {
2021
f(o)
2122
}
2223

23-
func WithAllowUnauthenticated(b bool) Option {
24+
func WithIgnoreUnsupported(a []string) Option {
2425
return optionFunc(func(o *options) {
25-
o.allowUnauthenticated = b
26+
o.ignoreUnsupported = a
27+
})
28+
}
29+
30+
func WithAllowUnauthenticated(a []string) Option {
31+
return optionFunc(func(o *options) {
32+
o.allowUnauthenticated = a
2633
})
2734
}
2835

pkg/mqtt/server/handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ func (mux *ServeMux) Handle(messageType byte, handler Handler) {
6161
func (mux *ServeMux) ServeMQTT(c Conn, p mqttcodec.ControlPacket) {
6262
entry := mux.m[p.Type()]
6363
if entry.h == nil {
64-
mux.logger.Debugf("No handler available for MQTT message '%s' from /%v. Ignoring", p.Name(), c.RemoteAddr())
64+
mux.logger.Warnf("No handler available for MQTT message '%s' from /%v. Disconnecting", p.Name(), c.RemoteAddr())
65+
_ = c.Close()
6566
return
6667
}
6768
entry.h.ServeMQTT(c, p)

0 commit comments

Comments
 (0)