Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
module github.com/rs/moquette
module moquette

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.3
github.com/spf13/viper v1.2.1
github.com/stretchr/testify v1.2.2 // indirect
golang.org/x/net v0.0.0-20180724234803-3673e40ba225 // indirect
golang.org/x/sys v0.0.0-20180806192500-2be389f392cd
golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992
)
41 changes: 37 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,41 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24 h1:21LB+xEZf6xdOrp2JRw5Uongjp7wzGoJ99UnApIlUeg=
github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/eclipse/paho.mqtt.golang v1.1.1 h1:iPJYXJLaViCshRTW/PSqImSS6HJ2Rf671WR0bXZ2GIU=
github.com/eclipse/paho.mqtt.golang v1.1.1/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.0.0 h1:vVpGvMXJPqSDh2VYHF7gsfQj8Ncx+Xw5Y1KHeTRY+7I=
github.com/mitchellh/mapstructure v1.0.0/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg=
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.2.1 h1:bIcUwXqLseLF3BDAZduuNfekWG87ibtFxi59Bq+oI9M=
github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225 h1:kNX+jCowfMYzvlSvJu5pQWEmyWFrBXJ3PBy10xKMXK8=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180806192500-2be389f392cd h1:KFYUs6SCkSktZ+xJWb5YbuSCJLLphbTsg0kvyirtlQ8=
golang.org/x/sys v0.0.0-20180806192500-2be389f392cd/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992 h1:BH3eQWeGbwRU2+wxxuuPOdFBmaiBH81O8BugSjHeTFg=
golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
74 changes: 57 additions & 17 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,81 @@ package main

import (
"crypto/tls"
"flag"
"log"
"os"
"strconv"
"time"

flag "github.com/spf13/pflag"
"github.com/spf13/viper"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/moquette/server"
)

func main() {
hostname, _ := os.Hostname()
debug := flag.Bool("debug", false, "Turn on debugging")
broker := flag.String("broker", "tcp://127.0.0.1:1883", "The full url of the mqtt broker to connect to ex: tcp://127.0.0.1:1883")
clientID := flag.String("client-id", hostname+strconv.Itoa(time.Now().Second()), "A client id for the connection")
username := flag.String("username", "", "A username to authenticate to the mqtt server")
password := flag.String("password", "", "Password to match username")
confDir := flag.String("conf", "/etc/moquette.d", "Path to the configuration director")
sep := flag.String("sep", ":", "File name separator used for topic separator (/)")

flag.Bool("debug", false, "Turn on debugging")
flag.String("broker", "tcp://127.0.0.1:1883", "The full url of the mqtt broker to connect to ex: tcp://127.0.0.1:1883")
flag.String("client-id", hostname+strconv.Itoa(time.Now().Second()), "A client id for the connection")
flag.String("username", "", "A username to authenticate to the mqtt server")
flag.String("password", "", "Password to match username")
flag.String("conf", "/etc/moquette.d", "Path to the configuration directory")
flag.String("sep", ":", "File name separator used for topic separator (/)")
flag.String("configfile", "", "Configuration file name to read options from.")

flag.Parse()
viper.BindPFlags(flag.CommandLine)

explicitConfig := viper.GetString("configfile")

if explicitConfig != "" {
viper.SetConfigFile(explicitConfig)
err := viper.ReadInConfig()
if err != nil {
log.Fatal("Fatal error when loading config file given on command line:\n", err)
}
} else {
viper.SetConfigName("moquette")
viper.AddConfigPath(".")
viper.AddConfigPath("$HOME/.config/moquette")
viper.AddConfigPath("/etc/moquette/")
viper.AddConfigPath("/etc/")

// With implicit config files, it's permissible not to have them at all.
err := viper.ReadInConfig()

if err != nil {
if _, cnf := err.(viper.ConfigFileNotFoundError); cnf {
log.Print("No config file found, using defaults and command line only.")
} else {
log.Fatal("Fatal error when loading config file:\n", err)
}
}
}

debug := viper.GetBool("debug")
broker := viper.GetString("broker")
clientID := viper.GetString("client-id")
username := viper.GetString("username")
password := viper.GetString("password")
confDir := viper.GetString("conf")
sep := viper.GetString("sep")

if *debug {
if debug {
mqtt.DEBUG = log.New(os.Stderr, "", 0)
}
mqtt.ERROR = log.New(os.Stderr, "", 0)

connOpts := mqtt.NewClientOptions().
AddBroker(*broker).
SetClientID(*clientID).
AddBroker(broker).
SetClientID(clientID).
SetCleanSession(true).
SetKeepAlive(2 * time.Second)
if *username != "" {
connOpts.SetUsername(*username)
if *password != "" {
connOpts.SetPassword(*password)
if username != "" {
connOpts.SetUsername(username)
if password != "" {
connOpts.SetPassword(password)
}
}
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
Expand All @@ -46,7 +86,7 @@ func main() {
log.Print("Connected")
})

s := server.New(connOpts, *confDir, *sep)
s := NewServer(connOpts, confDir, sep)
stop := make(chan struct{})
if err := s.Run(stop); err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion server/proto.go → proto.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package main

import (
"bufio"
Expand Down
2 changes: 1 addition & 1 deletion router/router.go → router.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package router
package main

import (
"errors"
Expand Down
51 changes: 33 additions & 18 deletions server/server.go → server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package main

import (
"fmt"
Expand All @@ -9,19 +9,18 @@ import (
"sync"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/moquette/router"
)

type Server struct {
type server struct {
conf string
sep string
client mqtt.Client
procs map[*os.Process]string // proc -> topic
mu sync.RWMutex
}

func New(mqttOpts *mqtt.ClientOptions, confDir, sep string) *Server {
s := &Server{
func NewServer(mqttOpts *mqtt.ClientOptions, confDir, sep string) *server {
s := &server{
conf: confDir,
sep: sep,
procs: map[*os.Process]string{},
Expand All @@ -40,7 +39,7 @@ func New(mqttOpts *mqtt.ClientOptions, confDir, sep string) *Server {
return s
}

func (s *Server) Run(stop chan struct{}) error {
func (s *server) Run(stop chan struct{}) error {
if token := s.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
Expand All @@ -50,8 +49,10 @@ func (s *Server) Run(stop chan struct{}) error {
return nil
}

func (s *Server) intputHandler(p *os.Process, r io.Reader) {
func (s *server) inputHandler(p *os.Process, r io.Reader, wg *sync.WaitGroup) {
proto := newProtoReader(r)
defer wg.Done()

for {
cmd, err := proto.Next()
if err != nil {
Expand All @@ -70,14 +71,14 @@ func (s *Server) intputHandler(p *os.Process, r io.Reader) {
}
}

func (s *Server) handleMessage(msg mqtt.Message) {
rt := router.Router{
func (s *server) handleMessage(msg mqtt.Message) {
rt := Router{
Dir: s.conf,
Sep: s.sep,
}
topic := msg.Topic()
cmd, err := rt.Match(topic)
if err == router.ErrNotFound || cmd == "" {
if err == ErrNotFound || cmd == "" {
return
}
if err != nil {
Expand All @@ -90,42 +91,56 @@ func (s *Server) handleMessage(msg mqtt.Message) {
return
}
defer r.Close()
defer w.Close()
p := string(msg.Payload())
c := exec.Command(cmd, p)
c.Dir = s.conf
c.Stdout = os.Stdout
c.Stderr = os.Stderr
c.ExtraFiles = []*os.File{w}
c.Env = []string{
c.Env = append(os.Environ(),
fmt.Sprintf("MQTT_TOPIC=%s", msg.Topic()),
fmt.Sprintf("MQTT_MSGID=%d", msg.MessageID()),
}
fmt.Sprintf("MQTT_MSGID=%d", msg.MessageID()))

if err := c.Start(); err != nil {
log.Printf("%s: %v", cmd, err)
}
go s.intputHandler(c.Process, r)

var wg sync.WaitGroup
wg.Add(1)
go s.inputHandler(c.Process, r, &wg)

log.Printf("executing %s %s (pid: %d)", cmd, p, c.Process.Pid)
s.addProc(c.Process, topic)
defer s.removeProc(c.Process)

if err := c.Wait(); err != nil {
log.Printf("%s: %v", cmd, err)
}

// Due to all the mess of threads going on, we can't defer
// the closing of the pipe -- we close it to indicate
// that no more input will appear,
// which generates an EOF on the read end,
// which lets the inputHandler complete,
// which lets *this* goroutine complete and clean up.
w.Close()

wg.Wait()
}

func (s *Server) addProc(p *os.Process, topic string) {
func (s *server) addProc(p *os.Process, topic string) {
s.mu.Lock()
defer s.mu.Unlock()
s.procs[p] = topic
}

func (s *Server) removeProc(p *os.Process) {
func (s *server) removeProc(p *os.Process) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.procs, p)
}

func (s *Server) kill(topic string, except *os.Process) {
func (s *server) kill(topic string, except *os.Process) {
s.mu.RLock()
defer s.mu.RUnlock()
for p, t := range s.procs {
Expand Down