Skip to content
This repository was archived by the owner on Mar 27, 2026. It is now read-only.

Commit 70a722c

Browse files
authored
Initial scaffolding for MQTT remote API (#3)
1 parent 0ede21b commit 70a722c

File tree

5 files changed

+180
-61
lines changed

5 files changed

+180
-61
lines changed

api/local/local.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package local
2+
3+
import (
4+
"context"
5+
"log"
6+
"net"
7+
8+
"google.golang.org/grpc"
9+
"google.golang.org/grpc/reflection"
10+
)
11+
12+
type Local struct {
13+
UnimplementedLocalServer
14+
SocketPath string
15+
RegisterHandler func(string, string, []string) error
16+
DeregisterHandler func(string, string, []string) error
17+
}
18+
19+
func (l *Local) Register(ctx context.Context, req *RegisterRequest) (*RegisterReply, error) {
20+
if err := l.RegisterHandler(req.GetVpc(), req.GetVpcattachment(), req.GetNetworks()); err != nil {
21+
return nil, err
22+
}
23+
return &RegisterReply{Confirmed: true}, nil
24+
}
25+
26+
func (l *Local) Deregister(ctx context.Context, req *DeregisterRequest) (*DeregisterReply, error) {
27+
if err := l.DeregisterHandler(req.GetVpc(), req.GetVpcattachment(), req.GetNetworks()); err != nil {
28+
return nil, err
29+
}
30+
return &DeregisterReply{Confirmed: true}, nil
31+
}
32+
33+
func (l *Local) Serve(ctx context.Context) error {
34+
listener, err := net.Listen("unix", l.SocketPath)
35+
if err != nil {
36+
return err
37+
}
38+
defer listener.Close() //nolint:errcheck
39+
40+
s := grpc.NewServer()
41+
RegisterLocalServer(s, l)
42+
43+
reflection.Register(s)
44+
45+
routineErr := make(chan error, 1)
46+
go func() {
47+
log.Printf("gRPC listening: unix://%s", l.SocketPath)
48+
if err := s.Serve(listener); err != nil {
49+
routineErr <- err
50+
return
51+
}
52+
routineErr <- nil
53+
}()
54+
55+
<-ctx.Done()
56+
s.Stop()
57+
log.Println("gRPC stopped")
58+
return <-routineErr
59+
}

api/remote/remote.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package remote
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
mqtt "github.com/eclipse/paho.mqtt.golang"
10+
)
11+
12+
type Remote struct {
13+
Host string
14+
Port int
15+
QoS byte
16+
TopicRX string
17+
TopicTX string
18+
ReceiveHandler func(interface{})
19+
20+
client mqtt.Client
21+
}
22+
23+
func (r *Remote) Run(ctx context.Context) error {
24+
mqtt_url := fmt.Sprintf("tcp://%s:%d", r.Host, r.Port)
25+
log.Printf("MQTT connecting: %s", mqtt_url)
26+
27+
r.client = mqtt.NewClient(
28+
mqtt.NewClientOptions().
29+
AddBroker(mqtt_url),
30+
)
31+
if tok := r.client.Connect(); tok.Wait() && tok.Error() != nil {
32+
return tok.Error()
33+
}
34+
log.Println("MQTT connected")
35+
36+
token := r.client.Subscribe(
37+
r.TopicRX,
38+
r.QoS,
39+
func(_ mqtt.Client, msg mqtt.Message) {
40+
r.ReceiveHandler(msg.Payload())
41+
},
42+
)
43+
if !token.WaitTimeout(5*time.Second) || token.Error() != nil {
44+
return token.Error()
45+
}
46+
log.Printf("MQTT subscribed: %s", r.TopicRX)
47+
48+
<-ctx.Done()
49+
if r.client.IsConnected() {
50+
r.client.Disconnect(250)
51+
}
52+
log.Println("MQTT disconnected")
53+
54+
return nil
55+
}
56+
57+
func (r *Remote) Send(payload interface{}) {
58+
token := r.client.Publish(r.TopicTX, r.QoS, false, payload)
59+
token.Wait()
60+
}

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@ module github.com/datum-cloud/galactic-agent
33
go 1.24
44

55
require (
6+
github.com/eclipse/paho.mqtt.golang v1.5.0
67
github.com/spf13/cobra v1.9.1
78
github.com/spf13/viper v1.20.1
9+
golang.org/x/sync v0.14.0
810
google.golang.org/grpc v1.74.2
911
google.golang.org/protobuf v1.36.6
1012
)
1113

1214
require (
1315
github.com/fsnotify/fsnotify v1.8.0 // indirect
1416
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
17+
github.com/gorilla/websocket v1.5.3 // indirect
1518
github.com/inconshreveable/mousetrap v1.1.0 // indirect
1619
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
1720
github.com/sagikazarmark/locafero v0.7.0 // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6N
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
44
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5+
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
6+
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
57
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
68
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
79
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
@@ -18,6 +20,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
1820
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
1921
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
2022
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
23+
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
24+
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
2125
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
2226
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
2327
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -69,6 +73,8 @@ go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
6973
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
7074
golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
7175
golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
76+
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
77+
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
7278
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
7379
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
7480
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=

main.go

Lines changed: 52 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,90 +2,45 @@ package main
22

33
import (
44
"context"
5+
"fmt"
6+
"golang.org/x/sync/errgroup"
57
"log"
6-
"net"
78
"os"
89
"os/signal"
910
"syscall"
10-
"time"
1111

1212
"github.com/spf13/cobra"
1313
"github.com/spf13/viper"
1414

15-
"google.golang.org/grpc"
16-
"google.golang.org/grpc/reflection"
17-
1815
"github.com/datum-cloud/galactic-agent/api/local"
16+
"github.com/datum-cloud/galactic-agent/api/remote"
1917
)
2018

21-
const DEFAULT_SOCKET_PATH = "/var/run/galactic/agent.sock"
22-
23-
type LocalServer struct {
24-
local.UnimplementedLocalServer
25-
}
26-
27-
func (s *LocalServer) Register(ctx context.Context, req *local.RegisterRequest) (*local.RegisterReply, error) {
28-
log.Printf("REGISTER: vpc='%v', vpcattachment='%v', networks='%v'\n", req.GetVpc(), req.GetVpcattachment(), req.GetNetworks())
29-
return &local.RegisterReply{Confirmed: true}, nil
30-
}
31-
32-
func (s *LocalServer) Deregister(ctx context.Context, req *local.DeregisterRequest) (*local.DeregisterReply, error) {
33-
log.Printf("DEREGISTER: vpc='%v', vpcattachment='%v', networks='%v'\n", req.GetVpc(), req.GetVpcattachment(), req.GetNetworks())
34-
return &local.DeregisterReply{Confirmed: true}, nil
35-
}
36-
37-
func Serve(socketPath string) error {
38-
listener, err := net.Listen("unix", socketPath)
39-
if err != nil {
40-
return err
41-
}
42-
defer listener.Close() //nolint:errcheck
43-
44-
s := grpc.NewServer()
45-
local.RegisterLocalServer(s, &LocalServer{})
46-
47-
reflection.Register(s)
48-
49-
stop := make(chan os.Signal, 1)
50-
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
51-
52-
go func() {
53-
log.Printf("gRPC listening on unix://%s", socketPath)
54-
if err := s.Serve(listener); err != nil {
55-
log.Printf("serve exited: %v", err)
56-
}
57-
}()
58-
59-
<-stop
60-
log.Println("shutting down...")
61-
done := make(chan struct{})
62-
go func() {
63-
s.GracefulStop()
64-
close(done)
65-
}()
66-
select {
67-
case <-done:
68-
case <-time.After(5 * time.Second):
69-
s.Stop()
70-
}
71-
return nil
72-
}
73-
7419
var configFile string
7520

7621
func initConfig() {
22+
viper.SetDefault("socket_path", "/var/run/galactic/agent.sock")
23+
viper.SetDefault("mqtt_host", "mqtt")
24+
viper.SetDefault("mqtt_port", 1883)
25+
viper.SetDefault("mqtt_qos", 0)
26+
viper.SetDefault("mqtt_topic_receive", "galactic/default/receive")
27+
viper.SetDefault("mqtt_topic_send", "galactic/default/send")
7728
if configFile != "" {
7829
viper.SetConfigFile(configFile)
7930
}
8031
viper.AutomaticEnv()
81-
viper.SetDefault("socket_path", "/var/run/galactic/agent.sock")
8232
if err := viper.ReadInConfig(); err == nil {
8333
log.Printf("Using config file: %s\n", viper.ConfigFileUsed())
8434
} else {
8535
log.Printf("No config file found - using defaults.")
8636
}
8737
}
8838

39+
var (
40+
l local.Local
41+
r remote.Remote
42+
)
43+
8944
func main() {
9045
cmd := &cobra.Command{
9146
Use: "galactic-agent",
@@ -94,9 +49,45 @@ func main() {
9449
initConfig()
9550
},
9651
Run: func(cmd *cobra.Command, args []string) {
97-
if err := Serve(viper.GetString("socket_path")); err != nil {
98-
log.Fatalf("Serve failed: %v", err)
52+
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
53+
defer stop() //nolint:errcheck
54+
55+
l = local.Local{
56+
SocketPath: viper.GetString("socket_path"),
57+
RegisterHandler: func(vpc, vpcAttachment string, networks []string) error {
58+
log.Printf("REGISTER: vpc='%v', vpcattachment='%v', networks='%v'\n", vpc, vpcAttachment, networks)
59+
r.Send(fmt.Sprintf("REGISTER: vpc='%v', vpcattachment='%v', networks='%v'", vpc, vpcAttachment, networks))
60+
return nil
61+
},
62+
DeregisterHandler: func(vpc, vpcAttachment string, networks []string) error {
63+
log.Printf("DEREGISTER: vpc='%v', vpcattachment='%v', networks='%v'\n", vpc, vpcAttachment, networks)
64+
r.Send(fmt.Sprintf("DEREGISTER: vpc='%v', vpcattachment='%v', networks='%v'", vpc, vpcAttachment, networks))
65+
return nil
66+
},
67+
}
68+
69+
r = remote.Remote{
70+
Host: viper.GetString("mqtt_host"),
71+
Port: viper.GetInt("mqtt_port"),
72+
QoS: byte(viper.GetInt("mqtt_qos")),
73+
TopicRX: viper.GetString("mqtt_topic_receive"),
74+
TopicTX: viper.GetString("mqtt_topic_send"),
75+
ReceiveHandler: func(payload interface{}) {
76+
log.Printf("MQTT received: %s", payload)
77+
},
78+
}
79+
80+
g, ctx := errgroup.WithContext(ctx)
81+
g.Go(func() error {
82+
return l.Serve(ctx)
83+
})
84+
g.Go(func() error {
85+
return r.Run(ctx)
86+
})
87+
if err := g.Wait(); err != nil {
88+
log.Printf("Error: %v", err)
9989
}
90+
log.Printf("Shutdown")
10091
},
10192
}
10293
cmd.PersistentFlags().StringVar(&configFile, "config", "", "config file")

0 commit comments

Comments
 (0)