Skip to content

Commit 6c10222

Browse files
committed
Fixes for PR
1 parent 8aa8066 commit 6c10222

File tree

5 files changed

+84
-32
lines changed

5 files changed

+84
-32
lines changed

go/cmd/pfudpproxy/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const (
1919

2020
PortNetFlow = 2055
2121
PortSFlow = 6343
22+
PortIPFIX = 4739
2223
)
2324

2425
// ProxyConfig holds the configuration for the UDP proxy
@@ -53,7 +54,7 @@ func getHealthCheckPort(ctx context.Context) int {
5354
// LoadConfig loads the proxy configuration from pfconfig
5455
func LoadConfig(ctx context.Context) (*ProxyConfig, error) {
5556
config := &ProxyConfig{
56-
Ports: []int{PortNetFlow, PortSFlow},
57+
Ports: []int{PortNetFlow, PortSFlow, PortIPFIX},
5758
HealthCheckPort: getHealthCheckPort(ctx),
5859
HealthCheckPath: DefaultHealthCheckPath,
5960
HealthCheckInterval: DefaultHealthCheckInterval,

go/cmd/pfudpproxy/healthcheck.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ func (hc *HealthChecker) checkAllBackends(ctx context.Context) {
7878
log.LoggerWContext(ctx).Debug(fmt.Sprintf("Running health checks for %d backends", len(backends)))
7979

8080
var wg sync.WaitGroup
81-
for _, backend := range backends {
81+
for i := range backends {
8282
wg.Add(1)
83-
go func(b *Backend) {
83+
go func(b Backend) {
8484
defer wg.Done()
8585
hc.checkBackend(ctx, b)
86-
}(backend)
86+
}(backends[i])
8787
}
8888
wg.Wait()
8989

@@ -97,7 +97,9 @@ func (hc *HealthChecker) checkAllBackends(ctx context.Context) {
9797
}
9898

9999
// checkBackend checks the health of a single backend.
100-
func (hc *HealthChecker) checkBackend(ctx context.Context, backend *Backend) {
100+
// backend is received by value (a snapshot from GetAllBackends) so reads
101+
// of its fields are safe without holding the load balancer lock.
102+
func (hc *HealthChecker) checkBackend(ctx context.Context, backend Backend) {
101103
url := fmt.Sprintf("https://%s:%d%s",
102104
backend.ManagementIP,
103105
hc.config.HealthCheckPort,

go/cmd/pfudpproxy/loadbalancer.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,33 @@ func NewLoadBalancer(backends []*Backend) *LoadBalancer {
2121
}
2222
}
2323

24-
// GetPrimary returns the first healthy backend (failover mode).
24+
// GetPrimary returns a snapshot of the first healthy backend (failover mode).
2525
// Returns nil if no healthy backend is available.
26+
// The returned value is a copy; mutating it does not affect the load balancer.
2627
func (lb *LoadBalancer) GetPrimary() *Backend {
2728
lb.mu.RLock()
2829
defer lb.mu.RUnlock()
2930

3031
for _, backend := range lb.backends {
3132
if backend.Healthy {
32-
return backend
33+
copy := *backend
34+
return &copy
3335
}
3436
}
3537
return nil
3638
}
3739

38-
// GetAllBackends returns a copy of all backends for health checking.
39-
func (lb *LoadBalancer) GetAllBackends() []*Backend {
40+
// GetAllBackends returns a snapshot of all backends for health checking.
41+
// Each element is a struct copy; callers can read fields safely without
42+
// the load balancer lock.
43+
func (lb *LoadBalancer) GetAllBackends() []Backend {
4044
lb.mu.RLock()
4145
defer lb.mu.RUnlock()
4246

43-
backends := make([]*Backend, len(lb.backends))
44-
copy(backends, lb.backends)
47+
backends := make([]Backend, len(lb.backends))
48+
for i, b := range lb.backends {
49+
backends[i] = *b
50+
}
4551
return backends
4652
}
4753

go/cmd/pfudpproxy/proxy.go

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,25 @@ type UDPProxy struct {
2626
running bool
2727
stopChan chan struct{}
2828
wg sync.WaitGroup
29+
30+
// fwdConn is a shared unbound UDP connection used by all forwarders.
31+
// Because it is not bound to a specific destination, WriteToUDP can
32+
// send to any backend without opening a new socket per packet.
33+
fwdConn *net.UDPConn
34+
35+
// addrCache maps "ip:port" to a resolved *net.UDPAddr so we don't
36+
// call ResolveUDPAddr on every packet.
37+
addrCache map[string]*net.UDPAddr
38+
addrCacheMu sync.RWMutex
2939
}
3040

3141
// NewUDPProxy creates a new UDP proxy.
3242
func NewUDPProxy(config *ProxyConfig, lb *LoadBalancer) *UDPProxy {
3343
return &UDPProxy{
34-
config: config,
35-
lb: lb,
36-
stopChan: make(chan struct{}),
44+
config: config,
45+
lb: lb,
46+
stopChan: make(chan struct{}),
47+
addrCache: make(map[string]*net.UDPAddr),
3748
}
3849
}
3950

@@ -49,6 +60,14 @@ func (p *UDPProxy) Start(ctx context.Context) {
4960

5061
log.LoggerWContext(ctx).Info("Starting UDP proxy")
5162

63+
// Open a single unbound UDP socket for all outbound forwarding.
64+
fwd, err := net.ListenUDP("udp", nil)
65+
if err != nil {
66+
log.LoggerWContext(ctx).Error(fmt.Sprintf("Failed to open forwarding socket: %s", err.Error()))
67+
return
68+
}
69+
p.fwdConn = fwd
70+
5271
for _, port := range p.config.Ports {
5372
p.wg.Add(1)
5473
go func(port int) {
@@ -76,6 +95,11 @@ func (p *UDPProxy) Stop(ctx context.Context) {
7695
}
7796
}
7897

98+
// Close the shared forwarding socket
99+
if p.fwdConn != nil {
100+
p.fwdConn.Close()
101+
}
102+
79103
// Wait for all goroutines to finish
80104
done := make(chan struct{})
81105
go func() {
@@ -106,6 +130,11 @@ func (p *UDPProxy) UpdateConfig(ctx context.Context, newConfig *ProxyConfig) {
106130
}
107131

108132
p.config = newConfig
133+
134+
// Flush the address cache so stale entries don't survive a backend change.
135+
p.addrCacheMu.Lock()
136+
p.addrCache = make(map[string]*net.UDPAddr)
137+
p.addrCacheMu.Unlock()
109138
}
110139

111140
// listenAndForward listens on VIP:port and forwards packets to healthy backends.
@@ -168,6 +197,27 @@ func (p *UDPProxy) listenAndForward(ctx context.Context, port int) {
168197
}
169198
}
170199

200+
// resolveAddr returns a cached *net.UDPAddr for the given key (ip:port),
201+
// resolving and caching it on the first call.
202+
func (p *UDPProxy) resolveAddr(key string) (*net.UDPAddr, error) {
203+
p.addrCacheMu.RLock()
204+
addr, ok := p.addrCache[key]
205+
p.addrCacheMu.RUnlock()
206+
if ok {
207+
return addr, nil
208+
}
209+
210+
addr, err := net.ResolveUDPAddr("udp", key)
211+
if err != nil {
212+
return nil, err
213+
}
214+
215+
p.addrCacheMu.Lock()
216+
p.addrCache[key] = addr
217+
p.addrCacheMu.Unlock()
218+
return addr, nil
219+
}
220+
171221
// forwardPacket forwards a UDP packet to the primary healthy backend.
172222
func (p *UDPProxy) forwardPacket(ctx context.Context, data []byte, srcAddr *net.UDPAddr, port int) {
173223
backend := p.lb.GetPrimary()
@@ -176,32 +226,23 @@ func (p *UDPProxy) forwardPacket(ctx context.Context, data []byte, srcAddr *net.
176226
return
177227
}
178228

179-
// Create destination address using backend's management IP and same port
180-
dstAddr := fmt.Sprintf("%s:%d", backend.ManagementIP, port)
181-
udpDstAddr, err := net.ResolveUDPAddr("udp", dstAddr)
229+
// Resolve (or retrieve from cache) the destination address
230+
dstKey := fmt.Sprintf("%s:%d", backend.ManagementIP, port)
231+
udpDstAddr, err := p.resolveAddr(dstKey)
182232
if err != nil {
183233
log.LoggerWContext(ctx).Error(fmt.Sprintf("Failed to resolve destination address %s: %s",
184-
dstAddr, err.Error()))
185-
return
186-
}
187-
188-
// Create a new UDP connection for forwarding
189-
// Using a new connection each time since NetFlow/sFlow are fire-and-forget
190-
conn, err := net.DialUDP("udp", nil, udpDstAddr)
191-
if err != nil {
192-
log.LoggerWContext(ctx).Error(fmt.Sprintf("Failed to connect to backend %s: %s",
193-
dstAddr, err.Error()))
234+
dstKey, err.Error()))
194235
return
195236
}
196-
defer conn.Close()
197237

198-
_, err = conn.Write(data)
238+
// Use the shared unbound socket to send to the backend
239+
_, err = p.fwdConn.WriteToUDP(data, udpDstAddr)
199240
if err != nil {
200241
log.LoggerWContext(ctx).Error(fmt.Sprintf("Failed to forward packet to %s: %s",
201-
dstAddr, err.Error()))
242+
dstKey, err.Error()))
202243
return
203244
}
204245

205246
log.LoggerWContext(ctx).Debug(fmt.Sprintf("Forwarded %d bytes from %s to %s",
206-
len(data), srcAddr.String(), dstAddr))
247+
len(data), srcAddr.String(), dstKey))
207248
}

lib/pf/iptables.pm

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1563,7 +1563,7 @@ sub iptables_fingerbank_collector_rules {
15631563

15641564
=item iptables_pfudpproxy_rules
15651565
1566-
Iptable rules for pfudpproxy service (UDP proxy for NetFlow/sFlow in cluster mode)
1566+
Iptable rules for pfudpproxy service (UDP proxy for NetFlow/sFlow/IPFIX in cluster mode)
15671567
15681568
=cut
15691569

@@ -1582,6 +1582,8 @@ sub iptables_pfudpproxy_rules {
15821582
util_safe_push( "-i $tint -p udp -m udp --dport 2055 --jump ACCEPT", $chains->{'filter'}{'INPUT'} );
15831583
# Port 6343 - sFlow (UDP)
15841584
util_safe_push( "-i $tint -p udp -m udp --dport 6343 --jump ACCEPT", $chains->{'filter'}{'INPUT'} );
1585+
# Port 4739 - IPFIX (UDP)
1586+
util_safe_push( "-i $tint -p udp -m udp --dport 4739 --jump ACCEPT", $chains->{'filter'}{'INPUT'} );
15851587
# Convert to JSON and save to file
15861588
util_create_service_rules($chains);
15871589
} else {

0 commit comments

Comments
 (0)