Skip to content

Commit 930a37e

Browse files
committed
Add utility functions for HTTP header management and client IP extraction; implement metrics tracking in LoadBalancer
1 parent ed5e834 commit 930a37e

File tree

8 files changed

+540
-68
lines changed

8 files changed

+540
-68
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
.env
1+
.env
2+
*.exe

balancer/balancer.go

Lines changed: 106 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,102 @@
11
package balancer
22

3-
import "fmt"
4-
53
import (
4+
"errors"
5+
"fmt"
6+
"io"
67
"log"
78
"net/http"
89
"sync"
10+
"sync/atomic"
911
"time"
1012

1113
sv "loadbalancer/server"
1214
"loadbalancer/utils"
1315
)
1416

1517
type LoadBalancer struct {
16-
Servers []*sv.Server
17-
mu sync.Mutex
18-
Logger *log.Logger
19-
wg sync.WaitGroup
20-
shutdown bool
18+
Servers []*sv.Server
19+
mu sync.RWMutex
20+
Logger *log.Logger
21+
wg sync.WaitGroup
22+
shutdown bool
23+
metrics *Metrics
24+
maxRetries int
25+
}
26+
27+
type Metrics struct {
28+
TotalRequests uint64
29+
FailedRequests uint64
30+
ActiveConnections int64
31+
}
32+
33+
func NewLoadBalancer(logger *log.Logger) *LoadBalancer {
34+
if logger == nil {
35+
logger = log.New(io.Discard, "", log.LstdFlags)
36+
}
37+
return &LoadBalancer{
38+
Logger: logger,
39+
metrics: &Metrics{},
40+
maxRetries: 3,
41+
}
2142
}
2243

23-
func (lb *LoadBalancer) AddServer(url string) {
44+
func (lb *LoadBalancer) AddServer(url string) error {
45+
lb.mu.Lock()
46+
defer lb.mu.Unlock()
47+
48+
// Validate server URL
49+
if url == "" {
50+
return errors.New("server URL cannot be empty")
51+
}
52+
53+
// Check for duplicate servers
54+
for _, server := range lb.Servers {
55+
if server.URL == url {
56+
return fmt.Errorf("server %s already exists", url)
57+
}
58+
}
59+
2460
server := sv.NewServer(url, lb.Logger)
2561
lb.Servers = append(lb.Servers, server)
26-
//lb.Logger.Printf("Added server %s to the load balancer", url)
2762
lb.Logger.Println(utils.Colorize("Added server "+url+" to the load balancer", utils.GREEN))
63+
return nil
2864
}
2965

30-
func (lb *LoadBalancer) GetLeastLoadedServer() *sv.Server {
66+
func (lb *LoadBalancer) RemoveServer(url string) error {
3167
lb.mu.Lock()
3268
defer lb.mu.Unlock()
69+
70+
for i, server := range lb.Servers {
71+
if server.URL == url {
72+
// Wait for active connections to finish
73+
for server.Load > 0 {
74+
lb.mu.Unlock()
75+
time.Sleep(100 * time.Millisecond)
76+
lb.mu.Lock()
77+
}
78+
79+
// Remove server
80+
lb.Servers = append(lb.Servers[:i], lb.Servers[i+1:]...)
81+
lb.Logger.Println(utils.Colorize("Removed server "+url, utils.YELLOW))
82+
return nil
83+
}
84+
}
85+
86+
return fmt.Errorf("server %s not found", url)
87+
}
88+
89+
func (lb *LoadBalancer) GetMetrics() *Metrics {
90+
return &Metrics{
91+
TotalRequests: atomic.LoadUint64(&lb.metrics.TotalRequests),
92+
FailedRequests: atomic.LoadUint64(&lb.metrics.FailedRequests),
93+
ActiveConnections: atomic.LoadInt64(&lb.metrics.ActiveConnections),
94+
}
95+
}
96+
97+
func (lb *LoadBalancer) GetLeastLoadedServer() *sv.Server {
98+
lb.mu.RLock()
99+
defer lb.mu.RUnlock()
33100

34101
var leastLoadedServer *sv.Server
35102
for _, server := range lb.Servers {
@@ -50,13 +117,36 @@ func (lb *LoadBalancer) GetLeastLoadedServer() *sv.Server {
50117
}
51118

52119
func (lb *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
53-
server := lb.GetLeastLoadedServer()
54-
if server != nil {
55-
server.HandleRequest(w, r)
56-
} else {
57-
http.Error(w, "No servers available", http.StatusServiceUnavailable)
58-
lb.Logger.Println(utils.Colorize("No servers available to handle the request", utils.RED))
120+
// Increment metrics
121+
atomic.AddUint64(&lb.metrics.TotalRequests, 1)
122+
atomic.AddInt64(&lb.metrics.ActiveConnections, 1)
123+
defer atomic.AddInt64(&lb.metrics.ActiveConnections, -1)
124+
125+
// Check if we're shutting down
126+
if lb.shutdown {
127+
http.Error(w, "Service is shutting down", http.StatusServiceUnavailable)
128+
return
59129
}
130+
131+
// Try multiple servers if needed
132+
var err error
133+
for retry := 0; retry < lb.maxRetries; retry++ {
134+
server := lb.GetLeastLoadedServer()
135+
if server == nil {
136+
continue
137+
}
138+
139+
err = server.HandleRequest(w, r)
140+
if err == nil {
141+
return
142+
}
143+
144+
lb.Logger.Printf("Request failed on server %s, attempt %d: %v", server.URL, retry+1, err)
145+
}
146+
147+
// All retries failed
148+
atomic.AddUint64(&lb.metrics.FailedRequests, 1)
149+
http.Error(w, "All servers failed to process the request", http.StatusServiceUnavailable)
60150
}
61151

62152
func (lb *LoadBalancer) StartHealthChecks(interval time.Duration) {

main.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"log"
67
"net/http"
@@ -25,9 +26,7 @@ func main() {
2526
}
2627

2728
// Create load balancer
28-
lb := &balancer.LoadBalancer{
29-
Logger: logger,
30-
}
29+
lb := balancer.NewLoadBalancer(logger)
3130

3231
// Add servers from configuration
3332
for _, url := range config.Servers.URLs {
@@ -38,6 +37,12 @@ func main() {
3837
healthCheckInterval := time.Duration(config.LoadBalancer.HealthCheckIntervalSeconds) * time.Second
3938
lb.StartHealthChecks(healthCheckInterval)
4039

40+
// Add metrics endpoint
41+
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
42+
metrics := lb.GetMetrics()
43+
json.NewEncoder(w).Encode(metrics)
44+
})
45+
4146
// Setup graceful shutdown
4247
stop := make(chan os.Signal, 1)
4348
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)

main_test.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"loadbalancer/balancer"
8+
"loadbalancer/config"
9+
"net/http"
10+
"net/http/httptest"
11+
"os"
12+
"testing"
13+
"time"
14+
)
15+
16+
// MockServer represents a test HTTP server
17+
type MockServer struct {
18+
server *httptest.Server
19+
URL string
20+
}
21+
22+
// setupMockServers creates test backend servers
23+
func setupMockServers(count int) []MockServer {
24+
var servers []MockServer
25+
for i := 0; i < count; i++ {
26+
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
27+
w.WriteHeader(http.StatusOK)
28+
fmt.Fprintf(w, "Mock Server %d", i+1)
29+
})
30+
server := httptest.NewServer(handler)
31+
servers = append(servers, MockServer{
32+
server: server,
33+
URL: server.URL,
34+
})
35+
}
36+
return servers
37+
}
38+
39+
// createTestConfig creates a temporary config file
40+
func createTestConfig(t *testing.T, servers []MockServer) string {
41+
urls := make([]string, len(servers))
42+
for i, s := range servers {
43+
urls[i] = s.URL
44+
}
45+
46+
configData := config.Config{
47+
LoadBalancer: struct {
48+
Port int "json:\"port\""
49+
HealthCheckIntervalSeconds int "json:\"health_check_interval_seconds\""
50+
}{
51+
Port: 8080,
52+
HealthCheckIntervalSeconds: 5,
53+
},
54+
Servers: struct {
55+
URLs []string "json:\"urls\""
56+
}{
57+
URLs: urls,
58+
},
59+
}
60+
61+
file, err := os.CreateTemp("", "test_config_*.json")
62+
if err != nil {
63+
t.Fatalf("Failed to create temp config file: %v", err)
64+
}
65+
66+
encoder := json.NewEncoder(file)
67+
if err := encoder.Encode(configData); err != nil {
68+
t.Fatalf("Failed to write config data: %v", err)
69+
}
70+
71+
return file.Name()
72+
}
73+
74+
func TestLoadBalancerInitialization(t *testing.T) {
75+
// Setup mock servers
76+
mockServers := setupMockServers(2)
77+
defer func() {
78+
for _, s := range mockServers {
79+
s.server.Close()
80+
}
81+
}()
82+
83+
// Create test config
84+
configFile := createTestConfig(t, mockServers)
85+
defer os.Remove(configFile)
86+
87+
// Test configuration loading
88+
cfg, err := config.LoadConfig()
89+
if err != nil {
90+
t.Fatalf("Failed to load config: %v", err)
91+
}
92+
93+
if len(cfg.Servers.URLs) != 2 {
94+
t.Errorf("Expected 2 servers in config, got %d", len(cfg.Servers.URLs))
95+
}
96+
97+
// Test load balancer creation
98+
lb := balancer.NewLoadBalancer(nil)
99+
if lb == nil {
100+
t.Fatal("Failed to create load balancer")
101+
}
102+
}
103+
104+
func TestMetricsEndpoint(t *testing.T) {
105+
// Create a test server with the metrics endpoint
106+
lb := balancer.NewLoadBalancer(nil)
107+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
108+
if r.URL.Path == "/metrics" {
109+
metrics := lb.GetMetrics()
110+
json.NewEncoder(w).Encode(metrics)
111+
}
112+
}))
113+
defer server.Close()
114+
115+
// Test metrics endpoint
116+
resp, err := http.Get(server.URL + "/metrics")
117+
if err != nil {
118+
t.Fatalf("Failed to get metrics: %v", err)
119+
}
120+
defer resp.Body.Close()
121+
122+
if resp.StatusCode != http.StatusOK {
123+
t.Errorf("Expected status OK, got %v", resp.Status)
124+
}
125+
126+
var metrics balancer.Metrics
127+
if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil {
128+
t.Fatalf("Failed to decode metrics: %v", err)
129+
}
130+
}
131+
132+
func TestGracefulShutdown(t *testing.T) {
133+
lb := balancer.NewLoadBalancer(nil)
134+
server := httptest.NewServer(lb)
135+
defer server.Close()
136+
137+
// Create a channel to signal when shutdown is complete
138+
done := make(chan bool)
139+
140+
// Simulate SIGTERM
141+
p, err := os.FindProcess(os.Getpid())
142+
if err != nil {
143+
t.Fatalf("Failed to find process: %v", err)
144+
}
145+
146+
go func() {
147+
p.Signal(os.Interrupt)
148+
done <- true
149+
}()
150+
151+
// Wait for shutdown with timeout
152+
select {
153+
case <-done:
154+
// Shutdown completed successfully
155+
case <-time.After(5 * time.Second):
156+
t.Fatal("Shutdown timeout")
157+
}
158+
}
159+
160+
func TestLoadBalancerRequestHandling(t *testing.T) {
161+
// Setup mock backend servers
162+
mockServers := setupMockServers(2)
163+
defer func() {
164+
for _, s := range mockServers {
165+
s.server.Close()
166+
}
167+
}()
168+
169+
// Create load balancer
170+
lb := balancer.NewLoadBalancer(nil)
171+
for _, s := range mockServers {
172+
if err := lb.AddServer(s.URL); err != nil {
173+
t.Fatalf("Failed to add server: %v", err)
174+
}
175+
}
176+
177+
// Create test server with the load balancer
178+
server := httptest.NewServer(lb)
179+
defer server.Close()
180+
181+
// Test request handling
182+
resp, err := http.Get(server.URL)
183+
if err != nil {
184+
t.Fatalf("Failed to send request: %v", err)
185+
}
186+
defer resp.Body.Close()
187+
188+
if resp.StatusCode != http.StatusOK {
189+
t.Errorf("Expected status OK, got %v", resp.Status)
190+
}
191+
192+
body, err := io.ReadAll(resp.Body)
193+
if err != nil {
194+
t.Fatalf("Failed to read response: %v", err)
195+
}
196+
197+
if len(body) == 0 {
198+
t.Error("Expected non-empty response body")
199+
}
200+
}

0 commit comments

Comments
 (0)