Skip to content

Commit d9dbca5

Browse files
author
Jelmer Snoeck
committed
Add configurable etcd call timeout
Add configurable call timeouts to the etcd client through context deadlines. This is different than the dial timeout, which is only used to establish the connection from the client to the server. This avoids a stuck reconciliation loop when there is an unresponsive call to the server.
1 parent 7a0ee7c commit d9dbca5

File tree

8 files changed

+59
-17
lines changed

8 files changed

+59
-17
lines changed

controlplane/kubeadm/controllers/alias.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type KubeadmControlPlaneReconciler struct {
3535
Tracker *remote.ClusterCacheTracker
3636

3737
EtcdDialTimeout time.Duration
38+
EtcdCallTimeout time.Duration
3839

3940
// WatchFilterValue is the label value used to filter events prior to reconciliation.
4041
WatchFilterValue string
@@ -47,6 +48,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
4748
APIReader: r.APIReader,
4849
Tracker: r.Tracker,
4950
EtcdDialTimeout: r.EtcdDialTimeout,
51+
EtcdCallTimeout: r.EtcdCallTimeout,
5052
WatchFilterValue: r.WatchFilterValue,
5153
}).SetupWithManager(ctx, mgr, options)
5254
}

controlplane/kubeadm/internal/cluster.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type Management struct {
5353
Client client.Reader
5454
Tracker *remote.ClusterCacheTracker
5555
EtcdDialTimeout time.Duration
56+
EtcdCallTimeout time.Duration
5657
}
5758

5859
// RemoteClusterConnectionError represents a failure to connect to a remote cluster.
@@ -163,7 +164,7 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
163164
restConfig: restConfig,
164165
Client: c,
165166
CoreDNSMigrator: &CoreDNSMigrator{},
166-
etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout),
167+
etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout, m.EtcdCallTimeout),
167168
}, nil
168169
}
169170

controlplane/kubeadm/internal/controllers/controller.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type KubeadmControlPlaneReconciler struct {
7070
recorder record.EventRecorder
7171
Tracker *remote.ClusterCacheTracker
7272
EtcdDialTimeout time.Duration
73+
EtcdCallTimeout time.Duration
7374

7475
// WatchFilterValue is the label value used to filter events prior to reconciliation.
7576
WatchFilterValue string
@@ -112,6 +113,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
112113
Client: r.Client,
113114
Tracker: r.Tracker,
114115
EtcdDialTimeout: r.EtcdDialTimeout,
116+
EtcdCallTimeout: r.EtcdCallTimeout,
115117
}
116118
}
117119

controlplane/kubeadm/internal/etcd/etcd.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,11 @@ type etcd interface {
4949

5050
// Client wraps an etcd client formatting its output to something more consumable.
5151
type Client struct {
52-
EtcdClient etcd
53-
Endpoint string
54-
LeaderID uint64
55-
Errors []string
52+
EtcdClient etcd
53+
Endpoint string
54+
LeaderID uint64
55+
Errors []string
56+
CallTimeout time.Duration
5657
}
5758

5859
// MemberAlarm represents an alarm type association with a cluster member.
@@ -78,6 +79,10 @@ const (
7879
AlarmCorrupt
7980
)
8081

82+
// DefaultCallTimeout represents the duration that the etcd client waits at most
83+
// for read and write operations to etcd.
84+
const DefaultCallTimeout = 15 * time.Second
85+
8186
// AlarmTypeName provides a text translation for AlarmType codes.
8287
var AlarmTypeName = map[AlarmType]string{
8388
AlarmOK: "NONE",
@@ -130,6 +135,7 @@ type ClientConfiguration struct {
130135
Proxy proxy.Proxy
131136
TLSConfig *tls.Config
132137
DialTimeout time.Duration
138+
CallTimeout time.Duration
133139
}
134140

135141
// NewClient creates a new etcd client with the given configuration.
@@ -152,30 +158,39 @@ func NewClient(ctx context.Context, config ClientConfiguration) (*Client, error)
152158
return nil, errors.Wrap(err, "unable to create etcd client")
153159
}
154160

155-
client, err := newEtcdClient(ctx, etcdClient)
161+
callTimeout := config.CallTimeout
162+
if callTimeout == 0 {
163+
callTimeout = DefaultCallTimeout
164+
}
165+
166+
client, err := newEtcdClient(ctx, etcdClient, callTimeout)
156167
if err != nil {
157168
closeErr := etcdClient.Close()
158169
return nil, errors.Wrap(kerrors.NewAggregate([]error{err, closeErr}), "unable to create etcd client")
159170
}
160171
return client, nil
161172
}
162173

163-
func newEtcdClient(ctx context.Context, etcdClient etcd) (*Client, error) {
174+
func newEtcdClient(ctx context.Context, etcdClient etcd, callTimeout time.Duration) (*Client, error) {
164175
endpoints := etcdClient.Endpoints()
165176
if len(endpoints) == 0 {
166177
return nil, errors.New("etcd client was not configured with any endpoints")
167178
}
168179

180+
ctx, cancel := context.WithTimeout(ctx, callTimeout)
181+
defer cancel()
182+
169183
status, err := etcdClient.Status(ctx, endpoints[0])
170184
if err != nil {
171185
return nil, errors.Wrap(err, "failed to get etcd status")
172186
}
173187

174188
return &Client{
175-
Endpoint: endpoints[0],
176-
EtcdClient: etcdClient,
177-
LeaderID: status.Leader,
178-
Errors: status.Errors,
189+
Endpoint: endpoints[0],
190+
EtcdClient: etcdClient,
191+
LeaderID: status.Leader,
192+
Errors: status.Errors,
193+
CallTimeout: callTimeout,
179194
}, nil
180195
}
181196

@@ -186,6 +201,9 @@ func (c *Client) Close() error {
186201

187202
// Members retrieves a list of etcd members.
188203
func (c *Client) Members(ctx context.Context) ([]*Member, error) {
204+
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
205+
defer cancel()
206+
189207
response, err := c.EtcdClient.MemberList(ctx)
190208
if err != nil {
191209
return nil, errors.Wrap(err, "failed to get list of members for etcd cluster")
@@ -214,18 +232,27 @@ func (c *Client) Members(ctx context.Context) ([]*Member, error) {
214232

215233
// MoveLeader moves the leader to the provided member ID.
216234
func (c *Client) MoveLeader(ctx context.Context, newLeaderID uint64) error {
235+
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
236+
defer cancel()
237+
217238
_, err := c.EtcdClient.MoveLeader(ctx, newLeaderID)
218239
return errors.Wrapf(err, "failed to move etcd leader: %v", newLeaderID)
219240
}
220241

221242
// RemoveMember removes a given member.
222243
func (c *Client) RemoveMember(ctx context.Context, id uint64) error {
244+
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
245+
defer cancel()
246+
223247
_, err := c.EtcdClient.MemberRemove(ctx, id)
224248
return errors.Wrapf(err, "failed to remove member: %v", id)
225249
}
226250

227251
// UpdateMemberPeerURLs updates the list of peer URLs.
228252
func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs []string) ([]*Member, error) {
253+
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
254+
defer cancel()
255+
229256
response, err := c.EtcdClient.MemberUpdate(ctx, id, peerURLs)
230257
if err != nil {
231258
return nil, errors.Wrapf(err, "failed to update etcd member %v's peer list to %+v", id, peerURLs)
@@ -241,6 +268,9 @@ func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs [
241268

242269
// Alarms retrieves all alarms on a cluster.
243270
func (c *Client) Alarms(ctx context.Context) ([]MemberAlarm, error) {
271+
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
272+
defer cancel()
273+
244274
alarmResponse, err := c.EtcdClient.AlarmList(ctx)
245275
if err != nil {
246276
return nil, errors.Wrap(err, "failed to get alarms for etcd cluster")

controlplane/kubeadm/internal/etcd/etcd_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestEtcdMembers_WithErrors(t *testing.T) {
4949
ErrorResponse: errors.New("something went wrong"),
5050
}
5151

52-
client, err := newEtcdClient(ctx, fakeEtcdClient)
52+
client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout)
5353
g.Expect(err).NotTo(HaveOccurred())
5454

5555
members, err := client.Members(ctx)
@@ -86,7 +86,7 @@ func TestEtcdMembers_WithSuccess(t *testing.T) {
8686
StatusResponse: &clientv3.StatusResponse{},
8787
}
8888

89-
client, err := newEtcdClient(ctx, fakeEtcdClient)
89+
client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout)
9090
g.Expect(err).NotTo(HaveOccurred())
9191

9292
members, err := client.Members(ctx)

controlplane/kubeadm/internal/etcd_client_generator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type clientCreator func(ctx context.Context, endpoints []string) (*etcd.Client,
4343
var errEtcdNodeConnection = errors.New("failed to connect to etcd node")
4444

4545
// NewEtcdClientGenerator returns a new etcdClientGenerator instance.
46-
func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout time.Duration) *EtcdClientGenerator {
46+
func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout, etcdCallTimeout time.Duration) *EtcdClientGenerator {
4747
ecg := &EtcdClientGenerator{restConfig: restConfig, tlsConfig: tlsConfig}
4848

4949
ecg.createClient = func(ctx context.Context, endpoints []string) (*etcd.Client, error) {
@@ -58,6 +58,7 @@ func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcd
5858
Proxy: p,
5959
TLSConfig: tlsConfig,
6060
DialTimeout: etcdDialTimeout,
61+
CallTimeout: etcdCallTimeout,
6162
})
6263
}
6364

controlplane/kubeadm/internal/etcd_client_generator_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ var (
3838

3939
func TestNewEtcdClientGenerator(t *testing.T) {
4040
g := NewWithT(t)
41-
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
41+
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0)
4242
g.Expect(subject.createClient).To(Not(BeNil()))
4343
}
4444

@@ -90,7 +90,7 @@ func TestFirstAvailableNode(t *testing.T) {
9090
for _, tt := range tests {
9191
t.Run(tt.name, func(t *testing.T) {
9292
g := NewWithT(t)
93-
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
93+
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0)
9494
subject.createClient = tt.cc
9595

9696
client, err := subject.forFirstAvailableNode(ctx, tt.nodes)
@@ -212,7 +212,7 @@ func TestForLeader(t *testing.T) {
212212
t.Run(tt.name, func(t *testing.T) {
213213
g := NewWithT(t)
214214

215-
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
215+
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0)
216216
subject.createClient = tt.cc
217217

218218
client, err := subject.forLeader(ctx, tt.nodes)

controlplane/kubeadm/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import (
5151
controlplanev1alpha4 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha4"
5252
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
5353
kubeadmcontrolplanecontrollers "sigs.k8s.io/cluster-api/controlplane/kubeadm/controllers"
54+
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
5455
kcpwebhooks "sigs.k8s.io/cluster-api/controlplane/kubeadm/webhooks"
5556
"sigs.k8s.io/cluster-api/feature"
5657
"sigs.k8s.io/cluster-api/util/flags"
@@ -90,6 +91,7 @@ var (
9091
webhookCertDir string
9192
healthAddr string
9293
etcdDialTimeout time.Duration
94+
etcdCallTimeout time.Duration
9395
tlsOptions = flags.TLSOptions{}
9496
logOptions = logs.NewOptions()
9597
)
@@ -141,6 +143,9 @@ func InitFlags(fs *pflag.FlagSet) {
141143
fs.DurationVar(&etcdDialTimeout, "etcd-dial-timeout-duration", 10*time.Second,
142144
"Duration that the etcd client waits at most to establish a connection with etcd")
143145

146+
fs.DurationVar(&etcdCallTimeout, "etcd-call-timeout-duration", etcd.DefaultCallTimeout,
147+
"Duration that the etcd client waits at most for read and write operations to etcd.")
148+
144149
flags.AddTLSOptions(fs, &tlsOptions)
145150

146151
feature.MutableGates.AddFlag(fs)
@@ -266,6 +271,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
266271
Tracker: tracker,
267272
WatchFilterValue: watchFilterValue,
268273
EtcdDialTimeout: etcdDialTimeout,
274+
EtcdCallTimeout: etcdCallTimeout,
269275
}).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil {
270276
setupLog.Error(err, "unable to create controller", "controller", "KubeadmControlPlane")
271277
os.Exit(1)

0 commit comments

Comments
 (0)