From 8d59d67d17d5e87c702f783ab23bfc1f1995b449 Mon Sep 17 00:00:00 2001 From: Eugene Medvedev Date: Sat, 24 Nov 2018 15:24:58 +0300 Subject: [PATCH 1/8] Feature: Reading a configuration file. Probably a very overkill way to do it, but seeing as how I know next to no Golang, it's surprising I got it working at all. --- go.mod | 4 +++- go.sum | 29 +++++++++++++++++++++++ main.go | 73 ++++++++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 89 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 370713a..898e8c1 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/rs/moquette require ( github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24 + github.com/spf13/pflag v1.0.3 + github.com/spf13/viper v1.2.1 golang.org/x/net v0.0.0-20180724234803-3673e40ba225 // indirect - golang.org/x/sys v0.0.0-20180806192500-2be389f392cd + golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992 ) diff --git a/go.sum b/go.sum index 55317dd..fc14226 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,37 @@ +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24 h1:21LB+xEZf6xdOrp2JRw5Uongjp7wzGoJ99UnApIlUeg= github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/eclipse/paho.mqtt.golang v1.1.1 h1:iPJYXJLaViCshRTW/PSqImSS6HJ2Rf671WR0bXZ2GIU= github.com/eclipse/paho.mqtt.golang v1.1.1/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mitchellh/mapstructure v1.0.0 h1:vVpGvMXJPqSDh2VYHF7gsfQj8Ncx+Xw5Y1KHeTRY+7I= +github.com/mitchellh/mapstructure v1.0.0/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg= +github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.2.1 h1:bIcUwXqLseLF3BDAZduuNfekWG87ibtFxi59Bq+oI9M= +github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI= golang.org/x/net v0.0.0-20180724234803-3673e40ba225 h1:kNX+jCowfMYzvlSvJu5pQWEmyWFrBXJ3PBy10xKMXK8= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sys v0.0.0-20180806192500-2be389f392cd h1:KFYUs6SCkSktZ+xJWb5YbuSCJLLphbTsg0kvyirtlQ8= golang.org/x/sys v0.0.0-20180806192500-2be389f392cd/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992 h1:BH3eQWeGbwRU2+wxxuuPOdFBmaiBH81O8BugSjHeTFg= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index 7e78df7..86eb846 100644 --- a/main.go +++ b/main.go @@ -2,41 +2,82 @@ package main import ( "crypto/tls" - "flag" "log" "os" "strconv" "time" + flag "github.com/spf13/pflag" + "github.com/spf13/viper" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/rs/moquette/server" ) func main() { hostname, _ := os.Hostname() - debug := flag.Bool("debug", false, "Turn on debugging") - broker := flag.String("broker", "tcp://127.0.0.1:1883", "The full url of the mqtt broker to connect to ex: tcp://127.0.0.1:1883") - clientID := flag.String("client-id", hostname+strconv.Itoa(time.Now().Second()), "A client id for the connection") - username := flag.String("username", "", "A username to authenticate to the mqtt server") - password := flag.String("password", "", "Password to match username") - confDir := flag.String("conf", "/etc/moquette.d", "Path to the configuration director") - sep := flag.String("sep", ":", "File name separator used for topic separator (/)") + + flag.Bool("debug", false, "Turn on debugging") + flag.String("broker", "tcp://127.0.0.1:1883", "The full url of the mqtt broker to connect to ex: tcp://127.0.0.1:1883") + flag.String("client-id", hostname+strconv.Itoa(time.Now().Second()), "A client id for the connection") + flag.String("username", "", "A username to authenticate to the mqtt server") + flag.String("password", "", "Password to match username") + flag.String("conf", "/etc/moquette.d", "Path to the configuration directory") + flag.String("sep", ":", "File name separator used for topic separator (/)") + flag.String("configfile", "", "Configuration file name to read options from.") + flag.Parse() + viper.BindPFlags(flag.CommandLine) + + explicitConfig := viper.GetString("configfile") + + if explicitConfig != "" { + viper.SetConfigFile(explicitConfig) + err := viper.ReadInConfig() + if err != nil { + log.Fatal("Fatal error when loading config file given on command line:\n", err) + } + } else { + viper.SetConfigName("moquette") + viper.AddConfigPath(".") + viper.AddConfigPath("$HOME/.local/config/moquette") + viper.AddConfigPath("/etc/moquette/") + viper.AddConfigPath("/etc/") + + // With implicit config files, it's permissible not to have them at all. + err := viper.ReadInConfig() + + if err != nil { + if _, cnf := err.(viper.ConfigFileNotFoundError); cnf { + log.Print("No config file found, using defaults and command line only.") + } else { + log.Fatal("Fatal error when loading config file:\n", err) + } + } + } + + debug := viper.GetBool("debug") + broker := viper.GetString("broker") + clientID := viper.GetString("client-id") + username := viper.GetString("username") + password := viper.GetString("password") + confDir := viper.GetString("conf") + sep := viper.GetString("sep") - if *debug { + if debug { mqtt.DEBUG = log.New(os.Stderr, "", 0) } mqtt.ERROR = log.New(os.Stderr, "", 0) connOpts := mqtt.NewClientOptions(). - AddBroker(*broker). - SetClientID(*clientID). + AddBroker(broker). + SetClientID(clientID). SetCleanSession(true). SetKeepAlive(2 * time.Second) - if *username != "" { - connOpts.SetUsername(*username) - if *password != "" { - connOpts.SetPassword(*password) + if username != "" { + connOpts.SetUsername(username) + if password != "" { + connOpts.SetPassword(password) } } tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert} @@ -46,7 +87,7 @@ func main() { log.Print("Connected") }) - s := server.New(connOpts, *confDir, *sep) + s := server.New(connOpts, confDir, sep) stop := make(chan struct{}) if err := s.Run(stop); err != nil { panic(err) From d858d11bf4348fc382514f4ebbdb23c5d82f3ac7 Mon Sep 17 00:00:00 2001 From: Eugene Medvedev Date: Thu, 27 Dec 2018 19:24:00 +0300 Subject: [PATCH 2/8] Silly typo. --- server/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/server.go b/server/server.go index 45a80bf..120bcdb 100644 --- a/server/server.go +++ b/server/server.go @@ -50,7 +50,7 @@ func (s *Server) Run(stop chan struct{}) error { return nil } -func (s *Server) intputHandler(p *os.Process, r io.Reader) { +func (s *Server) inputHandler(p *os.Process, r io.Reader) { proto := newProtoReader(r) for { cmd, err := proto.Next() @@ -104,7 +104,7 @@ func (s *Server) handleMessage(msg mqtt.Message) { if err := c.Start(); err != nil { log.Printf("%s: %v", cmd, err) } - go s.intputHandler(c.Process, r) + go s.inputHandler(c.Process, r) log.Printf("executing %s %s (pid: %d)", cmd, p, c.Process.Pid) s.addProc(c.Process, topic) defer s.removeProc(c.Process) From ca67cb522ca4587da5eaeb3746c5efab816f0ff8 Mon Sep 17 00:00:00 2001 From: Eugene Medvedev Date: Thu, 27 Dec 2018 19:35:33 +0300 Subject: [PATCH 3/8] Sorting out dependencies. --- go.mod | 5 ++++- go.sum | 12 ++++++++---- main.go | 2 +- server/server.go | 2 +- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 898e8c1..ab49a60 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,12 @@ -module github.com/rs/moquette +module moquette require ( + github.com/BurntSushi/toml v0.3.1 // indirect github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24 + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.3 github.com/spf13/viper v1.2.1 + github.com/stretchr/testify v1.2.2 // indirect golang.org/x/net v0.0.0-20180724234803-3673e40ba225 // indirect golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992 ) diff --git a/go.sum b/go.sum index fc14226..b8b38ac 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,9 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24 h1:21LB+xEZf6xdOrp2JRw5Uongjp7wzGoJ99UnApIlUeg= github.com/eclipse/paho.mqtt.golang v0.0.0-20180614102224-88c4622b8e24/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= -github.com/eclipse/paho.mqtt.golang v1.1.1 h1:iPJYXJLaViCshRTW/PSqImSS6HJ2Rf671WR0bXZ2GIU= -github.com/eclipse/paho.mqtt.golang v1.1.1/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -13,6 +14,8 @@ github.com/mitchellh/mapstructure v1.0.0 h1:vVpGvMXJPqSDh2VYHF7gsfQj8Ncx+Xw5Y1KH github.com/mitchellh/mapstructure v1.0.0/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg= @@ -24,14 +27,15 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.2.1 h1:bIcUwXqLseLF3BDAZduuNfekWG87ibtFxi59Bq+oI9M= github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225 h1:kNX+jCowfMYzvlSvJu5pQWEmyWFrBXJ3PBy10xKMXK8= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/sys v0.0.0-20180806192500-2be389f392cd h1:KFYUs6SCkSktZ+xJWb5YbuSCJLLphbTsg0kvyirtlQ8= -golang.org/x/sys v0.0.0-20180806192500-2be389f392cd/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992 h1:BH3eQWeGbwRU2+wxxuuPOdFBmaiBH81O8BugSjHeTFg= golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index 86eb846..4fbe1d7 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,7 @@ import ( "github.com/spf13/viper" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/rs/moquette/server" + "moquette/server" ) func main() { diff --git a/server/server.go b/server/server.go index 120bcdb..d6297af 100644 --- a/server/server.go +++ b/server/server.go @@ -9,7 +9,7 @@ import ( "sync" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/rs/moquette/router" + "moquette/router" ) type Server struct { From 346ee7a8c8751912f30fa4d43089426991ad0f57 Mon Sep 17 00:00:00 2001 From: Eugene Medvedev Date: Thu, 27 Dec 2018 19:57:12 +0300 Subject: [PATCH 4/8] Pass all of our environment to the invoked scripts too. --- server/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/server.go b/server/server.go index d6297af..d587ea1 100644 --- a/server/server.go +++ b/server/server.go @@ -97,10 +97,10 @@ func (s *Server) handleMessage(msg mqtt.Message) { c.Stdout = os.Stdout c.Stderr = os.Stderr c.ExtraFiles = []*os.File{w} - c.Env = []string{ + c.Env = append(os.Environ(), fmt.Sprintf("MQTT_TOPIC=%s", msg.Topic()), - fmt.Sprintf("MQTT_MSGID=%d", msg.MessageID()), - } + fmt.Sprintf("MQTT_MSGID=%d", msg.MessageID())) + if err := c.Start(); err != nil { log.Printf("%s: %v", cmd, err) } From bddba9e797ee38bf780b1bdfb88973715be83dd5 Mon Sep 17 00:00:00 2001 From: Eugene Medvedev Date: Thu, 27 Dec 2018 22:19:53 +0300 Subject: [PATCH 5/8] Golang's handling of sub-modules within a project is utterly insane. What's even more surprising is that the aficionados report using various stupid hacks to deal with it: globally replacing paths, working on code directly where go get downloaded it, splitting a unit into sub-repositories when other projects are never supposed to import it piecewise, etc, etc. Seeing as how my pull request resulted in no reaction from the original developer, I'm adopting this, and let's make it a learning experience. For starters, let's merge it into one package. --- main.go | 3 +-- server/proto.go => proto.go | 2 +- router/router.go => router.go | 2 +- server/server.go => server.go | 25 ++++++++++++------------- 4 files changed, 15 insertions(+), 17 deletions(-) rename server/proto.go => proto.go (99%) rename router/router.go => router.go (98%) rename server/server.go => server.go (80%) diff --git a/main.go b/main.go index 4fbe1d7..e20f358 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "github.com/spf13/viper" mqtt "github.com/eclipse/paho.mqtt.golang" - "moquette/server" ) func main() { @@ -87,7 +86,7 @@ func main() { log.Print("Connected") }) - s := server.New(connOpts, confDir, sep) + s := NewServer(connOpts, confDir, sep) stop := make(chan struct{}) if err := s.Run(stop); err != nil { panic(err) diff --git a/server/proto.go b/proto.go similarity index 99% rename from server/proto.go rename to proto.go index ed718a3..6bd57b9 100644 --- a/server/proto.go +++ b/proto.go @@ -1,4 +1,4 @@ -package server +package main import ( "bufio" diff --git a/router/router.go b/router.go similarity index 98% rename from router/router.go rename to router.go index aa87895..58fc9bd 100644 --- a/router/router.go +++ b/router.go @@ -1,4 +1,4 @@ -package router +package main import ( "errors" diff --git a/server/server.go b/server.go similarity index 80% rename from server/server.go rename to server.go index d587ea1..a019ecb 100644 --- a/server/server.go +++ b/server.go @@ -1,4 +1,4 @@ -package server +package main import ( "fmt" @@ -9,10 +9,9 @@ import ( "sync" mqtt "github.com/eclipse/paho.mqtt.golang" - "moquette/router" ) -type Server struct { +type server struct { conf string sep string client mqtt.Client @@ -20,8 +19,8 @@ type Server struct { mu sync.RWMutex } -func New(mqttOpts *mqtt.ClientOptions, confDir, sep string) *Server { - s := &Server{ +func NewServer(mqttOpts *mqtt.ClientOptions, confDir, sep string) *server { + s := &server{ conf: confDir, sep: sep, procs: map[*os.Process]string{}, @@ -40,7 +39,7 @@ func New(mqttOpts *mqtt.ClientOptions, confDir, sep string) *Server { return s } -func (s *Server) Run(stop chan struct{}) error { +func (s *server) Run(stop chan struct{}) error { if token := s.client.Connect(); token.Wait() && token.Error() != nil { return token.Error() } @@ -50,7 +49,7 @@ func (s *Server) Run(stop chan struct{}) error { return nil } -func (s *Server) inputHandler(p *os.Process, r io.Reader) { +func (s *server) inputHandler(p *os.Process, r io.Reader) { proto := newProtoReader(r) for { cmd, err := proto.Next() @@ -70,14 +69,14 @@ func (s *Server) inputHandler(p *os.Process, r io.Reader) { } } -func (s *Server) handleMessage(msg mqtt.Message) { - rt := router.Router{ +func (s *server) handleMessage(msg mqtt.Message) { + rt := Router{ Dir: s.conf, Sep: s.sep, } topic := msg.Topic() cmd, err := rt.Match(topic) - if err == router.ErrNotFound || cmd == "" { + if err == ErrNotFound || cmd == "" { return } if err != nil { @@ -113,19 +112,19 @@ func (s *Server) handleMessage(msg mqtt.Message) { } } -func (s *Server) addProc(p *os.Process, topic string) { +func (s *server) addProc(p *os.Process, topic string) { s.mu.Lock() defer s.mu.Unlock() s.procs[p] = topic } -func (s *Server) removeProc(p *os.Process) { +func (s *server) removeProc(p *os.Process) { s.mu.Lock() defer s.mu.Unlock() delete(s.procs, p) } -func (s *Server) kill(topic string, except *os.Process) { +func (s *server) kill(topic string, except *os.Process) { s.mu.RLock() defer s.mu.RUnlock() for p, t := range s.procs { From 80088550972532e3c4348dc1f3be60148ba30b00 Mon Sep 17 00:00:00 2001 From: Eugene Medvedev Date: Thu, 27 Dec 2018 22:34:15 +0300 Subject: [PATCH 6/8] Meant to fix that a while ago. --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index e20f358..9b228fc 100644 --- a/main.go +++ b/main.go @@ -39,7 +39,7 @@ func main() { } else { viper.SetConfigName("moquette") viper.AddConfigPath(".") - viper.AddConfigPath("$HOME/.local/config/moquette") + viper.AddConfigPath("$HOME/.config/moquette") viper.AddConfigPath("/etc/moquette/") viper.AddConfigPath("/etc/") From d762a7440b57ece96affc9dc8d5e217aee8246bb Mon Sep 17 00:00:00 2001 From: Eugene Medvedev Date: Thu, 27 Dec 2018 23:19:15 +0300 Subject: [PATCH 7/8] Wait for inputHandler to complete, too. I've noticed that occasionally, commands whose only job is to send a PUB message to another channel do not execute, especially if multiple messages triggering them arrive in quick succession. It appears that due to all the goroutines floating around, it is possible for os.exec.Wait to complete before inputHandler actually gets to do anything to the pipe -- and Wait does not appear to wait for the pipe to be closed on both ends too, so commands can get lost entirely or even fail because the pipe was already closed. Waiting for the inputHandler to complete appears to fix the issue. Concurrency, fun for the whole family. --- server.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server.go b/server.go index a019ecb..4eaee3a 100644 --- a/server.go +++ b/server.go @@ -49,7 +49,7 @@ func (s *server) Run(stop chan struct{}) error { return nil } -func (s *server) inputHandler(p *os.Process, r io.Reader) { +func (s *server) inputHandler(p *os.Process, r io.Reader, wg *sync.WaitGroup) { proto := newProtoReader(r) for { cmd, err := proto.Next() @@ -67,6 +67,7 @@ func (s *server) inputHandler(p *os.Process, r io.Reader) { s.kill(t.Topic, p) } } + wg.Done() } func (s *server) handleMessage(msg mqtt.Message) { @@ -103,13 +104,16 @@ func (s *server) handleMessage(msg mqtt.Message) { if err := c.Start(); err != nil { log.Printf("%s: %v", cmd, err) } - go s.inputHandler(c.Process, r) + var wg sync.WaitGroup + wg.Add(1) + go s.inputHandler(c.Process, r, &wg) log.Printf("executing %s %s (pid: %d)", cmd, p, c.Process.Pid) s.addProc(c.Process, topic) defer s.removeProc(c.Process) if err := c.Wait(); err != nil { log.Printf("%s: %v", cmd, err) } + wg.Wait() } func (s *server) addProc(p *os.Process, topic string) { From f36829c1463a534dc43d7f839a0bdfa5458affcd Mon Sep 17 00:00:00 2001 From: Eugene Medvedev Date: Thu, 10 Jan 2019 04:09:59 +0300 Subject: [PATCH 8/8] Explicit pipe close to prevent stuck goroutines. --- server.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/server.go b/server.go index 4eaee3a..6cc268e 100644 --- a/server.go +++ b/server.go @@ -51,6 +51,8 @@ func (s *server) Run(stop chan struct{}) error { func (s *server) inputHandler(p *os.Process, r io.Reader, wg *sync.WaitGroup) { proto := newProtoReader(r) + defer wg.Done() + for { cmd, err := proto.Next() if err != nil { @@ -67,7 +69,6 @@ func (s *server) inputHandler(p *os.Process, r io.Reader, wg *sync.WaitGroup) { s.kill(t.Topic, p) } } - wg.Done() } func (s *server) handleMessage(msg mqtt.Message) { @@ -90,7 +91,6 @@ func (s *server) handleMessage(msg mqtt.Message) { return } defer r.Close() - defer w.Close() p := string(msg.Payload()) c := exec.Command(cmd, p) c.Dir = s.conf @@ -104,15 +104,27 @@ func (s *server) handleMessage(msg mqtt.Message) { if err := c.Start(); err != nil { log.Printf("%s: %v", cmd, err) } + var wg sync.WaitGroup wg.Add(1) go s.inputHandler(c.Process, r, &wg) + log.Printf("executing %s %s (pid: %d)", cmd, p, c.Process.Pid) s.addProc(c.Process, topic) defer s.removeProc(c.Process) + if err := c.Wait(); err != nil { log.Printf("%s: %v", cmd, err) } + + // Due to all the mess of threads going on, we can't defer + // the closing of the pipe -- we close it to indicate + // that no more input will appear, + // which generates an EOF on the read end, + // which lets the inputHandler complete, + // which lets *this* goroutine complete and clean up. + w.Close() + wg.Wait() }