4 | 4 | package itest |
5 | 5 |
6 | 6 | import ( |
| 7 | + "context" |
| 8 | + "fmt" |
| 9 | + "io" |
| 10 | + "net" |
| 11 | + "sync" |
7 | 12 | "testing" |
8 | 13 | "time" |
9 | 14 |
@@ -137,3 +142,200 @@ func testEtcdFailoverCase(ht *lntest.HarnessTest, kill bool) { |
137 | 142 | // process. |
138 | 143 | ht.Shutdown(carol2) |
139 | 144 | } |
| 145 | + |
| 146 | +// Proxy is a simple TCP proxy that forwards all traffic between a local and a |
| 147 | +// remote address. We use it to simulate a network partition in the leader |
| 148 | +// health check test. |
| 149 | +type Proxy struct { |
| 150 | + listenAddr string |
| 151 | + targetAddr string |
| 152 | + cancel context.CancelFunc |
| 153 | + wg sync.WaitGroup |
| 154 | + stopped chan struct{} |
| 155 | +} |
| 156 | + |
| 157 | +// NewProxy creates a new Proxy instance.
| 158 | +func NewProxy(listenAddr, targetAddr string) *Proxy { |
| 159 | + return &Proxy{ |
| 160 | + listenAddr: listenAddr, |
| 161 | + targetAddr: targetAddr, |
| 162 | + stopped: make(chan struct{}), |
| 163 | + } |
| 164 | +} |
| 165 | + |
| 166 | +// Start starts the proxy. It listens on the listen address and forwards all |
| 167 | +// traffic to the target address. |
| 168 | +func (p *Proxy) Start(ctx context.Context, t *testing.T) { |
| 169 | + listener, err := net.Listen("tcp", p.listenAddr) |
| 170 | + require.NoError(t, err, "Failed to listen on %s", p.listenAddr) |
| 171 | + t.Logf("Proxy is listening on %s", p.listenAddr) |
| 172 | + |
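| | + // Derive a child context so that Stop can cancel the accept
| | + // loop independently of the caller's context.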
| 173 | + proxyCtx, cancel := context.WithCancel(ctx) |
| 174 | + p.cancel = cancel |
| 175 | + |
| 176 | + p.wg.Add(1) |
| 177 | + go func() { |
| 178 | + defer func() { |
| 179 | + close(p.stopped) |
| 180 | + p.wg.Done() |
| 181 | + }() |
| 182 | + |
| 183 | + for { |
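| | + // Bail out if the proxy has been stopped, otherwise
| | + // accept the next connection.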
| 184 | + select { |
| 185 | + case <-proxyCtx.Done(): |
| 186 | + listener.Close() |
| 187 | + return |
| 188 | + default: |
| 189 | + } |
| 190 | + |
| 191 | + conn, err := listener.Accept() |
| 192 | + if err != nil { |
| 193 | + if proxyCtx.Err() != nil { |
| 194 | + // Context is done, exit the loop.
| 195 | + return |
| 196 | + } |
| 197 | + t.Logf("Proxy failed to accept connection: %v", |
| 198 | + err) |
| 199 | + |
| 200 | + continue |
| 201 | + } |
| 202 | + |
| 203 | + p.wg.Add(1) |
| 204 | + go p.handleConnection(proxyCtx, t, conn) |
| 205 | + } |
| 206 | + }() |
| 207 | +} |
| 208 | + |
| 209 | +// handleConnection handles an accepted connection and forwards all traffic |
| 210 | +// between the listener and target. |
| 211 | +func (p *Proxy) handleConnection(ctx context.Context, t *testing.T, |
| 212 | + conn net.Conn) { |
| 213 | + |
| 214 | + targetConn, err := net.Dial("tcp", p.targetAddr) |
| 215 | + require.NoError(t, err, "Failed to connect to target %s", p.targetAddr) |
| 216 | + |
| 217 | + defer func() { |
| 218 | + conn.Close() |
| 219 | + targetConn.Close() |
| 220 | + p.wg.Done() |
| 221 | + }() |
| 222 | + |
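| | + // done is closed once the target-to-client copy returns,
| | + // meaning the target side of the connection was closed.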
| 223 | + done := make(chan struct{}) |
| 224 | + |
| 225 | + p.wg.Add(2) |
| 226 | + go func() { |
| 227 | + defer p.wg.Done() |
| 228 | + // Ignore the copy error due to the connection being closed. |
| 229 | + _, _ = io.Copy(targetConn, conn) |
| 230 | + }() |
| 231 | + |
| 232 | + go func() { |
| 233 | + defer p.wg.Done() |
| 234 | + // Ignore the copy error due to the connection being closed. |
| 235 | + _, _ = io.Copy(conn, targetConn) |
| 236 | + close(done) |
| 237 | + }() |
| 238 | + |
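| | + // Block until either the proxy is stopped or the target side
| | + // closes, then let the deferred cleanup close both connections.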
| 239 | + select { |
| 240 | + case <-ctx.Done(): |
| 241 | + case <-done: |
| 242 | + } |
| 243 | +} |
| 244 | + |
| 245 | +// Stop stops the proxy and waits for all connections to be closed and all |
| 246 | +// goroutines to be stopped. |
| 247 | +func (p *Proxy) Stop(t *testing.T) { |
| 248 | + require.NotNil(t, p.cancel, "Proxy is not started") |
| 249 | + |
| 250 | + p.cancel() |
| 251 | + p.wg.Wait() |
| 252 | + <-p.stopped |
| 253 | + |
| 254 | + t.Log("Proxy stopped", time.Now()) |
| 255 | +} |
| 256 | + |
| 257 | +// testLeaderHealthCheck tests that a node is properly shut down when the leader |
| 258 | +// health check fails. |
| 259 | +func testLeaderHealthCheck(ht *lntest.HarnessTest) { |
| 260 | + clientPort := port.NextAvailablePort() |
| 261 | + |
| 262 | + // Let's start a test etcd instance that we'll redirect through a proxy. |
| 263 | + etcdCfg, cleanup, err := kvdb.StartEtcdTestBackend( |
| 264 | + ht.T.TempDir(), uint16(clientPort), |
| 265 | + uint16(port.NextAvailablePort()), "", |
| 266 | + ) |
| 267 | + require.NoError(ht, err, "Failed to start etcd instance") |
| 268 | + |
| 269 | + // Use a 5 second leader election session TTL to make the test run fast.
| 270 | + const leaderSessionTTL = 5 |
| 271 | + |
| 272 | + // Create an election observer that we will use to monitor the leader |
| 273 | + // election. |
| 274 | + observer, err := cluster.MakeLeaderElector( |
| 275 | + ht.Context(), cluster.EtcdLeaderElector, "observer", |
| 276 | + lncfg.DefaultEtcdElectionPrefix, leaderSessionTTL, etcdCfg, |
| 277 | + ) |
| 278 | + require.NoError(ht, err, "Cannot start election observer") |
| 279 | + |
| 280 | + // Start a proxy that will forward all traffic to the etcd instance. |
| 281 | + clientAddr := fmt.Sprintf("localhost:%d", clientPort) |
| 282 | + proxyAddr := fmt.Sprintf("localhost:%d", port.NextAvailablePort()) |
| 283 | + |
| 284 | + ctx, cancel := context.WithCancel(ht.Context()) |
| 285 | + defer cancel() |
| 286 | + |
| 287 | + proxy := NewProxy(proxyAddr, clientAddr) |
| 288 | + proxy.Start(ctx, ht.T) |
| 289 | + |
| 290 | + // Copy the etcd config so that we can modify the host to point to the |
| 291 | + // proxy. |
| 292 | + proxyEtcdCfg := *etcdCfg |
| 293 | + // With the proxy in place, we can now configure the etcd client to |
| 294 | + // connect to the proxy instead of the etcd instance. |
| 295 | + proxyEtcdCfg.Host = "http://" + proxyAddr |
| 296 | + |
| 297 | + defer cleanup() |
| 298 | + |
| 299 | + // Start Carol-1 with cluster support and connect to etcd through the |
| 300 | + // proxy. |
| 301 | + password := []byte("the quick brown fox jumps the lazy dog") |
| 302 | + stateless := false |
| 303 | + cluster := true |
| 304 | + |
| 305 | + carol, _, _ := ht.NewNodeWithSeedEtcd( |
| 306 | + "Carol-1", &proxyEtcdCfg, password, stateless, cluster, |
| 307 | + leaderSessionTTL, |
| 308 | + ) |
| 309 | + |
| 310 | + // Make sure Carol-1 is indeed the leader. |
| 311 | + assertLeader(ht, observer, "Carol-1") |
| 312 | + |
| 313 | + // At this point Carol-1 is the elected leader, while Carol-2 will wait |
| 314 | + // to become the leader when Carol-1 releases the lease. Note that for |
| 315 | + // Carol-2 we don't use the proxy as we want to simulate a network |
| 316 | + // partition only for Carol-1. |
| 317 | + carol2 := ht.NewNodeEtcd( |
| 318 | + "Carol-2", etcdCfg, password, cluster, leaderSessionTTL, |
| 319 | + ) |
| 320 | + |
| 321 | + // Stop the proxy to simulate a network partition, which will make
| 322 | + // the leader health check fail and consequently force Carol-1 to
| 323 | + // shut down.
| 324 | + proxy.Stop(ht.T) |
| 325 | + |
| 326 | + // Wait for Carol-1 to stop. If the health check didn't work properly,
| 327 | + // this call would time out and trigger a test failure.
| 328 | + require.NoError(ht.T, carol.WaitForProcessExit()) |
| 329 | + |
| 330 | + // Now that Carol-1 is shut down we should fail over to Carol-2. |
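| | + // Give the failover up to two session TTLs so that the old
| | + // lease can expire and Carol-2 can acquire leadership.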
| 331 | + failoverTimeout := time.Duration(2*leaderSessionTTL) * time.Second |
| 332 | + |
| 333 | + // Make sure that Carol-2 becomes the leader (reported by Carol-2). |
| 334 | + err = carol2.WaitUntilLeader(failoverTimeout) |
| 335 | + |
| 336 | + require.NoError(ht, err, "Waiting for Carol-2 to become the leader "+ |
| 337 | + "failed") |
| 338 | + |
| 339 | + // Make sure Carol-2 is indeed the leader (reported by the observer).
| 340 | + assertLeader(ht, observer, "Carol-2") |
| 341 | +} |