Skip to content

Commit d11c84b

Browse files
author
Lars Grahmann
committed
Honor Kafka admin connector timeouts with full coverage
- Update pkg/admin/brokerclient.go to expose a requestTimeout helper that returns the configured connector timeout when set (defaulting to defaultRequestTimeout), and use it for alter-reassignment and leader-election requests so user overrides take effect. - Teach pkg/admin/connector.go to propagate ConnTimeout to whichever dialer path we take, while keeping the 10s default when nothing is supplied. - Harden scripts/set_up_net_alias.sh with set -euo pipefail, better Linux tooling detection (ifconfig vs ip), idempotent alias creation, and clearer error handling
1 parent 83e63fd commit d11c84b

File tree

11 files changed

+227
-19
lines changed

11 files changed

+227
-19
lines changed

cmd/topicctl/subcmd/apply.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ func applyTopic(
227227
aws.Config{},
228228
config.AdminClientOpts{
229229
ReadOnly: applyConfig.dryRun,
230+
KafkaConnTimeout: applyConfig.shared.connTimeout,
230231
UsernameOverride: applyConfig.shared.saslUsername,
231232
PasswordOverride: applyConfig.shared.saslPassword,
232233
SecretsManagerArnOverride: applyConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/bootstrap.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func bootstrapRun(cmd *cobra.Command, args []string) error {
8282
aws.Config{},
8383
config.AdminClientOpts{
8484
ReadOnly: true,
85+
KafkaConnTimeout: bootstrapConfig.shared.connTimeout,
8586
UsernameOverride: bootstrapConfig.shared.saslUsername,
8687
PasswordOverride: bootstrapConfig.shared.saslPassword,
8788
SecretsManagerArnOverride: bootstrapConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/check.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func checkTopicFile(
140140
aws.Config{},
141141
config.AdminClientOpts{
142142
ReadOnly: true,
143+
KafkaConnTimeout: checkConfig.shared.connTimeout,
143144
UsernameOverride: checkConfig.shared.saslUsername,
144145
PasswordOverride: checkConfig.shared.saslPassword,
145146
SecretsManagerArnOverride: checkConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/create.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func createACL(
153153
aws.Config{},
154154
config.AdminClientOpts{
155155
ReadOnly: createConfig.dryRun,
156+
KafkaConnTimeout: createConfig.shared.connTimeout,
156157
UsernameOverride: createConfig.shared.saslUsername,
157158
PasswordOverride: createConfig.shared.saslPassword,
158159
SecretsManagerArnOverride: createConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/rebalance.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
128128
aws.Config{},
129129
config.AdminClientOpts{
130130
ReadOnly: rebalanceConfig.dryRun,
131+
KafkaConnTimeout: rebalanceConfig.shared.connTimeout,
131132
UsernameOverride: rebalanceConfig.shared.saslUsername,
132133
PasswordOverride: rebalanceConfig.shared.saslPassword,
133134
SecretsManagerArnOverride: rebalanceConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/shared.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,4 +320,10 @@ func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) {
320320
os.Getenv("TOPICCTL_SASL_SECRETS_MANAGER_ARN"),
321321
"Secrets Manager ARN to use for credentials if using SASL; will override value set in cluster config",
322322
)
323+
cmd.Flags().DurationVar(
324+
&options.connTimeout,
325+
"conn-timeout",
326+
10*time.Second,
327+
"Kafka connection timeout",
328+
)
323329
}

pkg/admin/brokerclient.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
const (
18-
defaultTimeout = 5 * time.Second
18+
defaultRequestTimeout = 5 * time.Second
1919

2020
// Used for filtering out default configs
2121
configSourceUnknown int8 = 0
@@ -138,6 +138,14 @@ func NewBrokerAdminClient(
138138
return adminClient, nil
139139
}
140140

141+
func (c *BrokerAdminClient) requestTimeout() time.Duration {
142+
if c.config.ConnTimeout > 0 {
143+
return c.config.ConnTimeout
144+
}
145+
146+
return defaultRequestTimeout
147+
}
148+
141149
// GetClusterID gets the ID of the cluster.
142150
func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error) {
143151
resp, err := c.getMetadata(ctx, nil)
@@ -597,7 +605,7 @@ func (c *BrokerAdminClient) AssignPartitions(
597605
req := kafka.AlterPartitionReassignmentsRequest{
598606
Topic: topic,
599607
Assignments: apiAssignments,
600-
Timeout: defaultTimeout,
608+
Timeout: c.requestTimeout(),
601609
}
602610
log.Debugf("AlterPartitionReassignments request: %+v", req)
603611

@@ -708,7 +716,7 @@ func (c *BrokerAdminClient) RunLeaderElection(
708716
req := kafka.ElectLeadersRequest{
709717
Topic: topic,
710718
Partitions: partitions,
711-
Timeout: defaultTimeout,
719+
Timeout: c.requestTimeout(),
712720
}
713721
log.Debugf("ElectLeaders request: %+v", req)
714722

pkg/admin/brokerclient_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,25 @@ func TestBrokerClientControllerID(t *testing.T) {
4444
}, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID))
4545
}
4646

47+
func TestBrokerAdminClientRequestTimeoutDefault(t *testing.T) {
48+
client := &BrokerAdminClient{}
49+
50+
assert.Equal(t, defaultRequestTimeout, client.requestTimeout())
51+
}
52+
53+
func TestBrokerAdminClientRequestTimeoutOverride(t *testing.T) {
54+
customTimeout := 42 * time.Second
55+
client := &BrokerAdminClient{
56+
config: BrokerAdminClientConfig{
57+
ConnectorConfig: ConnectorConfig{
58+
ConnTimeout: customTimeout,
59+
},
60+
},
61+
}
62+
63+
assert.Equal(t, customTimeout, client.requestTimeout())
64+
}
65+
4766
func TestBrokerClientGetClusterID(t *testing.T) {
4867
if !util.CanTestBrokerAdmin() {
4968
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")

pkg/admin/connector.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
7373
Config: config,
7474
}
7575

76+
connTimeout := config.ConnTimeout
77+
if connTimeout == 0 {
78+
connTimeout = 10 * time.Second
79+
}
80+
7681
var mechanismClient sasl.Mechanism
7782
var tlsConfig *tls.Config
7883
var err error
@@ -143,6 +148,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
143148

144149
if !config.TLS.Enabled {
145150
connector.Dialer = kafka.DefaultDialer
151+
connector.Dialer.Timeout = connTimeout
146152
connector.Dialer.SASLMechanism = mechanismClient
147153
} else {
148154
var certs []tls.Certificate
@@ -184,7 +190,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
184190
}
185191
connector.Dialer = &kafka.Dialer{
186192
SASLMechanism: mechanismClient,
187-
Timeout: 10 * time.Second,
193+
Timeout: connTimeout,
188194
TLS: tlsConfig,
189195
}
190196
}
@@ -194,13 +200,18 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
194200
config.TLS.Enabled,
195201
config.SASL.Enabled,
196202
)
203+
transport := &kafka.Transport{
204+
SASL: mechanismClient,
205+
TLS: tlsConfig,
206+
DialTimeout: connTimeout,
207+
}
208+
if connector.Dialer.DialFunc != nil {
209+
transport.Dial = connector.Dialer.DialFunc
210+
}
197211
connector.KafkaClient = &kafka.Client{
198212
Addr: kafka.TCP(config.BrokerAddr),
199-
Transport: &kafka.Transport{
200-
Dial: connector.Dialer.DialFunc,
201-
SASL: mechanismClient,
202-
TLS: tlsConfig,
203-
},
213+
Timeout: connTimeout,
214+
Transport: transport,
204215
}
205216

206217
return connector, nil

pkg/admin/connector_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package admin
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net"
7+
"testing"
8+
"time"
9+
10+
"github.com/segmentio/kafka-go"
11+
"github.com/segmentio/kafka-go/protocol"
12+
"github.com/segmentio/kafka-go/protocol/createtopics"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestNewConnectorDefaultTimeout(t *testing.T) {
18+
originalTimeout := kafka.DefaultDialer.Timeout
19+
t.Cleanup(func() { kafka.DefaultDialer.Timeout = originalTimeout })
20+
21+
connector, err := NewConnector(
22+
ConnectorConfig{
23+
BrokerAddr: "localhost:9092",
24+
},
25+
)
26+
require.NoError(t, err)
27+
28+
assert.Equal(t, 10*time.Second, connector.Dialer.Timeout)
29+
transport, ok := connector.KafkaClient.Transport.(*kafka.Transport)
30+
require.True(t, ok)
31+
assert.Equal(t, 10*time.Second, transport.DialTimeout)
32+
assert.Equal(t, 10*time.Second, connector.KafkaClient.Timeout)
33+
}
34+
35+
func TestNewConnectorCustomTimeout(t *testing.T) {
36+
customTimeout := 3 * time.Second
37+
38+
connector, err := NewConnector(
39+
ConnectorConfig{
40+
BrokerAddr: "localhost:9092",
41+
ConnTimeout: customTimeout,
42+
TLS: TLSConfig{
43+
Enabled: true,
44+
SkipVerify: true,
45+
},
46+
},
47+
)
48+
require.NoError(t, err)
49+
50+
assert.Equal(t, customTimeout, connector.Dialer.Timeout)
51+
assert.NotNil(t, connector.Dialer.TLS)
52+
transport, ok := connector.KafkaClient.Transport.(*kafka.Transport)
53+
require.True(t, ok)
54+
assert.Equal(t, customTimeout, transport.DialTimeout)
55+
assert.Equal(t, customTimeout, connector.KafkaClient.Timeout)
56+
}
57+
58+
func TestConnectorDialerTimeoutHappyPath(t *testing.T) {
59+
listener, err := net.Listen("tcp", "127.0.0.1:0")
60+
require.NoError(t, err)
61+
t.Cleanup(func() { _ = listener.Close() })
62+
63+
acceptErrCh := make(chan error)
64+
go func() {
65+
defer close(acceptErrCh)
66+
conn, err := listener.Accept()
67+
if err != nil {
68+
acceptErrCh <- err
69+
return
70+
}
71+
if err := conn.Close(); err != nil {
72+
acceptErrCh <- err
73+
}
74+
}()
75+
76+
connector, err := NewConnector(
77+
ConnectorConfig{
78+
BrokerAddr: listener.Addr().String(),
79+
ConnTimeout: 100 * time.Millisecond,
80+
},
81+
)
82+
require.NoError(t, err)
83+
84+
ctx, cancel := context.WithTimeout(t.Context(), time.Second)
85+
defer cancel()
86+
87+
conn, err := connector.Dialer.DialContext(ctx, "tcp", listener.Addr().String())
88+
require.NoError(t, err)
89+
require.NoError(t, conn.Close())
90+
91+
select {
92+
case err, ok := <-acceptErrCh:
93+
if ok {
94+
require.NoError(t, err)
95+
}
96+
case <-time.After(time.Second):
97+
t.Fatal("timed out waiting for listener accept")
98+
}
99+
}
100+
101+
func TestConnectorDialerTimeoutUnhappyPath(t *testing.T) {
102+
listener, err := net.Listen("tcp", "127.0.0.1:0")
103+
require.NoError(t, err)
104+
t.Cleanup(func() { _ = listener.Close() })
105+
106+
connector, err := NewConnector(
107+
ConnectorConfig{
108+
BrokerAddr: listener.Addr().String(),
109+
ConnTimeout: time.Nanosecond,
110+
},
111+
)
112+
require.NoError(t, err)
113+
114+
_, err = connector.Dialer.DialContext(t.Context(), "tcp", listener.Addr().String())
115+
require.Error(t, err)
116+
117+
var netErr net.Error
118+
if errors.As(err, &netErr) {
119+
require.True(t, netErr.Timeout(), "expected timeout error, got: %v", err)
120+
return
121+
}
122+
123+
require.True(t, errors.Is(err, context.DeadlineExceeded), "expected deadline exceeded, got: %v", err)
124+
}
125+
126+
type captureTransport struct {
127+
t *testing.T
128+
expectedTimeoutMs int32
129+
}
130+
131+
func (c *captureTransport) RoundTrip(_ context.Context, _ net.Addr, req protocol.Message) (protocol.Message, error) {
132+
createReq, ok := req.(*createtopics.Request)
133+
require.True(c.t, ok)
134+
require.Equal(c.t, c.expectedTimeoutMs, createReq.TimeoutMs)
135+
136+
return &createtopics.Response{}, nil
137+
}

0 commit comments

Comments
 (0)