Skip to content

Commit c639d66

Browse files
committed
implement ephemeral port reservation for k3d
Prior to this commit we discovered that CI can flake when standing up multiple k3d clusters due to port conflicts. This commit attempts to implement "port reservation" such that pkg/k3d can always create k3d clusters without conflicts without coordination across different processes. It's unclear if this will actually work as engineering an intentional failure is not feasible.
1 parent a1865c2 commit c639d66

File tree

2 files changed

+120
-0
lines changed

2 files changed

+120
-0
lines changed

operator/pkg/k3d/k3d.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@ import (
1919
"embed"
2020
"fmt"
2121
"io/fs"
22+
"net"
2223
"os"
2324
"os/exec"
2425
"slices"
26+
"strconv"
2527
"strings"
2628
"sync"
29+
"syscall"
2730
"time"
2831

2932
"github.com/cockroachdb/errors"
3033
"github.com/redpanda-data/helm-charts/pkg/kube"
34+
"golang.org/x/sys/unix"
3135
batchv1 "k8s.io/api/batch/v1"
3236
corev1 "k8s.io/api/core/v1"
3337
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -76,10 +80,16 @@ func NewCluster(name string) (*Cluster, error) {
7680
image = override
7781
}
7882

83+
port, err := reserveEphemeralPort("tcp4")
84+
if err != nil {
85+
return nil, err
86+
}
87+
7988
args := []string{
8089
"cluster",
8190
"create",
8291
name,
92+
fmt.Sprintf("--api-port=127.0.0.1:%d", port),
8393
fmt.Sprintf("--agents=%d", 3),
8494
fmt.Sprintf("--timeout=%s", 30*time.Second),
8595
fmt.Sprintf("--image=%s", image),
@@ -268,3 +278,61 @@ var startupManifests = sync.OnceValue(func() []client.Object {
268278

269279
return objs
270280
})
281+
282+
// reserveEphemeralPort attempts to "reserve" an ephemeral OS port by putting
283+
// it into a TIME_WAIT state which prevents the OS re-allocating it when
284+
// binding to port 0.
285+
// Big thanks to:
286+
// - https://github.com/libp2p/go-reuseport
287+
// - https://github.com/Yelp/ephemeral-port-reserve
288+
func reserveEphemeralPort(network string) (int, error) {
289+
lc := net.ListenConfig{
290+
Control: func(network, address string, c syscall.RawConn) error {
291+
var optErr error
292+
ctrlErr := c.Control(func(fd uintptr) {
293+
// Setting SO_REUSEADDR allows this port to be rebound after we finish using it.
294+
// It works roughly the same way on macos and linux.
295+
optErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
296+
})
297+
return errors.Join(ctrlErr, optErr)
298+
},
299+
}
300+
301+
// Listen to port 0 to get an OS allocated port.
302+
lis, err := lc.Listen(context.Background(), network, "127.0.0.1:0")
303+
if err != nil {
304+
return 0, errors.WithStack(err)
305+
}
306+
307+
defer lis.Close()
308+
309+
// According to ephemeral-port-reserve, we need to accept a connection to
310+
// actually put this port into the TIME_WAIT state.
311+
errCh := make(chan error, 1)
312+
go func() {
313+
conn, err := net.Dial(lis.Addr().Network(), lis.Addr().String())
314+
if err != nil {
315+
errCh <- err
316+
return
317+
}
318+
errCh <- conn.Close()
319+
}()
320+
321+
conn, err := lis.Accept()
322+
if err != nil {
323+
return 0, err
324+
}
325+
326+
defer conn.Close()
327+
328+
if err := <-errCh; err != nil {
329+
return 0, err
330+
}
331+
332+
_, port, err := net.SplitHostPort(lis.Addr().String())
333+
if err != nil {
334+
return 0, err
335+
}
336+
337+
return strconv.Atoi(port)
338+
}

operator/pkg/k3d/k3d_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package k3d
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"strconv"
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestReservePort(t *testing.T) {
13+
const network = "tcp4"
14+
const iterations = 100
15+
16+
reserved := map[int]struct{}{}
17+
for i := 0; i < iterations; i++ {
18+
port, err := reserveEphemeralPort("tcp4")
19+
assert.NoError(t, err)
20+
assert.NotZero(t, port)
21+
22+
// Assert that we can listen on the provided port.
23+
lis, err := net.Listen("tcp4", fmt.Sprintf("127.0.0.1:%d", port))
24+
assert.NoError(t, err)
25+
assert.NoError(t, lis.Close())
26+
27+
reserved[port] = struct{}{}
28+
}
29+
30+
// Not the best test as failures are exceptionally unlikely to be
31+
// reproducible.
32+
// Bind a bunch of ephemeral ports and assert that we don't get allocated
33+
// any of the ports we've reserved.
34+
for i := 0; i < iterations; i++ {
35+
lis, err := net.Listen(network, "127.0.0.1:0")
36+
assert.NoError(t, err)
37+
38+
// Defer closing of this listener to ensure we always get new ports
39+
// from listening to 0.
40+
defer lis.Close()
41+
42+
_, portStr, err := net.SplitHostPort(lis.Addr().String())
43+
assert.NoError(t, err)
44+
45+
port, err := strconv.Atoi(portStr)
46+
assert.NoError(t, err)
47+
48+
t.Logf("%d", port)
49+
50+
assert.NotContains(t, reserved, port)
51+
}
52+
}

0 commit comments

Comments
 (0)