Skip to content

Commit 96dfd8f

Browse files
Merge pull request #264 from mshitrit/fix-peertimeout-logic
Update peer timeout logic
2 parents c960b00 + d6d028a commit 96dfd8f

11 files changed

+224
-40
lines changed

api/v1alpha1/selfnoderemediationconfig_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ type SelfNodeRemediationConfigSpec struct {
9494

9595
// Timeout for each peer request.
9696
// Valid time units are "ms", "s", "m", "h".
97-
// +kubebuilder:default:="5s"
97+
// +kubebuilder:default:="7s"
9898
// +kubebuilder:validation:Pattern="^([0-9]+(\\.[0-9]+)?(ns|us|µs|ms|s|m|h))+$"
9999
// +kubebuilder:validation:Type:=string
100100
// +optional

api/v1alpha1/selfnoderemediationconfig_webhook.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ const (
4949
minDurPeerRequestTimeout = 10 * time.Millisecond
5050
minDurApiCheckInterval = 1 * time.Second
5151
minDurPeerUpdateInterval = 10 * time.Second
52+
53+
// MinimumBuffer is the minimum amount of time by which PeerRequestTimeout is expected to exceed ApiServerTimeout.
54+
// It is required to make sure there is enough time for network communication between the peers when the API server is unavailable.
55+
MinimumBuffer = 2 * time.Second
5256
)
5357

5458
type field struct {
@@ -74,7 +78,9 @@ var _ webhook.Validator = &SelfNodeRemediationConfig{}
7478
func (r *SelfNodeRemediationConfig) ValidateCreate() (warning admission.Warnings, err error) {
7579
selfNodeRemediationConfigLog.Info("validate create", "name", r.Name)
7680

77-
return admission.Warnings{}, errors.NewAggregate([]error{
81+
warnings := r.validatePeerTimeoutSafety()
82+
83+
return warnings, errors.NewAggregate([]error{
7884
r.validateTimes(),
7985
r.validateCustomTolerations(),
8086
r.validateSingleton(),
@@ -86,7 +92,9 @@ func (r *SelfNodeRemediationConfig) ValidateCreate() (warning admission.Warnings
8692
func (r *SelfNodeRemediationConfig) ValidateUpdate(_ runtime.Object) (warning admission.Warnings, err error) {
8793
selfNodeRemediationConfigLog.Info("validate update", "name", r.Name)
8894

89-
return admission.Warnings{}, errors.NewAggregate([]error{
95+
warnings := r.validatePeerTimeoutSafety()
96+
97+
return warnings, errors.NewAggregate([]error{
9098
r.validateTimes(),
9199
r.validateCustomTolerations(),
92100
})
@@ -184,6 +192,43 @@ func validateToleration(toleration v1.Toleration) error {
184192
return nil
185193
}
186194

195+
// validatePeerTimeoutSafety checks whether PeerRequestTimeout is at least ApiServerTimeout + MinimumBuffer
196+
// and returns a single warning (also logged) when it is not; it never returns an error and does not modify r.
197+
func (r *SelfNodeRemediationConfig) validatePeerTimeoutSafety() admission.Warnings {
198+
var warnings admission.Warnings
199+
200+
spec := r.Spec
201+
if spec.PeerRequestTimeout == nil || spec.ApiServerTimeout == nil {
202+
// Either timeout is unset, so there is nothing to compare; return no warnings
203+
return warnings
204+
}
205+
206+
peerRequestTimeoutDuration := spec.PeerRequestTimeout.Duration
207+
apiServerTimeoutDuration := spec.ApiServerTimeout.Duration
208+
minimumSafePeerTimeout := apiServerTimeoutDuration + MinimumBuffer
209+
210+
if peerRequestTimeoutDuration < minimumSafePeerTimeout {
211+
warningMsg := fmt.Sprintf(
212+
"PeerRequestTimeout (%s) is less than ApiServerTimeout + MinimumBuffer (%s + %s = %s). "+
213+
"This configuration may lead to race conditions where peer health checks time out "+
214+
"before API server checks complete, potentially causing premature remediation. "+
215+
"Overriding PeerRequestTimeout to %s for safer operation.",
216+
peerRequestTimeoutDuration,
217+
apiServerTimeoutDuration,
218+
MinimumBuffer,
219+
minimumSafePeerTimeout,
220+
minimumSafePeerTimeout,
221+
)
222+
warnings = append(warnings, warningMsg)
223+
selfNodeRemediationConfigLog.Info("PeerRequestTimeout safety warning, overriding PeerRequestTimeout to minimumSafeTimeout",
224+
"peerRequestTimeout", peerRequestTimeoutDuration,
225+
"apiServerTimeout", apiServerTimeoutDuration,
226+
"minimumSafeTimeout", minimumSafePeerTimeout)
227+
}
228+
229+
return warnings
230+
}
231+
187232
func (r *SelfNodeRemediationConfig) validateSingleton() error {
188233
if r.Name != ConfigCRName {
189234
return fmt.Errorf("to enforce only one SelfNodeRemediationConfig in the cluster, a name other than %s is not allowed", ConfigCRName)

api/v1alpha1/selfnoderemediationconfig_webhook_test.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const (
1818
peerApiServerTimeoutDefault = 5 * time.Second
1919
apiServerTimeoutDefault = 5 * time.Second
2020
peerDialTimeoutDefault = 5 * time.Second
21-
peerRequestTimeoutDefault = 5 * time.Second
21+
peerRequestTimeoutDefault = 7 * time.Second
2222
apiCheckIntervalDefault = 15 * time.Second
2323
peerUpdateIntervalDefault = 15 * time.Minute
2424
)
@@ -139,6 +139,42 @@ var _ = Describe("SelfNodeRemediationConfig Validation", func() {
139139
})
140140
})
141141

142+
Describe("PeerRequestTimeout Safety Validation", func() {
143+
var snrc *SelfNodeRemediationConfig
144+
BeforeEach(func() {
145+
snrc = createTestSelfNodeRemediationConfigCR()
146+
snrc.Name = ConfigCRName
147+
_ = os.Setenv("DEPLOYMENT_NAMESPACE", snrc.Namespace)
148+
})
149+
It("should produce warning when PeerRequestTimeout is too low", func() {
150+
// Set ApiServerTimeout to 5s and PeerRequestTimeout to 6s (less than 5s + 2s buffer)
151+
snrc.Spec.ApiServerTimeout = &metav1.Duration{Duration: 5 * time.Second}
152+
snrc.Spec.PeerRequestTimeout = &metav1.Duration{Duration: 6 * time.Second}
153+
154+
warnings, err := snrc.ValidateCreate()
155+
Expect(err).To(BeNil())
156+
Expect(len(warnings)).To(Equal(1))
157+
Expect(warnings[0]).To(ContainSubstring("PeerRequestTimeout (6s) is less than ApiServerTimeout + MinimumBuffer"))
158+
})
159+
160+
It("should not produce warning when PeerRequestTimeout is safe", func() {
161+
// Set ApiServerTimeout to 5s and PeerRequestTimeout to 8s (greater than 5s + 2s buffer)
162+
snrc.Spec.ApiServerTimeout = &metav1.Duration{Duration: 5 * time.Second}
163+
snrc.Spec.PeerRequestTimeout = &metav1.Duration{Duration: 8 * time.Second}
164+
165+
warnings, err := snrc.ValidateCreate()
166+
Expect(err).To(BeNil())
167+
Expect(len(warnings)).To(Equal(0))
168+
})
169+
170+
It("should not produce warning when using default values", func() {
171+
// Use default values: ApiServerTimeout=5s, PeerRequestTimeout=7s (which is safe)
172+
173+
warnings, err := snrc.ValidateCreate()
174+
Expect(err).To(BeNil())
175+
Expect(len(warnings)).To(Equal(0))
176+
})
177+
})
142178
})
143179

144180
func testSingleInvalidField(validationType validationType) {

bundle/manifests/self-node-remediation.medik8s.io_selfnoderemediationconfigs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ spec:
153153
pattern: ^([0-9]+(\.[0-9]+)?(ns|us|µs|ms|s|m|h))+$
154154
type: string
155155
peerRequestTimeout:
156-
default: 5s
156+
default: 7s
157157
description: |-
158158
Timeout for each peer request.
159159
Valid time units are "ms", "s", "m", "h".

config/crd/bases/self-node-remediation.medik8s.io_selfnoderemediationconfigs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ spec:
151151
pattern: ^([0-9]+(\.[0-9]+)?(ns|us|µs|ms|s|m|h))+$
152152
type: string
153153
peerRequestTimeout:
154-
default: 5s
154+
default: 7s
155155
description: |-
156156
Timeout for each peer request.
157157
Valid time units are "ms", "s", "m", "h".

controllers/tests/config/selfnoderemediationconfig_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ var _ = Describe("SNR Config Test", func() {
233233
Expect(createdConfig.Spec.MaxApiErrorThreshold).To(Equal(3))
234234

235235
Expect(createdConfig.Spec.PeerApiServerTimeout.Seconds()).To(BeEquivalentTo(5))
236-
Expect(createdConfig.Spec.PeerRequestTimeout.Seconds()).To(BeEquivalentTo(5))
236+
Expect(createdConfig.Spec.PeerRequestTimeout.Seconds()).To(BeEquivalentTo(7))
237237
Expect(createdConfig.Spec.PeerDialTimeout.Seconds()).To(BeEquivalentTo(5))
238238
Expect(createdConfig.Spec.ApiServerTimeout.Seconds()).To(BeEquivalentTo(5))
239239
Expect(createdConfig.Spec.ApiCheckInterval.Seconds()).To(BeEquivalentTo(15))

main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
344344
PeerRequestTimeout: peerRequestTimeout,
345345
PeerHealthPort: peerHealthDefaultPort,
346346
MaxTimeForNoPeersResponse: reboot.MaxTimeForNoPeersResponse,
347+
Recorder: mgr.GetEventRecorderFor("ApiConnectivityCheck"),
347348
}
348349

349350
controlPlaneManager := controlplane.NewManager(myNodeName, mgr.GetClient())
@@ -377,7 +378,7 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
377378

378379
setupLog.Info("init grpc server")
379380
// TODO make port configurable?
380-
server, err := peerhealth.NewServer(mgr.GetClient(), mgr.GetAPIReader(), ctrl.Log.WithName("peerhealth").WithName("server"), peerHealthDefaultPort, certReader)
381+
server, err := peerhealth.NewServer(mgr.GetClient(), mgr.GetAPIReader(), ctrl.Log.WithName("peerhealth").WithName("server"), peerHealthDefaultPort, certReader, apiServerTimeout)
381382
if err != nil {
382383
setupLog.Error(err, "failed to init grpc server")
383384
os.Exit(1)

pkg/apicheck/check.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,19 @@ import (
88
"time"
99

1010
"github.com/go-logr/logr"
11+
"github.com/medik8s/common/pkg/events"
1112
"google.golang.org/grpc/credentials"
1213

1314
corev1 "k8s.io/api/core/v1"
1415
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
16+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1517
"k8s.io/apimachinery/pkg/util/wait"
1618
"k8s.io/client-go/rest"
19+
"k8s.io/client-go/tools/record"
1720
"sigs.k8s.io/controller-runtime/pkg/client"
1821

1922
selfNodeRemediation "github.com/medik8s/self-node-remediation/api"
23+
"github.com/medik8s/self-node-remediation/api/v1alpha1"
2024
"github.com/medik8s/self-node-remediation/pkg/certificates"
2125
"github.com/medik8s/self-node-remediation/pkg/controlplane"
2226
"github.com/medik8s/self-node-remediation/pkg/peerhealth"
@@ -25,6 +29,10 @@ import (
2529
"github.com/medik8s/self-node-remediation/pkg/utils"
2630
)
2731

32+
const (
33+
eventReasonPeerTimeoutAdjusted = "PeerTimeoutAdjusted"
34+
)
35+
2836
type ApiConnectivityCheck struct {
2937
client.Reader
3038
config *ApiConnectivityCheckConfig
@@ -51,6 +59,7 @@ type ApiConnectivityCheckConfig struct {
5159
PeerHealthPort int
5260
MaxTimeForNoPeersResponse time.Duration
5361
MinPeersForRemediation int
62+
Recorder record.EventRecorder
5463
}
5564

5665
func New(config *ApiConnectivityCheckConfig, controlPlaneManager *controlplane.Manager) *ApiConnectivityCheck {
@@ -275,6 +284,25 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []corev1.PodIP
275284
return c.sumPeersResponses(nrAddresses, responsesChan)
276285
}
277286

287+
// getEffectivePeerRequestTimeout returns the timeout to use for a peer health request: the configured
288+
// PeerRequestTimeout, or ApiServerTimeout + v1alpha1.MinimumBuffer when the configured value is lower than that.
289+
func (c *ApiConnectivityCheck) getEffectivePeerRequestTimeout() time.Duration {
290+
minimumSafeTimeout := c.config.ApiServerTimeout + v1alpha1.MinimumBuffer
291+
292+
if c.config.PeerRequestTimeout < minimumSafeTimeout {
293+
// Log the adjustment and emit a PeerTimeoutAdjusted warning event. NOTE(review): this runs on every call while the configured value is too low, and the event object is built with only a name (no namespace) — confirm both are intended
294+
c.config.Log.Info("PeerRequestTimeout is too low, using adjusted value for safety",
295+
"configuredTimeout", c.config.PeerRequestTimeout,
296+
"apiServerTimeout", c.config.ApiServerTimeout,
297+
"minimumBuffer", v1alpha1.MinimumBuffer,
298+
"effectiveTimeout", minimumSafeTimeout)
299+
events.WarningEventf(c.config.Recorder, &v1alpha1.SelfNodeRemediationConfig{ObjectMeta: metav1.ObjectMeta{Name: v1alpha1.ConfigCRName}}, eventReasonPeerTimeoutAdjusted, "PeerRequestTimeout (%s) was too low compared to ApiServerTimeout (%s), using safe value (%s) instead", c.config.PeerRequestTimeout, c.config.ApiServerTimeout, minimumSafeTimeout)
300+
return minimumSafeTimeout
301+
}
302+
303+
return c.config.PeerRequestTimeout
304+
}
305+
278306
// getHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel
279307
func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) {
280308

@@ -296,7 +324,8 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP,
296324
}
297325
defer phClient.Close()
298326

299-
ctx, cancel := context.WithTimeout(context.Background(), c.config.PeerRequestTimeout)
327+
effectiveTimeout := c.getEffectivePeerRequestTimeout()
328+
ctx, cancel := context.WithTimeout(context.Background(), effectiveTimeout)
300329
defer cancel()
301330

302331
resp, err := phClient.IsHealthy(ctx, &peerhealth.HealthRequest{

pkg/apicheck/check_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package apicheck
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/go-logr/logr"
8+
. "github.com/onsi/ginkgo/v2"
9+
. "github.com/onsi/gomega"
10+
11+
"k8s.io/client-go/tools/record"
12+
ctrl "sigs.k8s.io/controller-runtime"
13+
14+
"github.com/medik8s/self-node-remediation/api/v1alpha1"
15+
)
16+
17+
// TestApiCheck registers Gomega's fail handler and runs the "ApiCheck Suite" Ginkgo specs under `go test`.
func TestApiCheck(t *testing.T) {
18+
RegisterFailHandler(Fail)
19+
RunSpecs(t, "ApiCheck Suite")
20+
}
21+
22+
var _ = Describe("ApiConnectivityCheck", func() {
23+
var (
24+
apiCheck *ApiConnectivityCheck
25+
config *ApiConnectivityCheckConfig
26+
fakeRecorder *record.FakeRecorder
27+
log logr.Logger
28+
)
29+
30+
BeforeEach(func() {
31+
log = ctrl.Log.WithName("test")
32+
fakeRecorder = record.NewFakeRecorder(10)
33+
34+
config = &ApiConnectivityCheckConfig{
35+
Log: log,
36+
MyNodeName: "test-node",
37+
ApiServerTimeout: 5 * time.Second,
38+
PeerRequestTimeout: 7 * time.Second,
39+
Recorder: fakeRecorder,
40+
}
41+
42+
apiCheck = &ApiConnectivityCheck{
43+
config: config,
44+
}
45+
})
46+
47+
Describe("getEffectivePeerRequestTimeout", func() {
48+
Context("when PeerRequestTimeout is safe", func() {
49+
It("should return the configured PeerRequestTimeout", func() {
50+
// ApiServerTimeout=5s, PeerRequestTimeout=7s, MinimumBuffer=2s
51+
// 7s >= (5s + 2s), so it's safe
52+
effectiveTimeout := apiCheck.getEffectivePeerRequestTimeout()
53+
54+
Expect(effectiveTimeout).To(Equal(7 * time.Second))
55+
56+
// Should not emit any events
57+
Expect(len(fakeRecorder.Events)).To(Equal(0))
58+
})
59+
})
60+
61+
Context("when PeerRequestTimeout is unsafe", func() {
62+
It("should return adjusted timeout and emit warning event", func() {
63+
config.PeerRequestTimeout = 6 * time.Second // Less than 5s + 2s = 7s
64+
65+
effectiveTimeout := apiCheck.getEffectivePeerRequestTimeout()
66+
67+
expectedMinimumTimeout := config.ApiServerTimeout + v1alpha1.MinimumBuffer // 7s
68+
Expect(effectiveTimeout).To(Equal(expectedMinimumTimeout))
69+
70+
// Should emit a warning event
71+
Expect(len(fakeRecorder.Events)).To(Equal(1))
72+
event := <-fakeRecorder.Events
73+
Expect(event).To(ContainSubstring("Warning"))
74+
Expect(event).To(ContainSubstring("PeerTimeoutAdjusted"))
75+
Expect(event).To(ContainSubstring("6s")) // configured timeout
76+
Expect(event).To(ContainSubstring("5s")) // API server timeout
77+
Expect(event).To(ContainSubstring("7s")) // safe timeout
78+
})
79+
80+
})
81+
82+
})
83+
})

pkg/peerhealth/client_server_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ var _ = Describe("Checking health using grpc client and server", func() {
2323
var phServer *Server
2424
var cancel context.CancelFunc
2525
var phClient *Client
26+
var apiServerTimeout = 5 * time.Second
2627

27-
BeforeEach(func() {
28+
JustBeforeEach(func() {
2829

2930
By("creating test node")
3031
node := &v1.Node{
@@ -49,7 +50,7 @@ var _ = Describe("Checking health using grpc client and server", func() {
4950
}
5051

5152
By("Creating server")
52-
phServer, err = NewServer(k8sClient, reader, ctrl.Log.WithName("peerhealth test").WithName("phServer"), 9000, certReader)
53+
phServer, err = NewServer(k8sClient, reader, ctrl.Log.WithName("peerhealth test").WithName("phServer"), 9000, certReader, apiServerTimeout)
5354
Expect(err).ToNot(HaveOccurred())
5455

5556
By("Starting server")
@@ -132,15 +133,18 @@ var _ = Describe("Checking health using grpc client and server", func() {
132133

133134
BeforeEach(func() {
134135
reader.delay = &apiCallDelay
136+
apiServerTimeout = 3 * time.Second
137+
135138
})
136139

137140
AfterEach(func() {
138141
reader.delay = nil
142+
apiServerTimeout = 5 * time.Second
139143
})
140144

141145
It("should return API error", func() {
142146
By("calling isHealthy")
143-
// The health server code has a hardcoded timeout of 3s for the API call!
147+
// The health server is configured with a 3s timeout for the API call (apiServerTimeout, set in the BeforeEach above)!
144148
// When we add a delay of > 3s to the API call, that 3s timeout needs to be respected,
145149
// to not exceed the peerRequestTimeout (5s) of the client.
146150
ctx, cancel := context.WithTimeout(context.Background(), peerRequestTimeout)

0 commit comments

Comments
 (0)