Skip to content

Commit fcb273b

Browse files
committed
feature: Listen on all interfaces for multicast messages
1 parent ae2b35a commit fcb273b

File tree

4 files changed

+179
-96
lines changed

4 files changed

+179
-96
lines changed

internal/app/rcon/server.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"net"
1616
"os"
1717
"strings"
18+
"sync"
1819
)
1920

2021
type Server struct {
@@ -25,6 +26,7 @@ type Server struct {
2526
connectionHandler func(net.Conn)
2627
listener net.Listener
2728
running bool
29+
mutex sync.Mutex
2830
}
2931

3032
type Client struct {
@@ -68,6 +70,8 @@ func (s *Server) Stop() {
6870
if s.listener == nil {
6971
return
7072
}
73+
s.mutex.Lock()
74+
defer s.mutex.Unlock()
7175
s.running = false
7276
if err := s.listener.Close(); err != nil {
7377
log.Printf("Could not close listener: %v", err)
@@ -81,7 +85,7 @@ func (s *Server) Stop() {
8185
func (s *Server) listen() {
8286
log.Print("Listening on ", s.address)
8387

84-
for s.running {
88+
for s.isRunning() {
8589
conn, err := s.listener.Accept()
8690
if err != nil {
8791
log.Print("Could not accept connection: ", err)
@@ -91,6 +95,12 @@ func (s *Server) listen() {
9195
}
9296
}
9397

98+
func (s *Server) isRunning() bool {
99+
s.mutex.Lock()
100+
defer s.mutex.Unlock()
101+
return s.running
102+
}
103+
94104
func (s *Server) createListener() (net.Listener, error) {
95105
return net.Listen("tcp", s.address)
96106
}
Lines changed: 18 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,47 @@
11
package tracker
22

33
import (
4+
"github.com/RoboCup-SSL/ssl-game-controller/pkg/sslnet"
45
"github.com/golang/protobuf/proto"
56
"log"
6-
"net"
7-
"time"
7+
"sync"
88
)
99

10-
const maxDatagramSize = 8192
11-
1210
type Receiver struct {
13-
address string
14-
Callback func(*TrackerWrapperPacket)
15-
conn *net.UDPConn
11+
address string
12+
Callback func(*TrackerWrapperPacket)
13+
mutex sync.Mutex
14+
MulticastReceiver *sslnet.MulticastReceiver
1615
}
1716

1817
// NewReceiver creates a new receiver
1918
func NewReceiver(address string) (v *Receiver) {
2019
v = new(Receiver)
2120
v.address = address
2221
v.Callback = func(*TrackerWrapperPacket) {}
22+
v.MulticastReceiver = sslnet.NewMulticastReceiver(v.consumeData)
2323
return
2424
}
2525

2626
// Start starts the receiver
2727
func (v *Receiver) Start() {
28-
addr, err := net.ResolveUDPAddr("udp", v.address)
29-
if err != nil {
30-
log.Print(err)
31-
return
32-
}
33-
conn, err := net.ListenMulticastUDP("udp", nil, addr)
34-
if err != nil {
35-
log.Print(err)
36-
return
37-
}
38-
39-
if err := conn.SetReadBuffer(maxDatagramSize); err != nil {
40-
log.Printf("Could not set read buffer to %v.", maxDatagramSize)
41-
}
42-
log.Println("Receiving tracking from", v.address)
43-
44-
v.conn = conn
45-
go v.receive()
28+
v.MulticastReceiver.Start(v.address)
4629
}
4730

4831
// Stop stops the receiver
4932
func (v *Receiver) Stop() {
50-
if err := v.conn.Close(); err != nil {
51-
log.Printf("Could not close vision receiver: %v", err)
52-
}
33+
v.MulticastReceiver.Stop()
5334
}
5435

55-
func (v *Receiver) receive() {
56-
b := make([]byte, maxDatagramSize)
57-
for {
58-
n, err := v.conn.Read(b)
59-
if err != nil {
60-
log.Print("Could not read: ", err)
61-
time.Sleep(1 * time.Second)
62-
continue
63-
}
64-
if n >= maxDatagramSize {
65-
log.Fatal("Buffer size too small")
66-
}
67-
wrapper := TrackerWrapperPacket{}
68-
if err := proto.Unmarshal(b[0:n], &wrapper); err != nil {
69-
log.Println("Could not unmarshal referee message")
70-
continue
71-
}
36+
func (v *Receiver) consumeData(data []byte) {
37+
v.mutex.Lock()
38+
defer v.mutex.Unlock()
7239

73-
v.Callback(&wrapper)
40+
wrapper := TrackerWrapperPacket{}
41+
if err := proto.Unmarshal(data, &wrapper); err != nil {
42+
log.Println("Could not unmarshal referee message")
43+
return
7444
}
45+
46+
v.Callback(&wrapper)
7547
}

internal/app/vision/visionReceiver.go

Lines changed: 24 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
package vision
22

33
import (
4+
"github.com/RoboCup-SSL/ssl-game-controller/pkg/sslnet"
45
"github.com/RoboCup-SSL/ssl-game-controller/pkg/timer"
56
"github.com/golang/protobuf/proto"
67
"log"
7-
"net"
8+
"sync"
89
"time"
910
)
1011

11-
const maxDatagramSize = 8192
12-
1312
type Receiver struct {
1413
address string
1514
DetectionCallback func(*SSL_DetectionFrame)
1615
GeometryCallback func(*SSL_GeometryData)
17-
conn *net.UDPConn
1816
latestTimestamp time.Time
17+
mutex sync.Mutex
18+
MulticastReceiver *sslnet.MulticastReceiver
1919
}
2020

2121
// NewReceiver creates a new receiver
@@ -24,66 +24,41 @@ func NewReceiver(address string) (v *Receiver) {
2424
v.address = address
2525
v.DetectionCallback = func(*SSL_DetectionFrame) {}
2626
v.GeometryCallback = func(data *SSL_GeometryData) {}
27+
v.MulticastReceiver = sslnet.NewMulticastReceiver(v.consumeData)
2728
return
2829
}
2930

3031
// Start starts the receiver
3132
func (v *Receiver) Start() {
32-
addr, err := net.ResolveUDPAddr("udp", v.address)
33-
if err != nil {
34-
log.Print(err)
35-
return
36-
}
37-
conn, err := net.ListenMulticastUDP("udp", nil, addr)
38-
if err != nil {
39-
log.Print(err)
40-
return
41-
}
42-
43-
if err := conn.SetReadBuffer(maxDatagramSize); err != nil {
44-
log.Printf("Could not set read buffer to %v.", maxDatagramSize)
45-
}
46-
log.Println("Receiving vision from", v.address)
47-
48-
v.conn = conn
49-
go v.receive()
33+
v.MulticastReceiver.Start(v.address)
5034
}
5135

5236
// Stop stops the receiver
5337
func (v *Receiver) Stop() {
54-
if err := v.conn.Close(); err != nil {
55-
log.Printf("Could not close vision receiver: %v", err)
56-
}
38+
v.MulticastReceiver.Stop()
5739
}
5840

59-
func (v *Receiver) receive() {
60-
b := make([]byte, maxDatagramSize)
61-
for {
62-
n, err := v.conn.Read(b)
63-
if err != nil {
64-
log.Print("Could not read: ", err)
65-
time.Sleep(1 * time.Second)
66-
continue
67-
}
68-
if n >= maxDatagramSize {
69-
log.Fatal("Buffer size too small")
70-
}
71-
wrapper := SSL_WrapperPacket{}
72-
if err := proto.Unmarshal(b[0:n], &wrapper); err != nil {
73-
log.Println("Could not unmarshal referee message")
74-
continue
75-
}
41+
func (v *Receiver) consumeData(data []byte) {
42+
v.mutex.Lock()
43+
defer v.mutex.Unlock()
44+
45+
wrapper := SSL_WrapperPacket{}
46+
if err := proto.Unmarshal(data, &wrapper); err != nil {
47+
log.Println("Could not unmarshal referee message")
48+
return
49+
}
7650

77-
if wrapper.Geometry != nil {
78-
v.GeometryCallback(wrapper.Geometry)
79-
}
80-
if wrapper.Detection != nil {
81-
v.latestTimestamp = timer.TimestampToTime(*wrapper.Detection.TCapture)
82-
v.DetectionCallback(wrapper.Detection)
83-
}
51+
if wrapper.Geometry != nil {
52+
v.GeometryCallback(wrapper.Geometry)
53+
}
54+
if wrapper.Detection != nil {
55+
v.latestTimestamp = timer.TimestampToTime(*wrapper.Detection.TCapture)
56+
v.DetectionCallback(wrapper.Detection)
8457
}
8558
}
8659

8760
func (v *Receiver) Time() time.Time {
61+
v.mutex.Lock()
62+
defer v.mutex.Unlock()
8863
return v.latestTimestamp
8964
}

pkg/sslnet/multicast_receiver.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package sslnet
2+
3+
import (
4+
"log"
5+
"net"
6+
"sync"
7+
"time"
8+
)
9+
10+
const maxDatagramSize = 8192
11+
12+
type MulticastReceiver struct {
13+
activeIfis map[string]bool
14+
connections []*net.UDPConn
15+
running bool
16+
consumer func([]byte)
17+
mutex sync.Mutex
18+
SkipInterfaces []string
19+
}
20+
21+
func NewMulticastReceiver(consumer func([]byte)) (r *MulticastReceiver) {
22+
r = new(MulticastReceiver)
23+
r.activeIfis = map[string]bool{}
24+
r.consumer = consumer
25+
return
26+
}
27+
28+
func (r *MulticastReceiver) Start(multicastAddress string) {
29+
r.running = true
30+
go r.receive(multicastAddress)
31+
}
32+
33+
func (r *MulticastReceiver) Stop() {
34+
r.mutex.Lock()
35+
defer r.mutex.Unlock()
36+
r.running = false
37+
for _, c := range r.connections {
38+
if err := c.Close(); err != nil {
39+
log.Println("Could not close connection: ", err)
40+
}
41+
}
42+
}
43+
44+
func (r *MulticastReceiver) receive(multicastAddress string) {
45+
for r.isRunning() {
46+
ifis, _ := net.Interfaces()
47+
for _, ifi := range ifis {
48+
if ifi.Flags&net.FlagMulticast == 0 || // No multicast support
49+
r.skipInterface(ifi.Name) {
50+
continue
51+
}
52+
r.mutex.Lock()
53+
if _, ok := r.activeIfis[ifi.Name]; !ok {
54+
// interface not active, (re-)start receiving
55+
go r.receiveOnInterface(multicastAddress, ifi)
56+
}
57+
r.mutex.Unlock()
58+
}
59+
time.Sleep(1 * time.Second)
60+
}
61+
}
62+
63+
func (r *MulticastReceiver) isRunning() bool {
64+
r.mutex.Lock()
65+
defer r.mutex.Unlock()
66+
return r.running
67+
}
68+
69+
func (r *MulticastReceiver) skipInterface(ifiName string) bool {
70+
for _, skipIfi := range r.SkipInterfaces {
71+
if skipIfi == ifiName {
72+
return true
73+
}
74+
}
75+
return false
76+
}
77+
78+
func (r *MulticastReceiver) receiveOnInterface(multicastAddress string, ifi net.Interface) {
79+
addr, err := net.ResolveUDPAddr("udp", multicastAddress)
80+
if err != nil {
81+
log.Printf("Could resolve multicast address %v: %v", multicastAddress, err)
82+
return
83+
}
84+
85+
listener, err := net.ListenMulticastUDP("udp", &ifi, addr)
86+
if err != nil {
87+
log.Printf("Could not listen at %v: %v", multicastAddress, err)
88+
return
89+
}
90+
91+
if err := listener.SetReadBuffer(maxDatagramSize); err != nil {
92+
log.Println("Could not set read buffer: ", err)
93+
}
94+
95+
r.mutex.Lock()
96+
r.connections = append(r.connections, listener)
97+
r.activeIfis[ifi.Name] = true
98+
r.mutex.Unlock()
99+
100+
log.Printf("Listening on %s (%s)", multicastAddress, ifi.Name)
101+
102+
data := make([]byte, maxDatagramSize)
103+
for {
104+
n, _, err := listener.ReadFrom(data)
105+
if err != nil {
106+
log.Println("ReadFromUDP failed:", err)
107+
break
108+
}
109+
110+
r.consumer(data[:n])
111+
}
112+
113+
log.Printf("Stop listening on %s (%s)", multicastAddress, ifi.Name)
114+
115+
if err := listener.Close(); err != nil {
116+
log.Println("Could not close listener: ", err)
117+
}
118+
r.mutex.Lock()
119+
delete(r.activeIfis, ifi.Name)
120+
for i, c := range r.connections {
121+
if c == listener {
122+
r.connections = append(r.connections[:i], r.connections[i+1:]...)
123+
}
124+
}
125+
r.mutex.Unlock()
126+
}

0 commit comments

Comments
 (0)