Skip to content

Commit 8fc5eb3

Browse files
authored
Merge pull request #164 from Notifiarr/dn2_api
Split http into its own package
2 parents dcb53fb + 93010bf commit 8fc5eb3

File tree

7 files changed

+206
-44
lines changed

7 files changed

+206
-44
lines changed

fog.conf

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
# FW_PASSWORD: If a password is set then clients must send it as a setting.
66
password = ""
7-
# FW_LISTEN_ADDR: IP and port to listen on.
7+
# FW_LISTEN_ADDR: IP and UDP port to listen on for UDP packets to write to disk.
88
listen_addr = ":9000"
99
# FW_FLUSH_INTERVAL: How old an item must be to flush it to disk.
1010
flush_interval = "16s"
@@ -19,18 +19,37 @@ log_file_mb = 0
1919
# FW_LOG_FILES: How many logs files to keep around.
2020
log_files = 0
2121
# FW_BUFFER_UDP: This is the UDP socket buffer in bytes.
22-
buffer_udp = 1048576
22+
buffer_udp = 4194304 # 4MB
2323
# FW_BUFFER_PACKET: This is the size of the read buffer per packet.
24-
buffer_packet = 8192
24+
buffer_packet = 262144 # 256KB
2525
# FW_BUFFER_CHAN: This is the channel buffer between the listeners and processors.
26-
buffer_chan = 10240
26+
buffer_chan = 32768 # 32 thousand
2727
# FW_LISTENERS: How many UDP socket listener threads to start.
2828
listeners = 1
2929
# FW_PROCESSORS: How many packet processor threads to start.
3030
processors = 1
3131
# FW_WRITERS: How many file system operations may happen in parallel. 2-5 is fine with fast disks.
3232
writers = 1
3333
# FW_BUFFER_FILE_SYS: How many file pointers can be buffered into the file system writer.
34-
buffer_file_sys = 10240
34+
buffer_file_sys = 32768 # 32 thousand
3535
# FW_DEBUG: Prints 1 line per packet when enabled.
3636
debug = false
37+
38+
# FW_HTTP_SERVER: Configuration for the HTTP server.
39+
[http_server]
40+
# FW_HTTP_SERVER_LISTEN_ADDR: IP and TCP port to listen on. For API and metrics via HTTP.
41+
listen_addr = ":9000"
42+
# FW_HTTP_SERVER_READ_TIMEOUT: Maximum duration for reading the entire request, including the body.
43+
read_timeout = "10s"
44+
# FW_HTTP_SERVER_READ_HEADER_TIMEOUT: Maximum duration for reading the request header.
45+
read_header_timeout = "2s"
46+
# FW_HTTP_SERVER_WRITE_TIMEOUT: Maximum duration for writing the response to the client.
47+
write_timeout = "10s"
48+
# FW_HTTP_SERVER_IDLE_TIMEOUT: Maximum amount of time to wait for the next request when keep-alives are enabled.
49+
idle_timeout = "120s"
50+
# FW_HTTP_SERVER_MAX_HEADER_BYTES: Maximum number of bytes the server will read parsing the request header's keys and values, including the request line.
51+
max_header_bytes = 65536 # 64KB
52+
# FW_HTTP_SERVER_TLS_CERT_PATH: Path to the TLS certificate file. Provide this to enable HTTPS.
53+
tls_cert_path = ""
54+
# FW_HTTP_SERVER_TLS_KEY_PATH: Path to the TLS private key file for the certificate. Provide this to enable HTTPS.
55+
tls_key_path = ""

pkg/fog/logger.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,22 @@ func (c *Config) Errorf(msg string, v ...any) {
6464
// printConfig logs the current configuration information.
6565
func (c *Config) printConfig() {
6666
c.Printf("=> Fog Willow Starting, pid: %d", os.Getpid())
67-
c.Printf("=> Listen Address / Password: %s / %v", c.ListenAddr, c.Password != "")
67+
c.Printf("=> UDP Listen Address / Password: %s / %v", c.ListenAddr, c.Password != "")
68+
69+
if c.HTTPServer.TLSCertPath != "" && c.HTTPServer.TLSKeyPath != "" {
70+
c.Printf("=> HTTPS Listen Address: %s (Cert=%s, Key=%s)", c.HTTPServer.ListenAddr, c.HTTPServer.TLSCertPath, c.HTTPServer.TLSKeyPath)
71+
} else {
72+
c.Printf("=> HTTP Listen Address: %s", c.HTTPServer.ListenAddr)
73+
}
74+
75+
c.Printf("=> HTTP Server; Read/Header/Write/Idle: %s/%s/%s/%s, Max Header Bytes: %d",
76+
c.HTTPServer.ReadTimeout, c.HTTPServer.ReadHeaderTimeout, c.HTTPServer.WriteTimeout,
77+
c.HTTPServer.IdleTimeout, c.HTTPServer.MaxHeaderBytes)
78+
6879
c.Printf("=> Output Path: %s", c.OutputPath)
6980
c.Printf("=> Intervals; Flush/Group: %s/%s", c.FlushInterval, c.GroupInterval)
7081
c.Printf("=> Buffers; UDP/Packet/Chan/FS: %d/%d/%d/%d", c.BufferUDP, c.BufferPacket, c.BufferChan, c.BufferFileSys)
82+
c.Printf("=> Buffer Pool (File Buffers): %v", c.BufferPool)
7183
c.Printf("=> Threads; Listen/Process/Writer: %d/%d/%d", c.Listeners, c.Processors, c.Writers)
7284

7385
if c.LogFile != "" {

pkg/fog/start.go

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,52 +3,50 @@ package fog
33

44
import (
55
"context"
6-
"errors"
76
"fmt"
87
"log"
98
"net"
10-
"net/http"
119
"sync"
1210
"time"
1311

1412
"github.com/Notifiarr/fogwillow/pkg/buf"
13+
"github.com/Notifiarr/fogwillow/pkg/httpserver"
1514
"github.com/Notifiarr/fogwillow/pkg/metrics"
1615
"github.com/Notifiarr/fogwillow/pkg/willow"
17-
"github.com/prometheus/client_golang/prometheus/promhttp"
1816
"golift.io/cnfg"
1917
"golift.io/cnfgfile"
2018
)
2119

2220
// Some application defaults.
2321
const (
2422
DefaultOutputPath = "/tmp"
25-
DefaultListenAddr = ":9000"
26-
DefaultUDPBuffer = 1024 * 1024
27-
DefaultPacketBuffer = 1024 * 8
28-
DefaultChanBuffer = 1024
23+
DefaultUDPBuffer = 1 << 22 // 4MB
24+
DefaultPacketBuffer = 1 << 18 // 256KB
25+
DefaultChanBuffer = 1 << 15 // 32 thousand
2926
)
3027

3128
// Config is the input _and_ running data.
3229
type Config struct {
3330
*willow.Config
34-
Password string `toml:"password" xml:"password"`
35-
OutputPath string `toml:"output_path" xml:"output_path"`
36-
ListenAddr string `toml:"listen_addr" xml:"listen_addr"`
37-
LogFile string `toml:"log_file" xml:"log_file"`
38-
LogFileMB uint `toml:"log_file_mb" xml:"log_file_mb"`
39-
LogFiles uint `toml:"log_files" xml:"log_files"`
40-
BufferUDP uint `toml:"buffer_udp" xml:"buffer_udp"`
41-
BufferPacket uint `toml:"buffer_packet" xml:"buffer_packet"`
42-
BufferChan uint `toml:"buffer_chan" xml:"buffer_chan"`
43-
Listeners uint `toml:"listeners" xml:"listeners"`
44-
Processors uint `toml:"processors" xml:"processors"`
45-
Debug bool `toml:"debug" xml:"debug"`
31+
HTTPServer *httpserver.Config `toml:"http_server" xml:"http_server"`
32+
Password string `toml:"password" xml:"password"`
33+
OutputPath string `toml:"output_path" xml:"output_path"`
34+
ListenAddr string `toml:"listen_addr" xml:"listen_addr"`
35+
LogFile string `toml:"log_file" xml:"log_file"`
36+
LogFileMB uint `toml:"log_file_mb" xml:"log_file_mb"`
37+
LogFiles uint `toml:"log_files" xml:"log_files"`
38+
BufferUDP uint `toml:"buffer_udp" xml:"buffer_udp"`
39+
BufferPacket uint `toml:"buffer_packet" xml:"buffer_packet"`
40+
BufferChan uint `toml:"buffer_chan" xml:"buffer_chan"`
41+
Listeners uint `toml:"listeners" xml:"listeners"`
42+
Processors uint `toml:"processors" xml:"processors"`
43+
Debug bool `toml:"debug" xml:"debug"`
4644
log *log.Logger
4745
packets chan *packet
4846
sock *net.UDPConn
4947
willow *willow.Willow
5048
metrics *metrics.Metrics
51-
httpSrv *http.Server
49+
httpSrv *httpserver.Server
5250
newBuf func(path string, data []byte) *buf.FileBuffer
5351
bytesPool sync.Pool
5452
listenerWg sync.WaitGroup
@@ -59,7 +57,7 @@ type Config struct {
5957
func LoadConfigFile(path string) (*Config, error) {
6058
config := &Config{
6159
OutputPath: DefaultOutputPath,
62-
ListenAddr: DefaultListenAddr,
60+
HTTPServer: httpserver.DefaultConfig(),
6361
BufferUDP: DefaultUDPBuffer,
6462
BufferPacket: DefaultPacketBuffer,
6563
BufferChan: DefaultChanBuffer,
@@ -104,20 +102,10 @@ func (c *Config) Start() error {
104102
go c.packetListener(idx)
105103
}
106104

107-
smx := http.NewServeMux()
108-
smx.Handle("/metrics", promhttp.Handler())
109-
110-
c.httpSrv = &http.Server{
111-
Handler: smx,
112-
Addr: c.ListenAddr,
113-
ReadTimeout: time.Second,
114-
ReadHeaderTimeout: time.Second,
115-
WriteTimeout: time.Second,
116-
IdleTimeout: 20 * time.Second, //nolint:mnd
117-
}
105+
c.httpSrv = httpserver.New(c.HTTPServer, nil)
118106

119107
err = c.httpSrv.ListenAndServe()
120-
if err != nil && !errors.Is(err, http.ErrServerClosed) {
108+
if err != nil {
121109
return fmt.Errorf("web server failed: %w", err)
122110
}
123111

@@ -148,17 +136,23 @@ func (c *Config) setup() {
148136
c.Logger = c
149137
c.Metrics = c.metrics
150138
c.willow = willow.NeWillow(c.Config)
139+
140+
if c.HTTPServer == nil {
141+
c.HTTPServer = httpserver.DefaultConfig()
142+
}
143+
144+
c.HTTPServer.Setup()
151145
}
152146

153147
func (c *Config) setupSocket() error {
154148
addr, err := net.ResolveUDPAddr("udp", c.ListenAddr)
155149
if err != nil {
156-
return fmt.Errorf("invalid listen_addr provided: %w", err)
150+
return fmt.Errorf("invalid udp listen_addr: %w", err)
157151
}
158152

159153
c.sock, err = net.ListenUDP("udp", addr)
160154
if err != nil {
161-
return fmt.Errorf("unable to use provided listen_addr: %w", err)
155+
return fmt.Errorf("unable to use udp listen_addr: %w", err)
162156
}
163157

164158
err = c.sock.SetReadBuffer(int(c.BufferUDP)) //nolint:gosec

pkg/httpserver/config.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package httpserver
2+
3+
import "time"
4+
5+
// HTTP server defaults.
6+
const (
7+
DefaultListenAddr = ":9000"
8+
DefaultReadTimeout = 10 * time.Second
9+
DefaultReadHeaderTimeout = 2 * time.Second
10+
DefaultWriteTimeout = 10 * time.Second
11+
DefaultIdleTimeout = 120 * time.Second
12+
DefaultMaxHeaderBytes = 1 << 16 // 64KB
13+
)
14+
15+
// Config is the configuration for the HTTP server.
16+
type Config struct {
17+
ListenAddr string `toml:"listen_addr" xml:"listen_addr"`
18+
ReadTimeout time.Duration `toml:"read_timeout" xml:"read_timeout"`
19+
ReadHeaderTimeout time.Duration `toml:"read_header_timeout" xml:"read_header_timeout"`
20+
WriteTimeout time.Duration `toml:"write_timeout" xml:"write_timeout"`
21+
IdleTimeout time.Duration `toml:"idle_timeout" xml:"idle_timeout"`
22+
MaxHeaderBytes int `toml:"max_header_bytes" xml:"max_header_bytes"`
23+
TLSCertPath string `toml:"tls_cert_path" xml:"tls_cert_path"`
24+
TLSKeyPath string `toml:"tls_key_path" xml:"tls_key_path"`
25+
}
26+
27+
// DefaultConfig returns the default configuration for the HTTP server.
28+
func DefaultConfig() *Config {
29+
return &Config{
30+
ListenAddr: DefaultListenAddr,
31+
ReadTimeout: DefaultReadTimeout,
32+
ReadHeaderTimeout: DefaultReadHeaderTimeout,
33+
WriteTimeout: DefaultWriteTimeout,
34+
IdleTimeout: DefaultIdleTimeout,
35+
MaxHeaderBytes: DefaultMaxHeaderBytes,
36+
}
37+
}
38+
39+
// Setup fills empty listen address and any zero timeouts or limits so logs and runtime match.
40+
func (c *Config) Setup() {
41+
if c.ListenAddr == "" {
42+
c.ListenAddr = DefaultListenAddr
43+
}
44+
45+
if c.ReadTimeout <= 0 {
46+
c.ReadTimeout = DefaultReadTimeout
47+
}
48+
49+
if c.ReadHeaderTimeout <= 0 {
50+
c.ReadHeaderTimeout = DefaultReadHeaderTimeout
51+
}
52+
53+
if c.WriteTimeout <= 0 {
54+
c.WriteTimeout = DefaultWriteTimeout
55+
}
56+
57+
if c.IdleTimeout <= 0 {
58+
c.IdleTimeout = DefaultIdleTimeout
59+
}
60+
61+
if c.MaxHeaderBytes <= 0 {
62+
c.MaxHeaderBytes = DefaultMaxHeaderBytes
63+
}
64+
}

pkg/httpserver/server.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Package httpserver owns the HTTP listener used for Prometheus metrics and future API routes.
2+
// It is separate from the UDP ingest path and from pkg/willow.
3+
package httpserver
4+
5+
import (
6+
"context"
7+
"errors"
8+
"fmt"
9+
"net/http"
10+
11+
"github.com/prometheus/client_golang/prometheus/promhttp"
12+
)
13+
14+
// Server wraps http.Server with a shared ServeMux for /metrics and optional handlers.
15+
type Server struct {
16+
Mux *http.ServeMux
17+
srv *http.Server
18+
TLSCertPath string
19+
TLSKeyPath string
20+
}
21+
22+
// New builds an HTTP server on addr. register is optional; use it to attach API or other routes
23+
// to the same mux (e.g. mux.Handle("/api/", apiHandler)). /metrics is always registered.
24+
func New(config *Config, register func(mux *http.ServeMux)) *Server {
25+
if config == nil {
26+
config = DefaultConfig()
27+
}
28+
29+
config.Setup()
30+
31+
mux := http.NewServeMux()
32+
mux.Handle("/metrics", promhttp.Handler())
33+
34+
if register != nil {
35+
register(mux)
36+
}
37+
38+
return &Server{
39+
Mux: mux,
40+
TLSCertPath: config.TLSCertPath,
41+
TLSKeyPath: config.TLSKeyPath,
42+
srv: &http.Server{
43+
Handler: mux,
44+
Addr: config.ListenAddr,
45+
ReadTimeout: config.ReadTimeout,
46+
ReadHeaderTimeout: config.ReadHeaderTimeout,
47+
WriteTimeout: config.WriteTimeout,
48+
IdleTimeout: config.IdleTimeout,
49+
MaxHeaderBytes: config.MaxHeaderBytes,
50+
},
51+
}
52+
}
53+
54+
// ListenAndServe binds and serves until Shutdown is called or the listener errors.
55+
func (s *Server) ListenAndServe() error {
56+
var err error
57+
58+
if s.TLSCertPath != "" || s.TLSKeyPath != "" {
59+
err = s.srv.ListenAndServeTLS(s.TLSCertPath, s.TLSKeyPath)
60+
} else {
61+
err = s.srv.ListenAndServe()
62+
}
63+
64+
if err != nil && !errors.Is(err, http.ErrServerClosed) {
65+
return fmt.Errorf("http listen: %w", err)
66+
}
67+
68+
return nil
69+
}
70+
71+
// Shutdown stops the server gracefully.
72+
func (s *Server) Shutdown(ctx context.Context) error {
73+
return s.srv.Shutdown(ctx) //nolint:wrapcheck
74+
}

pkg/willow/memory.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
)
88

99
// Len returns the number of file buffers in the map.
10-
// Safe to call from any goroutine.
1110
func (w *Willow) Len() int {
1211
return int(w.memLen.Load())
1312
}
@@ -85,7 +84,7 @@ func (w *Willow) washer(now time.Time, force bool) {
8584
}
8685
}
8786

88-
// trySet is the memoryHole-internal implementation of TrySet.
87+
// trySet is the internal implementation of TrySet.
8988
// Returns nil when candidate was stored, or the existing buffer when the path was already taken.
9089
func (w *Willow) trySet(candidate *buf.FileBuffer) *buf.FileBuffer {
9190
if existing, ok := w.memory[candidate.Path]; ok {

pkg/willow/willow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
// Defaults for the module.
1414
const (
15-
DefaultFileSysBuffer = 1024 * 10
15+
DefaultFileSysBuffer = 1 << 15 // 32 thousand
1616
DefaultFlushInterval = 16 * time.Second
1717
)
1818

0 commit comments

Comments
 (0)