Skip to content

Commit 67526f5

Browse files
committed
feature: Listen on all interfaces for multicast messages
1 parent ab3d365 commit 67526f5

File tree

5 files changed

+162
-59
lines changed

5 files changed

+162
-59
lines changed

cmd/ssl-status-board/main.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,25 @@ import (
66
"github.com/gobuffalo/packr"
77
"log"
88
"net/http"
9+
"strings"
910
)
1011

1112
func main() {
1213
var config = board.DefaultConfig()
1314
var address = flag.String("address", config.ListenAddress, "The address on which the UI and API is served")
1415
var refereeAddress = flag.String("refereeAddress", config.RefereeConnection.MulticastAddress, "The multicast address of ssl-game-controller")
16+
var skipInterfaces = flag.String("skipInterfaces", "", "Comma separated list of interface names to ignore when receiving multicast packets")
1517
flag.Parse()
1618

1719
config.ListenAddress = *address
1820
config.RefereeConnection.MulticastAddress = *refereeAddress
21+
if skipInterfaces != nil && *skipInterfaces != "" {
22+
config.RefereeConnection.SkipInterfaces = parseSkipInterfaces(*skipInterfaces)
23+
}
1924

2025
refereeBoard := board.NewBoard(config.RefereeConnection)
21-
go refereeBoard.HandleIncomingMessages()
26+
refereeBoard.MulticastReceiver.SkipInterfaces = config.RefereeConnection.SkipInterfaces
27+
refereeBoard.Start()
2228
http.HandleFunc(config.RefereeConnection.SubscribePath, refereeBoard.WsHandler)
2329

2430
setupUi(config.ListenAddress)
@@ -38,3 +44,7 @@ func setupUi(listenAddress string) {
3844
log.Print("Backend-only version started. Run the UI separately or get a binary that has the UI included")
3945
}
4046
}
47+
48+
func parseSkipInterfaces(skipInterfaces string) []string {
49+
return strings.Split(skipInterfaces, ",")
50+
}
Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,57 @@
11
package board
22

33
import (
4+
"github.com/RoboCup-SSL/ssl-status-board/pkg/sslnet"
45
"github.com/gorilla/websocket"
56
"log"
67
"net/http"
8+
"sync"
79
"time"
810
)
911

1012
// Board contains the state of this referee board
1113
type Board struct {
12-
cfg RefereeConfig
13-
refereeData []byte
14+
cfg RefereeConfig
15+
refereeData []byte
16+
mutex sync.Mutex
17+
MulticastReceiver *sslnet.MulticastReceiver
1418
}
1519

1620
// NewBoard creates a new referee board
17-
func NewBoard(cfg RefereeConfig) Board {
18-
return Board{cfg: cfg}
21+
func NewBoard(cfg RefereeConfig) (b *Board) {
22+
b = new(Board)
23+
b.cfg = cfg
24+
b.MulticastReceiver = sslnet.NewMulticastReceiver(b.handlingMessage)
25+
return
1926
}
2027

21-
// HandleIncomingMessages listens for new messages and stores the latest ones
22-
func (b *Board) HandleIncomingMessages() {
23-
HandleIncomingMessages(b.cfg.ConnectionConfig.MulticastAddress, b.handlingMessage)
28+
// Start listening for messages
29+
func (b *Board) Start() {
30+
b.MulticastReceiver.Start(b.cfg.MulticastAddress)
31+
}
32+
33+
// Stop listening for messages
34+
func (b *Board) Stop() {
35+
b.MulticastReceiver.Stop()
2436
}
2537

2638
func (b *Board) handlingMessage(data []byte) {
39+
b.mutex.Lock()
40+
defer b.mutex.Unlock()
2741
b.refereeData = data
2842
}
2943

3044
// SendToWebSocket sends latest data to the given websocket
3145
func (b *Board) SendToWebSocket(conn *websocket.Conn) {
3246
for {
47+
b.mutex.Lock()
3348
if len(b.refereeData) > 0 {
3449
if err := conn.WriteMessage(websocket.BinaryMessage, b.refereeData); err != nil {
3550
log.Println("Could not write to referee websocket: ", err)
3651
return
3752
}
3853
}
54+
b.mutex.Unlock()
3955

4056
time.Sleep(b.cfg.SendingInterval)
4157
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ type ConnectionConfig struct {
1010
SubscribePath string `yaml:"SubscribePath"`
1111
SendingInterval time.Duration `yaml:"SendingInterval"`
1212
MulticastAddress string `yaml:"MulticastAddress"`
13+
SkipInterfaces []string `yaml:"skip-interfaces"`
1314
}
1415

1516
// RefereeConfig contains referee specific connection parameters
@@ -39,6 +40,7 @@ func DefaultConfig() Config {
3940
RefereeConnection: RefereeConfig{
4041
ConnectionConfig: ConnectionConfig{
4142
MulticastAddress: "224.5.23.1:10003",
43+
SkipInterfaces: []string{},
4244
SendingInterval: time.Millisecond * 100,
4345
SubscribePath: "/api/referee",
4446
},

pkg/board/multicast.go

Lines changed: 0 additions & 51 deletions
This file was deleted.

pkg/sslnet/multicast_receiver.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package sslnet
2+
3+
import (
4+
"log"
5+
"net"
6+
"sync"
7+
"time"
8+
)
9+
10+
const maxDatagramSize = 8192
11+
12+
type MulticastReceiver struct {
13+
activeIfis map[string]bool
14+
connections []*net.UDPConn
15+
running bool
16+
consumer func([]byte)
17+
mutex sync.Mutex
18+
SkipInterfaces []string
19+
}
20+
21+
func NewMulticastReceiver(consumer func([]byte)) (r *MulticastReceiver) {
22+
r = new(MulticastReceiver)
23+
r.activeIfis = map[string]bool{}
24+
r.consumer = consumer
25+
return
26+
}
27+
28+
func (r *MulticastReceiver) Start(multicastAddress string) {
29+
r.running = true
30+
go r.receive(multicastAddress)
31+
}
32+
33+
func (r *MulticastReceiver) Stop() {
34+
r.mutex.Lock()
35+
defer r.mutex.Unlock()
36+
r.running = false
37+
for _, c := range r.connections {
38+
if err := c.Close(); err != nil {
39+
log.Println("Could not close connection: ", err)
40+
}
41+
}
42+
}
43+
44+
func (r *MulticastReceiver) receive(multicastAddress string) {
45+
for r.isRunning() {
46+
ifis, _ := net.Interfaces()
47+
for _, ifi := range ifis {
48+
if ifi.Flags&net.FlagMulticast == 0 || // No multicast support
49+
r.skipInterface(ifi.Name) {
50+
continue
51+
}
52+
r.mutex.Lock()
53+
if _, ok := r.activeIfis[ifi.Name]; !ok {
54+
// interface not active, (re-)start receiving
55+
go r.receiveOnInterface(multicastAddress, ifi)
56+
}
57+
r.mutex.Unlock()
58+
}
59+
time.Sleep(1 * time.Second)
60+
}
61+
}
62+
63+
func (r *MulticastReceiver) isRunning() bool {
64+
r.mutex.Lock()
65+
defer r.mutex.Unlock()
66+
return r.running
67+
}
68+
69+
func (r *MulticastReceiver) skipInterface(ifiName string) bool {
70+
for _, skipIfi := range r.SkipInterfaces {
71+
if skipIfi == ifiName {
72+
return true
73+
}
74+
}
75+
return false
76+
}
77+
78+
func (r *MulticastReceiver) receiveOnInterface(multicastAddress string, ifi net.Interface) {
79+
addr, err := net.ResolveUDPAddr("udp", multicastAddress)
80+
if err != nil {
81+
log.Printf("Could resolve multicast address %v: %v", multicastAddress, err)
82+
return
83+
}
84+
85+
listener, err := net.ListenMulticastUDP("udp", &ifi, addr)
86+
if err != nil {
87+
log.Printf("Could not listen at %v: %v", multicastAddress, err)
88+
return
89+
}
90+
91+
if err := listener.SetReadBuffer(maxDatagramSize); err != nil {
92+
log.Println("Could not set read buffer: ", err)
93+
}
94+
95+
r.mutex.Lock()
96+
r.connections = append(r.connections, listener)
97+
r.activeIfis[ifi.Name] = true
98+
r.mutex.Unlock()
99+
100+
log.Printf("Listening on %s (%s)", multicastAddress, ifi.Name)
101+
102+
data := make([]byte, maxDatagramSize)
103+
for {
104+
n, _, err := listener.ReadFrom(data)
105+
if err != nil {
106+
log.Println("ReadFromUDP failed:", err)
107+
break
108+
}
109+
110+
r.consumer(data[:n])
111+
}
112+
113+
log.Printf("Stop listening on %s (%s)", multicastAddress, ifi.Name)
114+
115+
if err := listener.Close(); err != nil {
116+
log.Println("Could not close listener: ", err)
117+
}
118+
r.mutex.Lock()
119+
delete(r.activeIfis, ifi.Name)
120+
for i, c := range r.connections {
121+
if c == listener {
122+
r.connections = append(r.connections[:i], r.connections[i+1:]...)
123+
}
124+
}
125+
r.mutex.Unlock()
126+
}

0 commit comments

Comments
 (0)