diff --git a/go.mod b/go.mod index 370713a..ab49a60 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,12 @@ -module github.com/rs/moquette +module moquette require ( + github.com/BurntSushi/toml v0.3.1 // indirect github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24 + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/spf13/pflag v1.0.3 + github.com/spf13/viper v1.2.1 + github.com/stretchr/testify v1.2.2 // indirect golang.org/x/net v0.0.0-20180724234803-3673e40ba225 // indirect - golang.org/x/sys v0.0.0-20180806192500-2be389f392cd + golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992 ) diff --git a/go.sum b/go.sum index 55317dd..b8b38ac 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,41 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24 h1:21LB+xEZf6xdOrp2JRw5Uongjp7wzGoJ99UnApIlUeg= github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= -github.com/eclipse/paho.mqtt.golang v1.1.1 h1:iPJYXJLaViCshRTW/PSqImSS6HJ2Rf671WR0bXZ2GIU= -github.com/eclipse/paho.mqtt.golang v1.1.1/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mitchellh/mapstructure v1.0.0 h1:vVpGvMXJPqSDh2VYHF7gsfQj8Ncx+Xw5Y1KHeTRY+7I= +github.com/mitchellh/mapstructure v1.0.0/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg= +github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.2.1 h1:bIcUwXqLseLF3BDAZduuNfekWG87ibtFxi59Bq+oI9M= +github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225 h1:kNX+jCowfMYzvlSvJu5pQWEmyWFrBXJ3PBy10xKMXK8= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/sys v0.0.0-20180806192500-2be389f392cd h1:KFYUs6SCkSktZ+xJWb5YbuSCJLLphbTsg0kvyirtlQ8= -golang.org/x/sys v0.0.0-20180806192500-2be389f392cd/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992 h1:BH3eQWeGbwRU2+wxxuuPOdFBmaiBH81O8BugSjHeTFg= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index 7e78df7..9b228fc 100644 --- a/main.go +++ b/main.go @@ -2,41 +2,81 @@ package main import ( "crypto/tls" - "flag" "log" "os" "strconv" "time" + flag "github.com/spf13/pflag" + "github.com/spf13/viper" + mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/rs/moquette/server" ) func main() { hostname, _ := os.Hostname() - debug := flag.Bool("debug", false, "Turn on debugging") - broker := flag.String("broker", "tcp://127.0.0.1:1883", "The full url of the mqtt broker to connect to ex: tcp://127.0.0.1:1883") - clientID := flag.String("client-id", hostname+strconv.Itoa(time.Now().Second()), "A client id for the connection") - username := flag.String("username", "", "A username to authenticate to the mqtt server") - password := flag.String("password", "", "Password to match username") - confDir := flag.String("conf", "/etc/moquette.d", "Path to the configuration director") - sep := flag.String("sep", ":", "File name separator used for topic separator (/)") + + flag.Bool("debug", false, "Turn on debugging") + flag.String("broker", "tcp://127.0.0.1:1883", "The full url of the mqtt broker to connect to ex: tcp://127.0.0.1:1883") + flag.String("client-id", hostname+strconv.Itoa(time.Now().Second()), "A client id for the connection") + flag.String("username", "", "A username to authenticate to the mqtt server") + flag.String("password", "", "Password to match username") + flag.String("conf", "/etc/moquette.d", "Path to the configuration directory") + flag.String("sep", ":", "File name separator used for topic separator (/)") + flag.String("configfile", "", "Configuration file name to read options from.") + flag.Parse() + viper.BindPFlags(flag.CommandLine) + + explicitConfig := viper.GetString("configfile") + + if explicitConfig != "" { + viper.SetConfigFile(explicitConfig) + err := viper.ReadInConfig() + if err != nil { + log.Fatal("Fatal error when loading config file given on command line:\n", err) + } + } else { + viper.SetConfigName("moquette") + viper.AddConfigPath(".") + viper.AddConfigPath("$HOME/.config/moquette") + viper.AddConfigPath("/etc/moquette/") + viper.AddConfigPath("/etc/") + + // With implicit config files, it's permissible not to have them at all. + err := viper.ReadInConfig() + + if err != nil { + if _, cnf := err.(viper.ConfigFileNotFoundError); cnf { + log.Print("No config file found, using defaults and command line only.") + } else { + log.Fatal("Fatal error when loading config file:\n", err) + } + } + } + + debug := viper.GetBool("debug") + broker := viper.GetString("broker") + clientID := viper.GetString("client-id") + username := viper.GetString("username") + password := viper.GetString("password") + confDir := viper.GetString("conf") + sep := viper.GetString("sep") - if *debug { + if debug { mqtt.DEBUG = log.New(os.Stderr, "", 0) } mqtt.ERROR = log.New(os.Stderr, "", 0) connOpts := mqtt.NewClientOptions(). - AddBroker(*broker). - SetClientID(*clientID). + AddBroker(broker). + SetClientID(clientID). SetCleanSession(true). SetKeepAlive(2 * time.Second) - if *username != "" { - connOpts.SetUsername(*username) - if *password != "" { - connOpts.SetPassword(*password) + if username != "" { + connOpts.SetUsername(username) + if password != "" { + connOpts.SetPassword(password) } } tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert} @@ -46,7 +86,7 @@ func main() { log.Print("Connected") }) - s := server.New(connOpts, *confDir, *sep) + s := NewServer(connOpts, confDir, sep) stop := make(chan struct{}) if err := s.Run(stop); err != nil { panic(err) diff --git a/server/proto.go b/proto.go similarity index 99% rename from server/proto.go rename to proto.go index ed718a3..6bd57b9 100644 --- a/server/proto.go +++ b/proto.go @@ -1,4 +1,4 @@ -package server +package main import ( "bufio" diff --git a/router/router.go b/router.go similarity index 98% rename from router/router.go rename to router.go index aa87895..58fc9bd 100644 --- a/router/router.go +++ b/router.go @@ -1,4 +1,4 @@ -package router +package main import ( "errors" diff --git a/server/server.go b/server.go similarity index 67% rename from server/server.go rename to server.go index 45a80bf..6cc268e 100644 --- a/server/server.go +++ b/server.go @@ -1,4 +1,4 @@ -package server +package main import ( "fmt" @@ -9,10 +9,9 @@ import ( "sync" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/rs/moquette/router" ) -type Server struct { +type server struct { conf string sep string client mqtt.Client @@ -20,8 +19,8 @@ type Server struct { mu sync.RWMutex } -func New(mqttOpts *mqtt.ClientOptions, confDir, sep string) *Server { - s := &Server{ +func NewServer(mqttOpts *mqtt.ClientOptions, confDir, sep string) *server { + s := &server{ conf: confDir, sep: sep, procs: map[*os.Process]string{}, @@ -40,7 +39,7 @@ func New(mqttOpts *mqtt.ClientOptions, confDir, sep string) *Server { return s } -func (s *Server) Run(stop chan struct{}) error { +func (s *server) Run(stop chan struct{}) error { if token := s.client.Connect(); token.Wait() && token.Error() != nil { return token.Error() } @@ -50,8 +49,10 @@ func (s *Server) Run(stop chan struct{}) error { return nil } -func (s *Server) intputHandler(p *os.Process, r io.Reader) { +func (s *server) inputHandler(p *os.Process, r io.Reader, wg *sync.WaitGroup) { proto := newProtoReader(r) + defer wg.Done() + for { cmd, err := proto.Next() if err != nil { @@ -70,14 +71,14 @@ func (s *Server) intputHandler(p *os.Process, r io.Reader) { } } -func (s *Server) handleMessage(msg mqtt.Message) { - rt := router.Router{ +func (s *server) handleMessage(msg mqtt.Message) { + rt := Router{ Dir: s.conf, Sep: s.sep, } topic := msg.Topic() cmd, err := rt.Match(topic) - if err == router.ErrNotFound || cmd == "" { + if err == ErrNotFound || cmd == "" { return } if err != nil { @@ -90,42 +91,56 @@ func (s *Server) handleMessage(msg mqtt.Message) { return } defer r.Close() - defer w.Close() p := string(msg.Payload()) c := exec.Command(cmd, p) c.Dir = s.conf c.Stdout = os.Stdout c.Stderr = os.Stderr c.ExtraFiles = []*os.File{w} - c.Env = []string{ + c.Env = append(os.Environ(), fmt.Sprintf("MQTT_TOPIC=%s", msg.Topic()), - fmt.Sprintf("MQTT_MSGID=%d", msg.MessageID()), - } + fmt.Sprintf("MQTT_MSGID=%d", msg.MessageID())) + if err := c.Start(); err != nil { log.Printf("%s: %v", cmd, err) } - go s.intputHandler(c.Process, r) + + var wg sync.WaitGroup + wg.Add(1) + go s.inputHandler(c.Process, r, &wg) + log.Printf("executing %s %s (pid: %d)", cmd, p, c.Process.Pid) s.addProc(c.Process, topic) defer s.removeProc(c.Process) + if err := c.Wait(); err != nil { log.Printf("%s: %v", cmd, err) } + + // Due to all the mess of threads going on, we can't defer + // the closing of the pipe -- we close it to indicate + // that no more input will appear, + // which generates an EOF on the read end, + // which lets the inputHandler complete, + // which lets *this* goroutine complete and clean up. + w.Close() + + wg.Wait() } -func (s *Server) addProc(p *os.Process, topic string) { +func (s *server) addProc(p *os.Process, topic string) { s.mu.Lock() defer s.mu.Unlock() s.procs[p] = topic } -func (s *Server) removeProc(p *os.Process) { +func (s *server) removeProc(p *os.Process) { s.mu.Lock() defer s.mu.Unlock() delete(s.procs, p) } -func (s *Server) kill(topic string, except *os.Process) { +func (s *server) kill(topic string, except *os.Process) { s.mu.RLock() defer s.mu.RUnlock() for p, t := range s.procs {