Changes from 12 commits
13 changes: 13 additions & 0 deletions conf/documentation.conf
@@ -470,6 +470,13 @@ description=<<EOT
Should pfstats be managed by PacketFence?
EOT

[services.pfudpproxy]
type=toggle
options=enabled|disabled
description=<<EOT
Should pfudpproxy be managed by PacketFence? This is a UDP reverse proxy for NetFlow/sFlow/IPFIX in cluster mode.
EOT

[services.tracking-config]
type=toggle
options=enabled|disabled
@@ -579,6 +586,12 @@ description=<<EOT
Location of the pfstats binary.
EOT

[services.pfudpproxy_binary]
type=hidden
description=<<EOT
Location of the pfudpproxy binary.
EOT

[services.radsniff_binary]
type=hidden
description=<<EOT
1 change: 0 additions & 1 deletion conf/keepalived.conf.example
@@ -24,4 +24,3 @@ vrrp_script haproxy_portal {

%%vrrp%%

%%lvs%%
10 changes: 10 additions & 0 deletions conf/pf.conf.defaults
@@ -736,6 +736,16 @@ pfstats=enabled
# Location of the pfstats binary.
pfstats_binary=/usr/local/pf/sbin/pfstats

# services.pfudpproxy
#
# Should pfudpproxy be managed by PacketFence?
pfudpproxy=enabled
#
# services.pfudpproxy_binary
#
# Location of the pfudpproxy binary.
pfudpproxy_binary=/usr/local/pf/sbin/pfudpproxy

# services.tracking-config
#
# Should tracking-config be managed by PacketFence?
19 changes: 19 additions & 0 deletions conf/systemd/packetfence-pfudpproxy.service
@@ -0,0 +1,19 @@
# Copyright (C) Inverse inc.
[Unit]
Description=PacketFence UDP Reverse Proxy for Cluster Mode
Wants=packetfence-keepalived.service packetfence-base.target packetfence-config.service
After=packetfence-keepalived.service packetfence-base.target packetfence-config.service

[Service]
Type=notify
WatchdogSec=30s
NotifyAccess=all
Environment=LOG_LEVEL=INFO
ExecStart=/usr/local/pf/sbin/pfudpproxy
Restart=on-failure
StartLimitBurst=3
StartLimitInterval=60
Slice=packetfence.slice

[Install]
WantedBy=packetfence-cluster.target
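
Note for reviewers: Type=notify together with WatchdogSec=30s means the binary itself must signal readiness and then keep pinging the watchdog, or systemd will kill and restart it. A minimal sketch of what that entails, assuming the commonly used github.com/coreos/go-systemd/v22/daemon package (this diff does not show how pfudpproxy actually sends its sd_notify messages):

// notifySystemd signals readiness and services the watchdog.
// Sketch only; pfudpproxy's real notify plumbing is not in this diff.
func notifySystemd(ctx context.Context) {
	// Type=notify: systemd considers startup complete once READY=1 arrives.
	daemon.SdNotify(false, daemon.SdNotifyReady)

	// WatchdogSec: SdWatchdogEnabled returns the configured interval (30s here);
	// ping at half that interval or systemd restarts the service.
	interval, err := daemon.SdWatchdogEnabled(false)
	if err != nil || interval == 0 {
		return
	}
	ticker := time.NewTicker(interval / 2)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			daemon.SdNotify(false, daemon.SdNotifyWatchdog)
		}
	}
}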
2 changes: 1 addition & 1 deletion config.mk
@@ -53,7 +53,7 @@ LOCAL_REGISTRY = packetfence
# Golang
#
GOVERSION = go1.25.5
PF_BINARIES = pfhttpd pfqueue-go pfdhcp pfdns pfstats pfdetect galera-autofix pfacct pfcron mysql-probe pfconnector sdnotify-proxy
PF_BINARIES = pfhttpd pfqueue-go pfdhcp pfdns pfstats pfdetect galera-autofix pfacct pfcron mysql-probe pfconnector sdnotify-proxy pfudpproxy
PF_GO_CMDS = pfcrypt pfkafka

#
1 change: 1 addition & 0 deletions debian/packetfence-pfudpproxy.service
1 change: 1 addition & 0 deletions debian/rules
@@ -262,6 +262,7 @@ binary-arch: build install
dh_installinit --no-start --name=packetfence-tracking-config
dh_installinit --no-start --name=packetfence-kafka
dh_installinit --name=packetfence-netdata
dh_installinit --name=packetfence-pfudpproxy

dh_installcron
# dh_installinfo
128 changes: 128 additions & 0 deletions go/cmd/pfudpproxy/config.go
@@ -0,0 +1,128 @@
package main

import (
"context"
"fmt"
"time"

"github.com/inverse-inc/go-utils/log"
"github.com/inverse-inc/packetfence/go/pfconfigdriver"
)

// Default configuration values
const (
DefaultHealthCheckPort = 4723
DefaultHealthCheckPath = "/"
DefaultHealthCheckInterval = 5 * time.Second
DefaultHealthCheckTimeout = 10 * time.Second
DefaultExpectedStatusCode = 404

PortNetFlow = 2055
PortSFlow = 6343
PortIPFIX = 4739
)

// ProxyConfig holds the configuration for the UDP proxy
type ProxyConfig struct {
VIPAddress string
Ports []int
Backends []*Backend
HealthCheckPort int
HealthCheckPath string
HealthCheckInterval time.Duration
HealthCheckTimeout time.Duration
ExpectedStatusCode int
}

// Backend represents a cluster member that can receive forwarded packets
type Backend struct {
Host string
ManagementIP string
Healthy bool
LastCheck time.Time
}

// getHealthCheckPort returns the health check port from FingerbankSettingsCollector or default
func getHealthCheckPort(ctx context.Context) int {
collector := pfconfigdriver.GetType[pfconfigdriver.FingerbankSettingsCollector](ctx)
if port, err := collector.Port.Int64(); err == nil && port > 0 && port < 65536 {
return int(port)
}
return DefaultHealthCheckPort
}

// LoadConfig loads the proxy configuration from pfconfig
func LoadConfig(ctx context.Context) (*ProxyConfig, error) {
config := &ProxyConfig{
Ports: []int{PortNetFlow, PortSFlow, PortIPFIX},
HealthCheckPort: getHealthCheckPort(ctx),
HealthCheckPath: DefaultHealthCheckPath,
HealthCheckInterval: DefaultHealthCheckInterval,
HealthCheckTimeout: DefaultHealthCheckTimeout,
ExpectedStatusCode: DefaultExpectedStatusCode,
}

// Load VIP address from management network configuration
vip, err := loadVIPAddress(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load VIP address: %w", err)
}
config.VIPAddress = vip

// Load cluster members as backends
backends, err := loadBackends(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load backends: %w", err)
}
config.Backends = backends

return config, nil
}

// loadVIPAddress loads the VIP address from the cluster configuration
func loadVIPAddress(ctx context.Context) (string, error) {
var mgmtNet pfconfigdriver.ManagementNetwork
pfconfigdriver.FetchDecodeSocketCache(ctx, &mgmtNet)

var keyConfCluster pfconfigdriver.NetInterface
keyConfCluster.PfconfigNS = "config::Pf(CLUSTER," + pfconfigdriver.FindClusterName(ctx) + ")"
keyConfCluster.PfconfigHashNS = "interface " + mgmtNet.Int
pfconfigdriver.FetchDecodeSocket(ctx, &keyConfCluster)

if keyConfCluster.Ip == "" {
log.LoggerWContext(ctx).Warn("No VIP configured in cluster config for interface " + mgmtNet.Int)
return "", nil
}

log.LoggerWContext(ctx).Debug("Loaded VIP address from cluster config: " + keyConfCluster.Ip)
return keyConfCluster.Ip, nil
}

// loadBackends loads cluster members from pfconfig
func loadBackends(ctx context.Context) ([]*Backend, error) {
var clusterServers pfconfigdriver.AllClusterServers
pfconfigdriver.FetchDecodeSocketCache(ctx, &clusterServers)

backends := make([]*Backend, 0, len(clusterServers.Element))
for _, server := range clusterServers.Element {
if server.ManagementIp == "" {
log.LoggerWContext(ctx).Warn("Cluster server " + server.Host + " has no management IP, skipping")
continue
}

backend := &Backend{
Host: server.Host,
ManagementIP: server.ManagementIp,
Healthy: false, // Will be updated by health checker
LastCheck: time.Time{},
}
backends = append(backends, backend)
log.LoggerWContext(ctx).Debug("Added backend: " + server.Host + " (" + server.ManagementIp + ")")
}

if len(backends) == 0 {
return nil, fmt.Errorf("no valid cluster servers found")
}

return backends, nil
}
147 changes: 147 additions & 0 deletions go/cmd/pfudpproxy/healthcheck.go
@@ -0,0 +1,147 @@
package main

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"sync"
"time"

"github.com/inverse-inc/go-utils/log"
)

// HealthChecker periodically checks the health of backends by making
// HTTPS requests to fingerbank-collector.
type HealthChecker struct {
config *ProxyConfig
lb *LoadBalancer
httpClient *http.Client
}

// NewHealthChecker creates a new health checker.
func NewHealthChecker(config *ProxyConfig, lb *LoadBalancer) *HealthChecker {
// Create HTTP client with TLS config that skips certificate verification
// (fingerbank-collector uses self-signed certificates)
transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
TLSHandshakeTimeout: 5 * time.Second,
}
Comment on lines +24 to +31 (Copilot AI, Feb 16, 2026):

Health checks unconditionally set InsecureSkipVerify: true. Even if fingerbank-collector commonly uses self-signed certs, this removes TLS authenticity entirely; consider wiring this to an explicit config toggle (or trusting a configured CA bundle) so deployments that can validate certs are not forced into insecure mode.
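
One way the comment's suggestion could look, sketched with hypothetical HealthCheckSkipVerify and HealthCheckCAFile fields on ProxyConfig (neither exists in this diff; crypto/x509 and os would need to be added to the imports):

// newHealthCheckTransport sketches an opt-in trust model for the health
// check client. HealthCheckSkipVerify and HealthCheckCAFile are hypothetical
// ProxyConfig fields, not part of this diff.
func newHealthCheckTransport(config *ProxyConfig) (*http.Transport, error) {
	tlsConfig := &tls.Config{InsecureSkipVerify: config.HealthCheckSkipVerify}
	if config.HealthCheckCAFile != "" {
		caPEM, err := os.ReadFile(config.HealthCheckCAFile)
		if err != nil {
			return nil, fmt.Errorf("read CA bundle: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(caPEM) {
			return nil, fmt.Errorf("no certificates parsed from %s", config.HealthCheckCAFile)
		}
		// A trusted CA is configured, so verification can stay enabled.
		tlsConfig.RootCAs = pool
		tlsConfig.InsecureSkipVerify = false
	}
	return &http.Transport{
		TLSClientConfig:     tlsConfig,
		TLSHandshakeTimeout: 5 * time.Second,
	}, nil
}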

httpClient := &http.Client{
Transport: transport,
Timeout: config.HealthCheckTimeout,
// Don't follow redirects
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}

return &HealthChecker{
config: config,
lb: lb,
httpClient: httpClient,
}
}

// Start begins the health checking loop.
func (hc *HealthChecker) Start(ctx context.Context) {
log.LoggerWContext(ctx).Info("Starting health checker")

// Do an initial health check immediately
hc.checkAllBackends(ctx)

ticker := time.NewTicker(hc.config.HealthCheckInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.LoggerWContext(ctx).Info("Health checker stopped")
return
case <-ticker.C:
hc.checkAllBackends(ctx)
}
}
}

// checkAllBackends checks the health of all backends in parallel.
func (hc *HealthChecker) checkAllBackends(ctx context.Context) {
backends := hc.lb.GetAllBackends()
if len(backends) == 0 {
log.LoggerWContext(ctx).Warn("No backends to check")
return
}

log.LoggerWContext(ctx).Debug(fmt.Sprintf("Running health checks for %d backends", len(backends)))

var wg sync.WaitGroup
for i := range backends {
wg.Add(1)
go func(b Backend) {
defer wg.Done()
hc.checkBackend(ctx, b)
}(backends[i])
}
wg.Wait()

// Log current primary after health checks
primary := hc.lb.GetPrimary()
if primary != nil {
log.LoggerWContext(ctx).Debug(fmt.Sprintf("Current primary backend: %s (%s)", primary.Host, primary.ManagementIP))
} else {
log.LoggerWContext(ctx).Warn("No healthy backend available after health checks")
}
}

// checkBackend checks the health of a single backend.
// backend is received by value (a snapshot from GetAllBackends) so reads
// of its fields are safe without holding the load balancer lock.
func (hc *HealthChecker) checkBackend(ctx context.Context, backend Backend) {
url := fmt.Sprintf("https://%s:%d%s",
backend.ManagementIP,
hc.config.HealthCheckPort,
hc.config.HealthCheckPath,
)

req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
log.LoggerWContext(ctx).Error(fmt.Sprintf("Failed to create health check request for %s: %s",
backend.Host, err.Error()))
hc.lb.SetHealth(backend.Host, false)
return
}

resp, err := hc.httpClient.Do(req)
if err != nil {
log.LoggerWContext(ctx).Debug(fmt.Sprintf("Health check failed for %s: %s",
backend.Host, err.Error()))
hc.lb.SetHealth(backend.Host, false)
return
}
defer resp.Body.Close()

// fingerbank-collector returns 404 on "/" when healthy
healthy := resp.StatusCode == hc.config.ExpectedStatusCode

log.LoggerWContext(ctx).Debug(fmt.Sprintf("Health check result for %s (%s): status=%d, healthy=%v, wasHealthy=%v",
backend.Host, backend.ManagementIP, resp.StatusCode, healthy, backend.Healthy))

if healthy && !backend.Healthy {
log.LoggerWContext(ctx).Info(fmt.Sprintf("Backend %s (%s) is now healthy",
backend.Host, backend.ManagementIP))
} else if !healthy && backend.Healthy {
log.LoggerWContext(ctx).Warn(fmt.Sprintf("Backend %s (%s) is now unhealthy (status: %d, expected: %d)",
backend.Host, backend.ManagementIP, resp.StatusCode, hc.config.ExpectedStatusCode))
}

hc.lb.SetHealth(backend.Host, healthy)
}

// UpdateConfig updates the health checker configuration.
func (hc *HealthChecker) UpdateConfig(config *ProxyConfig) {
hc.config = config
hc.httpClient.Timeout = config.HealthCheckTimeout
}
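
Taken together, config.go and healthcheck.go appear intended to compose roughly as follows. This is a reviewer's sketch only: NewLoadBalancer and serveUDP are assumptions, since the diff does not include loadbalancer.go or main.go; only the LoadBalancer methods GetAllBackends, SetHealth, and GetPrimary are visible above.

// run sketches the expected wiring of the pieces shown in this diff.
// NewLoadBalancer and serveUDP are hypothetical.
func run(ctx context.Context) error {
	config, err := LoadConfig(ctx)
	if err != nil {
		return err
	}
	lb := NewLoadBalancer(config.Backends) // hypothetical constructor
	hc := NewHealthChecker(config, lb)
	go hc.Start(ctx) // flips backend health via lb.SetHealth

	// Presumably one UDP listener per configured port (2055, 6343, 4739)
	// on the VIP, forwarding datagrams to lb.GetPrimary().
	for _, port := range config.Ports {
		go serveUDP(ctx, config.VIPAddress, port, lb) // hypothetical
	}
	<-ctx.Done()
	return nil
}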