Skip to content

Commit 6970d58

Browse files
zhuminyimwdd146980
authored and committed
[CONTP-849] Fix loop detection bug and add unit tests for cluster agent leader election (#43680)
### What does this PR do?

- Add unit tests for cluster agent leader election and leadership transitions
- Fix bug in `LeaderForwarder.Forward()` where loop detection didn't return after writing the error

### Motivation

Fixed a bug in `pkg/clusteragent/api/leader_forwarder.go` where loop detection (via the `X-DCA-Follower-Forwarded` header) wrote an HTTP 508 error but continued to forward the request to the leader anyway. Added the missing `return` statement to properly short-circuit the request.

New Tests

`pkg/clusteragent/api/leader_forwarder_test.go`

- `TestLeaderForwarder_SetLeaderIP` - tests leader IP setting, updating, and clearing
- `TestLeaderForwarder_Forward_NilProxy` - tests behavior when no leader is configured
- `TestLeaderForwarder_Forward_LoopDetection` - tests that forwarding loops are properly detected and rejected
- `TestLeaderForwarder_Forward_WithLeader` - tests successful forwarding to leader

`pkg/clusteragent/api/leader_handler_test.go`

- Enhanced `fakeLeaderForwarder` mock to track leader IP changes and forward calls
- `TestRejectOrForwardLeaderQuery_LeadershipTransition` - tests behavior when leadership changes (leader → follower → leader)
- `TestRejectOrForwardLeaderQuery_LeaderIPChange` - tests that the forwarder is updated when the leader IP changes during follower state

### Describe how you validated your changes

### Additional Notes

Co-authored-by: minyi.zhu <minyi.zhu@datadoghq.com>
1 parent ef75a91 commit 6970d58

File tree

3 files changed

+204
-7
lines changed

3 files changed

+204
-7
lines changed

pkg/clusteragent/api/leader_forwarder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func (lf *LeaderForwarder) Forward(rw http.ResponseWriter, req *http.Request) {
8787

8888
if req.Header.Get(forwardHeader) != "" {
8989
http.Error(rw, "Query was already forwarded from: "+req.RemoteAddr, http.StatusLoopDetected)
90+
return
9091
}
9192

9293
var currentProxy *httputil.ReverseProxy
pkg/clusteragent/api/leader_forwarder_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
//go:build test
7+
8+
package api
9+
10+
import (
11+
"net"
12+
"net/http"
13+
"net/http/httptest"
14+
"testing"
15+
16+
"github.com/stretchr/testify/assert"
17+
)
18+
19+
func TestLeaderForwarder_SetLeaderIP(t *testing.T) {
20+
lf := NewLeaderForwarder(5005, 10)
21+
22+
// Initially no leader IP
23+
assert.Equal(t, "", lf.GetLeaderIP())
24+
assert.Nil(t, lf.proxy)
25+
26+
// Set leader IP
27+
lf.SetLeaderIP("1.1.1.1")
28+
assert.Equal(t, "1.1.1.1", lf.GetLeaderIP())
29+
assert.NotNil(t, lf.proxy)
30+
31+
// Update leader IP
32+
lf.SetLeaderIP("2.2.2.2")
33+
assert.Equal(t, "2.2.2.2", lf.GetLeaderIP())
34+
assert.NotNil(t, lf.proxy)
35+
36+
// Clear proxy with empty string - note: leaderIP is NOT cleared (returns early)
37+
lf.SetLeaderIP("")
38+
assert.Equal(t, "2.2.2.2", lf.GetLeaderIP()) // leaderIP unchanged
39+
assert.Nil(t, lf.proxy) // but proxy is cleared
40+
}
41+
42+
func TestLeaderForwarder_Forward_NilProxy(t *testing.T) {
43+
lf := NewLeaderForwarder(5005, 10)
44+
45+
// No leader set, proxy is nil
46+
rw := httptest.NewRecorder()
47+
req := httptest.NewRequest("GET", "http://example.com/foo", nil)
48+
49+
lf.Forward(rw, req)
50+
51+
assert.Equal(t, http.StatusServiceUnavailable, rw.Code)
52+
assert.Equal(t, "true", rw.Header().Get("X-DCA-Forwarded"))
53+
}
54+
55+
func TestLeaderForwarder_Forward_LoopDetection(t *testing.T) {
56+
// Track if leader server was called
57+
leaderCalled := false
58+
leaderServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
59+
leaderCalled = true
60+
w.WriteHeader(http.StatusOK)
61+
}))
62+
defer leaderServer.Close()
63+
64+
port := leaderServer.Listener.Addr().(*net.TCPAddr).Port
65+
lf := NewLeaderForwarder(port, 10)
66+
lf.SetLeaderIP("127.0.0.1")
67+
68+
// Request already has forward header (loop detection)
69+
rw := httptest.NewRecorder()
70+
req := httptest.NewRequest("GET", "http://example.com/foo", nil)
71+
req.Header.Set("X-DCA-Follower-Forwarded", "true")
72+
73+
lf.Forward(rw, req)
74+
75+
// Loop detection should return 508 and NOT forward to leader
76+
assert.Equal(t, http.StatusLoopDetected, rw.Code)
77+
assert.Equal(t, "true", rw.Header().Get("X-DCA-Forwarded"))
78+
assert.False(t, leaderCalled, "Request should not be forwarded to leader when loop is detected")
79+
}
80+
81+
func TestLeaderForwarder_Forward_WithLeader(t *testing.T) {
82+
// Create a test server to act as the leader
83+
leaderServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
84+
// Verify the forward header was added
85+
assert.Equal(t, "true", r.Header.Get("X-DCA-Follower-Forwarded"))
86+
w.WriteHeader(http.StatusOK)
87+
w.Write([]byte("leader response"))
88+
}))
89+
defer leaderServer.Close()
90+
91+
// Extract port from test server
92+
port := leaderServer.Listener.Addr().(*net.TCPAddr).Port
93+
lf := NewLeaderForwarder(port, 10)
94+
lf.SetLeaderIP("127.0.0.1")
95+
96+
rw := httptest.NewRecorder()
97+
req := httptest.NewRequest("GET", "http://example.com/foo", nil)
98+
99+
lf.Forward(rw, req)
100+
101+
assert.Equal(t, http.StatusOK, rw.Code)
102+
assert.Equal(t, "true", rw.Header().Get("X-DCA-Forwarded"))
103+
assert.Equal(t, "leader response", rw.Body.String())
104+
}

pkg/clusteragent/api/leader_handler_test.go

Lines changed: 99 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,25 @@ func (m *mockLeaderEngine) GetLeaderIP() (string, error) {
2929
return m.leaderIP, nil
3030
}
3131

32-
// fakeLeaderForwarder is a fake implementation of the forwarder for testing purposes
33-
type fakeLeaderForwarder struct{}
32+
// fakeLeaderForwarder is a fake implementation of the forwarder for testing purposes.
33+
// It tracks leader IP changes and forward calls for verifying leadership transition behavior.
34+
type fakeLeaderForwarder struct {
35+
currentLeaderIP string
36+
leaderIPChangeCount int
37+
forwardCallCount int
38+
}
3439

35-
// SetLeaderIP does nothing
36-
func (f *fakeLeaderForwarder) SetLeaderIP(_ string) {}
40+
func (f *fakeLeaderForwarder) SetLeaderIP(ip string) {
41+
f.currentLeaderIP = ip
42+
f.leaderIPChangeCount++
43+
}
3744

38-
// GetLeaderIP does nothing
3945
func (f *fakeLeaderForwarder) GetLeaderIP() string {
40-
return ""
46+
return f.currentLeaderIP
4147
}
4248

43-
// Forward returns ok
4449
func (f *fakeLeaderForwarder) Forward(w http.ResponseWriter, _ *http.Request) {
50+
f.forwardCallCount++
4551
w.WriteHeader(http.StatusOK)
4652
}
4753

@@ -92,3 +98,89 @@ func TestRejectOrForwardLeaderQuery_AsLeader(t *testing.T) {
9298

9399
assert.False(t, lph.rejectOrForwardLeaderQuery(rw, req))
94100
}
101+
102+
// TestRejectOrForwardLeaderQuery_LeadershipTransition tests the behavior when
103+
// leadership changes between requests (leader to follower and back).
104+
func TestRejectOrForwardLeaderQuery_LeadershipTransition(t *testing.T) {
105+
mockEngine := &mockLeaderEngine{
106+
isLeader: true,
107+
leaderIP: "1.1.1.1",
108+
}
109+
forwarder := &fakeLeaderForwarder{}
110+
111+
lph := &LeaderProxyHandler{
112+
leaderElectionEnabled: true,
113+
le: mockEngine,
114+
leaderForwarder: forwarder,
115+
}
116+
117+
// First request: we are the leader, should handle locally
118+
rw1 := httptest.NewRecorder()
119+
req1 := httptest.NewRequest("GET", "http://example.com/foo", nil)
120+
assert.False(t, lph.rejectOrForwardLeaderQuery(rw1, req1), "Should handle locally as leader")
121+
assert.Equal(t, 0, forwarder.forwardCallCount, "Should not forward when leader")
122+
123+
// Simulate leadership loss
124+
mockEngine.isLeader = false
125+
mockEngine.leaderIP = "2.2.2.2"
126+
127+
// Second request: we lost leadership, should forward to new leader
128+
rw2 := httptest.NewRecorder()
129+
req2 := httptest.NewRequest("GET", "http://example.com/foo", nil)
130+
assert.True(t, lph.rejectOrForwardLeaderQuery(rw2, req2), "Should forward as follower")
131+
assert.Equal(t, 1, forwarder.forwardCallCount, "Should forward once")
132+
assert.Equal(t, "2.2.2.2", forwarder.currentLeaderIP, "Should update to new leader IP")
133+
134+
// Simulate regaining leadership
135+
mockEngine.isLeader = true
136+
137+
// Third request: we became the leader again, should handle locally
138+
rw3 := httptest.NewRecorder()
139+
req3 := httptest.NewRequest("GET", "http://example.com/foo", nil)
140+
assert.False(t, lph.rejectOrForwardLeaderQuery(rw3, req3), "Should handle locally as new leader")
141+
assert.Equal(t, 1, forwarder.forwardCallCount, "Should not forward additional requests")
142+
}
143+
144+
// TestRejectOrForwardLeaderQuery_LeaderIPChange tests that the forwarder is updated
145+
// when the leader IP changes while we remain a follower.
146+
func TestRejectOrForwardLeaderQuery_LeaderIPChange(t *testing.T) {
147+
mockEngine := &mockLeaderEngine{
148+
isLeader: false,
149+
leaderIP: "1.1.1.1",
150+
}
151+
forwarder := &fakeLeaderForwarder{
152+
currentLeaderIP: "1.1.1.1", // Already knows old leader
153+
}
154+
155+
lph := &LeaderProxyHandler{
156+
leaderElectionEnabled: true,
157+
le: mockEngine,
158+
leaderForwarder: forwarder,
159+
}
160+
161+
// First request: forward to current leader
162+
rw1 := httptest.NewRecorder()
163+
req1 := httptest.NewRequest("GET", "http://example.com/foo", nil)
164+
assert.True(t, lph.rejectOrForwardLeaderQuery(rw1, req1))
165+
assert.Equal(t, 1, forwarder.forwardCallCount)
166+
// IP didn't change, so SetLeaderIP should not have been called
167+
assert.Equal(t, 0, forwarder.leaderIPChangeCount, "Should not update IP when unchanged")
168+
169+
// Simulate leader failover - new leader elected
170+
mockEngine.leaderIP = "2.2.2.2"
171+
172+
// Second request: should detect IP change and update forwarder
173+
rw2 := httptest.NewRecorder()
174+
req2 := httptest.NewRequest("GET", "http://example.com/foo", nil)
175+
assert.True(t, lph.rejectOrForwardLeaderQuery(rw2, req2))
176+
assert.Equal(t, 2, forwarder.forwardCallCount)
177+
assert.Equal(t, 1, forwarder.leaderIPChangeCount, "Should update IP once")
178+
assert.Equal(t, "2.2.2.2", forwarder.currentLeaderIP, "Should have new leader IP")
179+
180+
// Third request: IP hasn't changed again
181+
rw3 := httptest.NewRecorder()
182+
req3 := httptest.NewRequest("GET", "http://example.com/foo", nil)
183+
assert.True(t, lph.rejectOrForwardLeaderQuery(rw3, req3))
184+
assert.Equal(t, 3, forwarder.forwardCallCount)
185+
assert.Equal(t, 1, forwarder.leaderIPChangeCount, "Should not update IP when unchanged")
186+
}

0 commit comments

Comments
 (0)