Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions operator/pkg/k3d/k3d.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ import (
"embed"
"fmt"
"io/fs"
"net"
"os"
"os/exec"
"slices"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/cockroachdb/errors"
"github.com/redpanda-data/helm-charts/pkg/kube"
"golang.org/x/sys/unix"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -128,10 +132,16 @@ func NewCluster(name string, opts ...ClusterOpt) (*Cluster, error) {
opt.apply(config)
}

port, err := reserveEphemeralPort("tcp4")
if err != nil {
return nil, err
}

args := []string{
"cluster",
"create",
name,
fmt.Sprintf("--api-port=127.0.0.1:%d", port),
fmt.Sprintf("--agents=%d", config.agents),
fmt.Sprintf("--timeout=%s", config.timeout),
fmt.Sprintf("--image=%s", config.image),
Expand Down Expand Up @@ -332,3 +342,61 @@ var startupManifests = sync.OnceValue(func() []client.Object {

return objs
})

// reserveEphemeralPort attempts to "reserve" an ephemeral OS port by putting
// it into a TIME_WAIT state which prevents the OS re-allocating it when
// binding to port 0.
// Big thanks to:
// - https://github.com/libp2p/go-reuseport
// - https://github.com/Yelp/ephemeral-port-reserve
func reserveEphemeralPort(network string) (int, error) {
lc := net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
var optErr error
ctrlErr := c.Control(func(fd uintptr) {
// Setting SO_REUSEADDR allows this port to be rebound after we finish using it.
// It works roughly the same way on macos and linux.
optErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
})
return errors.Join(ctrlErr, optErr)
},
}

// Listen to port 0 to get an OS allocated port.
lis, err := lc.Listen(context.Background(), network, "127.0.0.1:0")
if err != nil {
return 0, errors.WithStack(err)
}

defer lis.Close()

// According to ephemeral-port-reserve, we need to accept a connection to
// actually put this port into the TIME_WAIT state.
errCh := make(chan error, 1)
go func() {
conn, err := net.Dial(lis.Addr().Network(), lis.Addr().String())
if err != nil {
errCh <- err
return
}
errCh <- conn.Close()
}()

conn, err := lis.Accept()
if err != nil {
return 0, err
}

defer conn.Close()

if err := <-errCh; err != nil {
return 0, err
}

_, port, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
return 0, err
}

return strconv.Atoi(port)
}
44 changes: 44 additions & 0 deletions operator/pkg/k3d/k3d_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"errors"
"fmt"
"sync"
"net"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -57,3 +59,45 @@ func TestMultiInstance(t *testing.T) {

assert.NoError(t, errors.Join(errs...))
}

func TestReservePort(t *testing.T) {
const network = "tcp4"
const iterations = 100

reserved := map[int]struct{}{}
for i := 0; i < iterations; i++ {
port, err := reserveEphemeralPort("tcp4")
assert.NoError(t, err)
assert.NotZero(t, port)

// Assert that we can listen on the provided port.
lis, err := net.Listen("tcp4", fmt.Sprintf("127.0.0.1:%d", port))
assert.NoError(t, err)
assert.NoError(t, lis.Close())

reserved[port] = struct{}{}
}

// Not the best test as failures are exceptionally unlikely to be
// reproducible.
// Bind a bunch of ephemeral ports and assert that we don't get allocated
// any of the ports we've reserved.
for i := 0; i < iterations; i++ {
lis, err := net.Listen(network, "127.0.0.1:0")
assert.NoError(t, err)

// Defer closing of this listener to ensure we always get new ports
// from listening to 0.
defer lis.Close()

_, portStr, err := net.SplitHostPort(lis.Addr().String())
assert.NoError(t, err)

port, err := strconv.Atoi(portStr)
assert.NoError(t, err)

t.Logf("%d", port)

assert.NotContains(t, reserved, port)
}
}