Skip to content

Commit cd95e49

Browse files
committed
update peer timeout logic:
- Update the default PeerTimeout value to a higher, safer value (e.g., 7 seconds). - added a validating admission webhook that issues a warning if the configured PeerTimeout is less than APIServerTimeout + MinimumBuffer (e.g., 2 seconds). - if the configured PeerTimeout is less than APIServerTimeout + MinimumBuffer, the operator will internally use APIServerTimeout + MinimumBuffer for the peer check. - added Unit tests Signed-off-by: Michael Shitrit <[email protected]>
1 parent c960b00 commit cd95e49

10 files changed

+136
-27
lines changed

api/v1alpha1/selfnoderemediationconfig_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ type SelfNodeRemediationConfigSpec struct {
9494

9595
// Timeout for each peer request.
9696
// Valid time units are "ms", "s", "m", "h".
97-
// +kubebuilder:default:="5s"
97+
// +kubebuilder:default:="7s"
9898
// +kubebuilder:validation:Pattern="^([0-9]+(\\.[0-9]+)?(ns|us|µs|ms|s|m|h))+$"
9999
// +kubebuilder:validation:Type:=string
100100
// +optional

api/v1alpha1/selfnoderemediationconfig_webhook.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ const (
4949
minDurPeerRequestTimeout = 10 * time.Millisecond
5050
minDurApiCheckInterval = 1 * time.Second
5151
minDurPeerUpdateInterval = 10 * time.Second
52+
53+
// MinimumBuffer is the minimum buffer time between APIServerTimeout and PeerRequestTimeout
54+
// It is required to make sure there is enough time for network communication between the peers in case the API Server is out
55+
MinimumBuffer = 2 * time.Second
5256
)
5357

5458
type field struct {
@@ -74,7 +78,9 @@ var _ webhook.Validator = &SelfNodeRemediationConfig{}
7478
func (r *SelfNodeRemediationConfig) ValidateCreate() (warning admission.Warnings, err error) {
7579
selfNodeRemediationConfigLog.Info("validate create", "name", r.Name)
7680

77-
return admission.Warnings{}, errors.NewAggregate([]error{
81+
warnings := r.validatePeerTimeoutSafety()
82+
83+
return warnings, errors.NewAggregate([]error{
7884
r.validateTimes(),
7985
r.validateCustomTolerations(),
8086
r.validateSingleton(),
@@ -86,7 +92,9 @@ func (r *SelfNodeRemediationConfig) ValidateCreate() (warning admission.Warnings
8692
func (r *SelfNodeRemediationConfig) ValidateUpdate(_ runtime.Object) (warning admission.Warnings, err error) {
8793
selfNodeRemediationConfigLog.Info("validate update", "name", r.Name)
8894

89-
return admission.Warnings{}, errors.NewAggregate([]error{
95+
warnings := r.validatePeerTimeoutSafety()
96+
97+
return warnings, errors.NewAggregate([]error{
9098
r.validateTimes(),
9199
r.validateCustomTolerations(),
92100
})
@@ -184,6 +192,43 @@ func validateToleration(toleration v1.Toleration) error {
184192
return nil
185193
}
186194

195+
// validatePeerTimeoutSafety checks if PeerRequestTimeout is safe relative to ApiServerTimeout
196+
// and returns warnings if the configuration might be unsafe
197+
func (r *SelfNodeRemediationConfig) validatePeerTimeoutSafety() admission.Warnings {
198+
var warnings admission.Warnings
199+
200+
spec := r.Spec
201+
if spec.PeerRequestTimeout == nil || spec.ApiServerTimeout == nil {
202+
// Use defaults if not specified
203+
return warnings
204+
}
205+
206+
peerRequestTimeoutDuration := spec.PeerRequestTimeout.Duration
207+
apiServerTimeoutDuration := spec.ApiServerTimeout.Duration
208+
minimumSafePeerTimeout := apiServerTimeoutDuration + MinimumBuffer
209+
210+
if peerRequestTimeoutDuration < minimumSafePeerTimeout {
211+
warningMsg := fmt.Sprintf(
212+
"PeerRequestTimeout (%s) is less than ApiServerTimeout + MinimumBuffer (%s + %s = %s). "+
213+
"This configuration may lead to race conditions where peer health checks time out "+
214+
"before API server checks complete, potentially causing premature remediation. "+
215+
"Consider increasing PeerRequestTimeout to at least %s for safer operation.",
216+
peerRequestTimeoutDuration,
217+
apiServerTimeoutDuration,
218+
MinimumBuffer,
219+
minimumSafePeerTimeout,
220+
minimumSafePeerTimeout,
221+
)
222+
warnings = append(warnings, warningMsg)
223+
selfNodeRemediationConfigLog.Info("PeerRequestTimeout safety warning",
224+
"peerRequestTimeout", peerRequestTimeoutDuration,
225+
"apiServerTimeout", apiServerTimeoutDuration,
226+
"minimumSafeTimeout", minimumSafePeerTimeout)
227+
}
228+
229+
return warnings
230+
}
231+
187232
func (r *SelfNodeRemediationConfig) validateSingleton() error {
188233
if r.Name != ConfigCRName {
189234
return fmt.Errorf("to enforce only one SelfNodeRemediationConfig in the cluster, a name other than %s is not allowed", ConfigCRName)

api/v1alpha1/selfnoderemediationconfig_webhook_test.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const (
1818
peerApiServerTimeoutDefault = 5 * time.Second
1919
apiServerTimeoutDefault = 5 * time.Second
2020
peerDialTimeoutDefault = 5 * time.Second
21-
peerRequestTimeoutDefault = 5 * time.Second
21+
peerRequestTimeoutDefault = 7 * time.Second
2222
apiCheckIntervalDefault = 15 * time.Second
2323
peerUpdateIntervalDefault = 15 * time.Minute
2424
)
@@ -139,6 +139,42 @@ var _ = Describe("SelfNodeRemediationConfig Validation", func() {
139139
})
140140
})
141141

142+
Describe("PeerRequestTimeout Safety Validation", func() {
143+
var snrc *SelfNodeRemediationConfig
144+
BeforeEach(func() {
145+
snrc = createTestSelfNodeRemediationConfigCR()
146+
snrc.Name = ConfigCRName
147+
_ = os.Setenv("DEPLOYMENT_NAMESPACE", snrc.Namespace)
148+
})
149+
It("should produce warning when PeerRequestTimeout is too low", func() {
150+
// Set ApiServerTimeout to 5s and PeerRequestTimeout to 6s (less than 5s + 2s buffer)
151+
snrc.Spec.ApiServerTimeout = &metav1.Duration{Duration: 5 * time.Second}
152+
snrc.Spec.PeerRequestTimeout = &metav1.Duration{Duration: 6 * time.Second}
153+
154+
warnings, err := snrc.ValidateCreate()
155+
Expect(err).To(BeNil())
156+
Expect(len(warnings)).To(Equal(1))
157+
Expect(warnings[0]).To(ContainSubstring("PeerRequestTimeout (6s) is less than ApiServerTimeout + MinimumBuffer"))
158+
})
159+
160+
It("should not produce warning when PeerRequestTimeout is safe", func() {
161+
// Set ApiServerTimeout to 5s and PeerRequestTimeout to 8s (greater than 5s + 2s buffer)
162+
snrc.Spec.ApiServerTimeout = &metav1.Duration{Duration: 5 * time.Second}
163+
snrc.Spec.PeerRequestTimeout = &metav1.Duration{Duration: 8 * time.Second}
164+
165+
warnings, err := snrc.ValidateCreate()
166+
Expect(err).To(BeNil())
167+
Expect(len(warnings)).To(Equal(0))
168+
})
169+
170+
It("should not produce warning when using default values", func() {
171+
// Use default values: ApiServerTimeout=5s, PeerRequestTimeout=7s (which is safe)
172+
173+
warnings, err := snrc.ValidateCreate()
174+
Expect(err).To(BeNil())
175+
Expect(len(warnings)).To(Equal(0))
176+
})
177+
})
142178
})
143179

144180
func testSingleInvalidField(validationType validationType) {

bundle/manifests/self-node-remediation.medik8s.io_selfnoderemediationconfigs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ spec:
153153
pattern: ^([0-9]+(\.[0-9]+)?(ns|us|µs|ms|s|m|h))+$
154154
type: string
155155
peerRequestTimeout:
156-
default: 5s
156+
default: 7s
157157
description: |-
158158
Timeout for each peer request.
159159
Valid time units are "ms", "s", "m", "h".

config/crd/bases/self-node-remediation.medik8s.io_selfnoderemediationconfigs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ spec:
151151
pattern: ^([0-9]+(\.[0-9]+)?(ns|us|µs|ms|s|m|h))+$
152152
type: string
153153
peerRequestTimeout:
154-
default: 5s
154+
default: 7s
155155
description: |-
156156
Timeout for each peer request.
157157
Valid time units are "ms", "s", "m", "h".

controllers/tests/config/selfnoderemediationconfig_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ var _ = Describe("SNR Config Test", func() {
233233
Expect(createdConfig.Spec.MaxApiErrorThreshold).To(Equal(3))
234234

235235
Expect(createdConfig.Spec.PeerApiServerTimeout.Seconds()).To(BeEquivalentTo(5))
236-
Expect(createdConfig.Spec.PeerRequestTimeout.Seconds()).To(BeEquivalentTo(5))
236+
Expect(createdConfig.Spec.PeerRequestTimeout.Seconds()).To(BeEquivalentTo(7))
237237
Expect(createdConfig.Spec.PeerDialTimeout.Seconds()).To(BeEquivalentTo(5))
238238
Expect(createdConfig.Spec.ApiServerTimeout.Seconds()).To(BeEquivalentTo(5))
239239
Expect(createdConfig.Spec.ApiCheckInterval.Seconds()).To(BeEquivalentTo(15))

main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
344344
PeerRequestTimeout: peerRequestTimeout,
345345
PeerHealthPort: peerHealthDefaultPort,
346346
MaxTimeForNoPeersResponse: reboot.MaxTimeForNoPeersResponse,
347+
Recorder: mgr.GetEventRecorderFor("ApiConnectivityCheck"),
347348
}
348349

349350
controlPlaneManager := controlplane.NewManager(myNodeName, mgr.GetClient())
@@ -377,7 +378,7 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
377378

378379
setupLog.Info("init grpc server")
379380
// TODO make port configurable?
380-
server, err := peerhealth.NewServer(mgr.GetClient(), mgr.GetAPIReader(), ctrl.Log.WithName("peerhealth").WithName("server"), peerHealthDefaultPort, certReader)
381+
server, err := peerhealth.NewServer(mgr.GetClient(), mgr.GetAPIReader(), ctrl.Log.WithName("peerhealth").WithName("server"), peerHealthDefaultPort, certReader, apiServerTimeout)
381382
if err != nil {
382383
setupLog.Error(err, "failed to init grpc server")
383384
os.Exit(1)

pkg/apicheck/check.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,19 @@ import (
88
"time"
99

1010
"github.com/go-logr/logr"
11+
"github.com/medik8s/common/pkg/events"
1112
"google.golang.org/grpc/credentials"
1213

1314
corev1 "k8s.io/api/core/v1"
1415
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
16+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1517
"k8s.io/apimachinery/pkg/util/wait"
1618
"k8s.io/client-go/rest"
19+
"k8s.io/client-go/tools/record"
1720
"sigs.k8s.io/controller-runtime/pkg/client"
1821

1922
selfNodeRemediation "github.com/medik8s/self-node-remediation/api"
23+
"github.com/medik8s/self-node-remediation/api/v1alpha1"
2024
"github.com/medik8s/self-node-remediation/pkg/certificates"
2125
"github.com/medik8s/self-node-remediation/pkg/controlplane"
2226
"github.com/medik8s/self-node-remediation/pkg/peerhealth"
@@ -25,6 +29,10 @@ import (
2529
"github.com/medik8s/self-node-remediation/pkg/utils"
2630
)
2731

32+
const (
33+
eventReasonPeerTimeoutAdjusted = "PeerTimeoutAdjusted"
34+
)
35+
2836
type ApiConnectivityCheck struct {
2937
client.Reader
3038
config *ApiConnectivityCheckConfig
@@ -51,6 +59,7 @@ type ApiConnectivityCheckConfig struct {
5159
PeerHealthPort int
5260
MaxTimeForNoPeersResponse time.Duration
5361
MinPeersForRemediation int
62+
Recorder record.EventRecorder
5463
}
5564

5665
func New(config *ApiConnectivityCheckConfig, controlPlaneManager *controlplane.Manager) *ApiConnectivityCheck {
@@ -275,6 +284,25 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []corev1.PodIP
275284
return c.sumPeersResponses(nrAddresses, responsesChan)
276285
}
277286

287+
// getEffectivePeerRequestTimeout calculates the effective peer request timeout
288+
// ensuring it's safe relative to the API server timeout by enforcing a minimum buffer
289+
func (c *ApiConnectivityCheck) getEffectivePeerRequestTimeout() time.Duration {
290+
minimumSafeTimeout := c.config.ApiServerTimeout + v1alpha1.MinimumBuffer
291+
292+
if c.config.PeerRequestTimeout < minimumSafeTimeout {
293+
// Log warning about timeout adjustment
294+
c.config.Log.Info("PeerRequestTimeout is too low, using adjusted value for safety",
295+
"configuredTimeout", c.config.PeerRequestTimeout,
296+
"apiServerTimeout", c.config.ApiServerTimeout,
297+
"minimumBuffer", v1alpha1.MinimumBuffer,
298+
"effectiveTimeout", minimumSafeTimeout)
299+
events.WarningEventf(c.config.Recorder, &v1alpha1.SelfNodeRemediationConfig{ObjectMeta: metav1.ObjectMeta{Name: v1alpha1.ConfigCRName}}, eventReasonPeerTimeoutAdjusted, "PeerRequestTimeout (%s) was too low compared to ApiServerTimeout (%s), using safe value (%s) instead", c.config.PeerRequestTimeout, c.config.ApiServerTimeout, minimumSafeTimeout)
300+
return minimumSafeTimeout
301+
}
302+
303+
return c.config.PeerRequestTimeout
304+
}
305+
278306
// getHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel
279307
func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) {
280308

@@ -296,7 +324,8 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP,
296324
}
297325
defer phClient.Close()
298326

299-
ctx, cancel := context.WithTimeout(context.Background(), c.config.PeerRequestTimeout)
327+
effectiveTimeout := c.getEffectivePeerRequestTimeout()
328+
ctx, cancel := context.WithTimeout(context.Background(), effectiveTimeout)
300329
defer cancel()
301330

302331
resp, err := phClient.IsHealthy(ctx, &peerhealth.HealthRequest{

pkg/peerhealth/client_server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ var _ = Describe("Checking health using grpc client and server", func() {
4949
}
5050

5151
By("Creating server")
52-
phServer, err = NewServer(k8sClient, reader, ctrl.Log.WithName("peerhealth test").WithName("phServer"), 9000, certReader)
52+
phServer, err = NewServer(k8sClient, reader, ctrl.Log.WithName("peerhealth test").WithName("phServer"), 9000, certReader, 7*time.Second)
5353
Expect(err).ToNot(HaveOccurred())
5454

5555
By("Starting server")

pkg/peerhealth/server.go

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ import (
2121

2222
const (
2323
connectionTimeout = 5 * time.Second
24-
//IMPORTANT! this MUST be less than PeerRequestTimeout in apicheck
25-
//The difference between them should allow some time for sending the request over the network
26-
//todo enforce this
27-
apiServerTimeout = 3 * time.Second
2824
)
2925

3026
var (
@@ -42,21 +38,23 @@ var (
4238

4339
type Server struct {
4440
UnimplementedPeerHealthServer
45-
c client.Client
46-
reader client.Reader
47-
log logr.Logger
48-
certReader certificates.CertStorageReader
49-
port int
41+
c client.Client
42+
reader client.Reader
43+
log logr.Logger
44+
certReader certificates.CertStorageReader
45+
port int
46+
apiServerTimeout time.Duration
5047
}
5148

5249
// NewServer returns a new Server
53-
func NewServer(c client.Client, reader client.Reader, log logr.Logger, port int, certReader certificates.CertStorageReader) (*Server, error) {
50+
func NewServer(c client.Client, reader client.Reader, log logr.Logger, port int, certReader certificates.CertStorageReader, apiServerTimeout time.Duration) (*Server, error) {
5451
return &Server{
55-
c: c,
56-
reader: reader,
57-
log: log,
58-
certReader: certReader,
59-
port: port,
52+
c: c,
53+
reader: reader,
54+
log: log,
55+
certReader: certReader,
56+
port: port,
57+
apiServerTimeout: apiServerTimeout,
6058
}, nil
6159
}
6260

@@ -109,7 +107,7 @@ func (s *Server) IsHealthy(ctx context.Context, request *HealthRequest) (*Health
109107
return nil, fmt.Errorf("empty node name in HealthRequest")
110108
}
111109

112-
apiCtx, cancelFunc := context.WithTimeout(ctx, apiServerTimeout)
110+
apiCtx, cancelFunc := context.WithTimeout(ctx, s.apiServerTimeout)
113111
defer cancelFunc()
114112

115113
snrs := &v1alpha1.SelfNodeRemediationList{}
@@ -157,7 +155,7 @@ func (s *Server) listWithTimeoutHandling(apiCtx context.Context, snrs *v1alpha1.
157155
}
158156

159157
func (s *Server) getNode(ctx context.Context, nodeName string) (*corev1.Node, error) {
160-
apiCtx, cancelFunc := context.WithTimeout(ctx, apiServerTimeout)
158+
apiCtx, cancelFunc := context.WithTimeout(ctx, s.apiServerTimeout)
161159
defer cancelFunc()
162160

163161
node := &corev1.Node{}

0 commit comments

Comments
 (0)