Skip to content

Commit d8b8822

Browse files
author
Lars Grahmann
committed
Honor Kafka admin connector timeouts with full coverage
- Update pkg/admin/brokerclient.go to expose a requestTimeout helper that returns the configured connector timeout when set (defaulting to defaultRequestTimeout), and use it for alter-reassignment and leader-election requests so user overrides take effect. - Teach pkg/admin/connector.go to propagate ConnTimeout to whichever dialer path we take, while keeping the 10s default when nothing is supplied. - Harden scripts/set_up_net_alias.sh with set -euo pipefail, better Linux tooling detection (ifconfig vs ip), idempotent alias creation, and clearer error handling
1 parent 83e63fd commit d8b8822

File tree

11 files changed

+209
-19
lines changed

11 files changed

+209
-19
lines changed

cmd/topicctl/subcmd/apply.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ func applyTopic(
227227
aws.Config{},
228228
config.AdminClientOpts{
229229
ReadOnly: applyConfig.dryRun,
230+
KafkaConnTimeout: applyConfig.shared.connTimeout,
230231
UsernameOverride: applyConfig.shared.saslUsername,
231232
PasswordOverride: applyConfig.shared.saslPassword,
232233
SecretsManagerArnOverride: applyConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/bootstrap.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func bootstrapRun(cmd *cobra.Command, args []string) error {
8282
aws.Config{},
8383
config.AdminClientOpts{
8484
ReadOnly: true,
85+
KafkaConnTimeout: bootstrapConfig.shared.connTimeout,
8586
UsernameOverride: bootstrapConfig.shared.saslUsername,
8687
PasswordOverride: bootstrapConfig.shared.saslPassword,
8788
SecretsManagerArnOverride: bootstrapConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/check.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func checkTopicFile(
140140
aws.Config{},
141141
config.AdminClientOpts{
142142
ReadOnly: true,
143+
KafkaConnTimeout: checkConfig.shared.connTimeout,
143144
UsernameOverride: checkConfig.shared.saslUsername,
144145
PasswordOverride: checkConfig.shared.saslPassword,
145146
SecretsManagerArnOverride: checkConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/create.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func createACL(
153153
aws.Config{},
154154
config.AdminClientOpts{
155155
ReadOnly: createConfig.dryRun,
156+
KafkaConnTimeout: createConfig.shared.connTimeout,
156157
UsernameOverride: createConfig.shared.saslUsername,
157158
PasswordOverride: createConfig.shared.saslPassword,
158159
SecretsManagerArnOverride: createConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/rebalance.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
128128
aws.Config{},
129129
config.AdminClientOpts{
130130
ReadOnly: rebalanceConfig.dryRun,
131+
KafkaConnTimeout: rebalanceConfig.shared.connTimeout,
131132
UsernameOverride: rebalanceConfig.shared.saslUsername,
132133
PasswordOverride: rebalanceConfig.shared.saslPassword,
133134
SecretsManagerArnOverride: rebalanceConfig.shared.saslSecretsManagerArn,

cmd/topicctl/subcmd/shared.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,4 +320,10 @@ func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) {
320320
os.Getenv("TOPICCTL_SASL_SECRETS_MANAGER_ARN"),
321321
"Secrets Manager ARN to use for credentials if using SASL; will override value set in cluster config",
322322
)
323+
cmd.Flags().DurationVar(
324+
&options.connTimeout,
325+
"conn-timeout",
326+
10*time.Second,
327+
"Kafka connection timeout",
328+
)
323329
}

pkg/admin/brokerclient.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
const (
18-
defaultTimeout = 5 * time.Second
18+
defaultRequestTimeout = 5 * time.Second
1919

2020
// Used for filtering out default configs
2121
configSourceUnknown int8 = 0
@@ -138,6 +138,14 @@ func NewBrokerAdminClient(
138138
return adminClient, nil
139139
}
140140

141+
func (c *BrokerAdminClient) requestTimeout() time.Duration {
142+
if c.config.ConnTimeout > 0 {
143+
return c.config.ConnTimeout
144+
}
145+
146+
return defaultRequestTimeout
147+
}
148+
141149
// GetClusterID gets the ID of the cluster.
142150
func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error) {
143151
resp, err := c.getMetadata(ctx, nil)
@@ -597,7 +605,7 @@ func (c *BrokerAdminClient) AssignPartitions(
597605
req := kafka.AlterPartitionReassignmentsRequest{
598606
Topic: topic,
599607
Assignments: apiAssignments,
600-
Timeout: defaultTimeout,
608+
Timeout: c.requestTimeout(),
601609
}
602610
log.Debugf("AlterPartitionReassignments request: %+v", req)
603611

@@ -708,7 +716,7 @@ func (c *BrokerAdminClient) RunLeaderElection(
708716
req := kafka.ElectLeadersRequest{
709717
Topic: topic,
710718
Partitions: partitions,
711-
Timeout: defaultTimeout,
719+
Timeout: c.requestTimeout(),
712720
}
713721
log.Debugf("ElectLeaders request: %+v", req)
714722

pkg/admin/brokerclient_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,25 @@ func TestBrokerClientControllerID(t *testing.T) {
4444
}, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID))
4545
}
4646

47+
func TestBrokerAdminClientRequestTimeoutDefault(t *testing.T) {
48+
client := &BrokerAdminClient{}
49+
50+
assert.Equal(t, defaultRequestTimeout, client.requestTimeout())
51+
}
52+
53+
func TestBrokerAdminClientRequestTimeoutOverride(t *testing.T) {
54+
customTimeout := 42 * time.Second
55+
client := &BrokerAdminClient{
56+
config: BrokerAdminClientConfig{
57+
ConnectorConfig: ConnectorConfig{
58+
ConnTimeout: customTimeout,
59+
},
60+
},
61+
}
62+
63+
assert.Equal(t, customTimeout, client.requestTimeout())
64+
}
65+
4766
func TestBrokerClientGetClusterID(t *testing.T) {
4867
if !util.CanTestBrokerAdmin() {
4968
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")

pkg/admin/connector.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
7373
Config: config,
7474
}
7575

76+
timeout := config.ConnTimeout
77+
if timeout == 0 {
78+
timeout = 10 * time.Second
79+
}
80+
7681
var mechanismClient sasl.Mechanism
7782
var tlsConfig *tls.Config
7883
var err error
@@ -143,6 +148,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
143148

144149
if !config.TLS.Enabled {
145150
connector.Dialer = kafka.DefaultDialer
151+
connector.Dialer.Timeout = timeout
146152
connector.Dialer.SASLMechanism = mechanismClient
147153
} else {
148154
var certs []tls.Certificate
@@ -184,7 +190,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
184190
}
185191
connector.Dialer = &kafka.Dialer{
186192
SASLMechanism: mechanismClient,
187-
Timeout: 10 * time.Second,
193+
Timeout: timeout,
188194
TLS: tlsConfig,
189195
}
190196
}
@@ -194,13 +200,17 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
194200
config.TLS.Enabled,
195201
config.SASL.Enabled,
196202
)
203+
transport := &kafka.Transport{
204+
SASL: mechanismClient,
205+
TLS: tlsConfig,
206+
DialTimeout: timeout,
207+
}
208+
if connector.Dialer.DialFunc != nil {
209+
transport.Dial = connector.Dialer.DialFunc
210+
}
197211
connector.KafkaClient = &kafka.Client{
198212
Addr: kafka.TCP(config.BrokerAddr),
199-
Transport: &kafka.Transport{
200-
Dial: connector.Dialer.DialFunc,
201-
SASL: mechanismClient,
202-
TLS: tlsConfig,
203-
},
213+
Transport: transport,
204214
}
205215

206216
return connector, nil

pkg/admin/connector_test.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package admin
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net"
7+
"testing"
8+
"time"
9+
10+
"github.com/segmentio/kafka-go"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func TestNewConnectorDefaultTimeout(t *testing.T) {
16+
originalTimeout := kafka.DefaultDialer.Timeout
17+
t.Cleanup(func() { kafka.DefaultDialer.Timeout = originalTimeout })
18+
19+
connector, err := NewConnector(
20+
ConnectorConfig{
21+
BrokerAddr: "localhost:9092",
22+
},
23+
)
24+
require.NoError(t, err)
25+
26+
assert.Equal(t, 10*time.Second, connector.Dialer.Timeout)
27+
transport, ok := connector.KafkaClient.Transport.(*kafka.Transport)
28+
require.True(t, ok)
29+
assert.Equal(t, 10*time.Second, transport.DialTimeout)
30+
}
31+
32+
func TestNewConnectorCustomTimeout(t *testing.T) {
33+
customTimeout := 3 * time.Second
34+
35+
connector, err := NewConnector(
36+
ConnectorConfig{
37+
BrokerAddr: "localhost:9092",
38+
ConnTimeout: customTimeout,
39+
TLS: TLSConfig{
40+
Enabled: true,
41+
SkipVerify: true,
42+
},
43+
},
44+
)
45+
require.NoError(t, err)
46+
47+
assert.Equal(t, customTimeout, connector.Dialer.Timeout)
48+
assert.NotNil(t, connector.Dialer.TLS)
49+
transport, ok := connector.KafkaClient.Transport.(*kafka.Transport)
50+
require.True(t, ok)
51+
assert.Equal(t, customTimeout, transport.DialTimeout)
52+
}
53+
54+
func TestConnectorDialerTimeoutHappyPath(t *testing.T) {
55+
listener, err := net.Listen("tcp", "127.0.0.1:0")
56+
require.NoError(t, err)
57+
t.Cleanup(func() { _ = listener.Close() })
58+
59+
acceptErrCh := make(chan error)
60+
go func() {
61+
defer close(acceptErrCh)
62+
conn, err := listener.Accept()
63+
if err != nil {
64+
acceptErrCh <- err
65+
return
66+
}
67+
if err := conn.Close(); err != nil {
68+
acceptErrCh <- err
69+
}
70+
}()
71+
72+
connector, err := NewConnector(
73+
ConnectorConfig{
74+
BrokerAddr: listener.Addr().String(),
75+
ConnTimeout: 100 * time.Millisecond,
76+
},
77+
)
78+
require.NoError(t, err)
79+
80+
ctx, cancel := context.WithTimeout(t.Context(), time.Second)
81+
defer cancel()
82+
83+
conn, err := connector.Dialer.DialContext(ctx, "tcp", listener.Addr().String())
84+
require.NoError(t, err)
85+
require.NoError(t, conn.Close())
86+
87+
select {
88+
case err, ok := <-acceptErrCh:
89+
if ok {
90+
require.NoError(t, err)
91+
}
92+
case <-time.After(time.Second):
93+
t.Fatal("timed out waiting for listener accept")
94+
}
95+
}
96+
97+
func TestConnectorDialerTimeoutUnhappyPath(t *testing.T) {
98+
listener, err := net.Listen("tcp", "127.0.0.1:0")
99+
require.NoError(t, err)
100+
t.Cleanup(func() { _ = listener.Close() })
101+
102+
connector, err := NewConnector(
103+
ConnectorConfig{
104+
BrokerAddr: listener.Addr().String(),
105+
ConnTimeout: time.Nanosecond,
106+
},
107+
)
108+
require.NoError(t, err)
109+
110+
_, err = connector.Dialer.DialContext(t.Context(), "tcp", listener.Addr().String())
111+
require.Error(t, err)
112+
113+
var netErr net.Error
114+
if errors.As(err, &netErr) {
115+
require.True(t, netErr.Timeout(), "expected timeout error, got: %v", err)
116+
return
117+
}
118+
119+
require.True(t, errors.Is(err, context.DeadlineExceeded), "expected deadline exceeded, got: %v", err)
120+
}

0 commit comments

Comments
 (0)