Skip to content

Commit d50cb70

Browse files
feat: implement stale lock detection and enhance node lock management
1 parent 4521121 commit d50cb70

File tree

4 files changed

+184
-9
lines changed

4 files changed

+184
-9
lines changed

pkg/device/nvidia/device.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"strconv"
2424
"strings"
25+
"time"
2526

2627
corev1 "k8s.io/api/core/v1"
2728
"k8s.io/apimachinery/pkg/api/resource"
@@ -189,10 +190,39 @@ func (dev *NvidiaGPUDevices) LockNode(n *corev1.Node, p *corev1.Pod) error {
189190
if !found {
190191
return nil
191192
}
193+
194+
if stale, err := dev.checkStaleLock(n); err == nil && stale {
195+
klog.Warningf("Detected stale lock on node %s, forcing release", n.Name)
196+
_ = nodelock.ReleaseNodeLock(n.Name, NodeLockNvidia, p, true)
197+
}
198+
192199
return nodelock.LockNode(n.Name, NodeLockNvidia, p)
193200
}
194201

202+
func (dev *NvidiaGPUDevices) checkStaleLock(n *corev1.Node) (bool, error) {
203+
lockStr := n.Annotations[nodelock.NodeLockKey]
204+
if lockStr == "" {
205+
return false, nil
206+
}
207+
208+
parts := strings.Split(lockStr, "|")
209+
if len(parts) < 4 {
210+
return true, nil // stale lock
211+
}
212+
213+
createTime, err := time.Parse(time.RFC3339Nano, parts[0])
214+
if err != nil {
215+
return true, nil
216+
}
217+
218+
return time.Since(createTime) > nodelock.LockTTL, nil
219+
}
220+
195221
func (dev *NvidiaGPUDevices) ReleaseNodeLock(n *corev1.Node, p *corev1.Pod) error {
222+
defer func() {
223+
klog.Errorf("Final fallback release for node %s", n.Name)
224+
_ = nodelock.ReleaseNodeLock(n.Name, NodeLockNvidia, p, true)
225+
}()
196226
found := false
197227
for _, val := range p.Spec.Containers {
198228
if (dev.GenerateResourceRequests(&val).Nums) > 0 {

pkg/device/nvidia/device_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ package nvidia
1919
import (
2020
"errors"
2121
"testing"
22+
"time"
2223

2324
"gotest.tools/v3/assert"
2425
corev1 "k8s.io/api/core/v1"
2526
"k8s.io/apimachinery/pkg/api/resource"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728

2829
"github.com/Project-HAMi/HAMi/pkg/util"
30+
"github.com/Project-HAMi/HAMi/pkg/util/nodelock"
2931
)
3032

3133
func Test_DefaultResourceNum(t *testing.T) {
@@ -614,3 +616,84 @@ func Test_GetNodeDevices(t *testing.T) {
614616
})
615617
}
616618
}
619+
func Test_checkStaleLock(t *testing.T) {
620+
tests := []struct {
621+
name string
622+
node corev1.Node
623+
wantStale bool
624+
wantErr bool
625+
}{
626+
{
627+
name: "no lock annotation",
628+
node: corev1.Node{
629+
ObjectMeta: metav1.ObjectMeta{
630+
Annotations: map[string]string{},
631+
},
632+
},
633+
wantStale: false,
634+
wantErr: false,
635+
},
636+
{
637+
name: "stale lock with insufficient parts",
638+
node: corev1.Node{
639+
ObjectMeta: metav1.ObjectMeta{
640+
Annotations: map[string]string{
641+
nodelock.NodeLockKey: "2023-10-10T10:10:10Z|part2|part3",
642+
},
643+
},
644+
},
645+
wantStale: true,
646+
wantErr: false,
647+
},
648+
{
649+
name: "stale lock with invalid time format",
650+
node: corev1.Node{
651+
ObjectMeta: metav1.ObjectMeta{
652+
Annotations: map[string]string{
653+
nodelock.NodeLockKey: "invalid-time|part2|part3|part4",
654+
},
655+
},
656+
},
657+
wantStale: true,
658+
wantErr: false,
659+
},
660+
{
661+
name: "stale lock with expired time",
662+
node: corev1.Node{
663+
ObjectMeta: metav1.ObjectMeta{
664+
Annotations: map[string]string{
665+
nodelock.NodeLockKey: time.Now().Add(-2*nodelock.LockTTL).Format(time.RFC3339Nano) + "|part2|part3|part4",
666+
},
667+
},
668+
},
669+
wantStale: true,
670+
wantErr: false,
671+
},
672+
{
673+
name: "valid lock",
674+
node: corev1.Node{
675+
ObjectMeta: metav1.ObjectMeta{
676+
Annotations: map[string]string{
677+
nodelock.NodeLockKey: time.Now().Format(time.RFC3339Nano) + "|part2|part3|part4",
678+
},
679+
},
680+
},
681+
wantStale: false,
682+
wantErr: false,
683+
},
684+
}
685+
686+
for _, test := range tests {
687+
t.Run(test.name, func(t *testing.T) {
688+
gpuDevices := &NvidiaGPUDevices{}
689+
gotStale, err := gpuDevices.checkStaleLock(&test.node)
690+
if (err != nil) != test.wantErr {
691+
t.Errorf("checkStaleLock() error = %v, wantErr %v", err, test.wantErr)
692+
return
693+
}
694+
if gotStale != test.wantStale {
695+
t.Errorf("checkStaleLock() gotStale = %v, want %v", gotStale, test.wantStale)
696+
}
697+
})
698+
}
699+
}

pkg/util/nodelock/nodelock.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,25 @@ package nodelock
1919
import (
2020
"context"
2121
"fmt"
22+
"math"
2223
"strings"
2324
"sync"
2425
"time"
2526

26-
"github.com/Project-HAMi/HAMi/pkg/util/client"
27-
2827
corev1 "k8s.io/api/core/v1"
28+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3030
"k8s.io/klog/v2"
31+
32+
"github.com/Project-HAMi/HAMi/pkg/util/client"
3133
)
3234

3335
const (
34-
NodeLockKey = "hami.io/mutex.lock"
35-
MaxLockRetry = 5
36-
NodeLockSep = ","
36+
NodeLockKey = "hami.io/mutex.lock"
37+
MaxLockRetry = 8
38+
NodeLockSep = ","
39+
LockTTL = 1 * time.Minute
40+
RetryBaseInterval = 50 * time.Millisecond
3741
)
3842

3943
var lock sync.Mutex
@@ -53,9 +57,14 @@ func SetNodeLock(nodeName string, lockname string, pods *corev1.Pod) error {
5357
newNode.ObjectMeta.Annotations[NodeLockKey] = GenerateNodeLockKeyByPod(pods)
5458
_, err = client.GetClient().CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
5559
for i := 0; i < MaxLockRetry && err != nil; i++ {
56-
klog.ErrorS(err, "Failed to update node", "node", nodeName, "retry", i)
57-
time.Sleep(100 * time.Millisecond)
60+
sleepDuration := time.Duration(math.Pow(2, float64(i))) * RetryBaseInterval
61+
time.Sleep(sleepDuration)
62+
5863
node, err = client.GetClient().CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
64+
if apierrors.IsConflict(err) {
65+
klog.Warningf("Version conflict on node %s, retrying...", nodeName)
66+
continue
67+
}
5968
if err != nil {
6069
klog.ErrorS(err, "Failed to get node when retry to update", "node", nodeName)
6170
continue
@@ -152,6 +161,9 @@ func GenerateNodeLockKeyByPod(pods *corev1.Pod) string {
152161
if pods == nil {
153162
return time.Now().Format(time.RFC3339)
154163
}
155-
ns, name := pods.Namespace, pods.Name
156-
return fmt.Sprintf("%s%s%s%s%s", time.Now().Format(time.RFC3339), NodeLockSep, ns, NodeLockSep, name)
164+
return fmt.Sprintf("%s|%s|%s|%s",
165+
time.Now().Format(time.RFC3339Nano), // use nano second as lock time
166+
string(pods.UID), // use pod UID as lock namespace
167+
pods.Namespace,
168+
pods.Name)
157169
}

pkg/util/nodelock/nodelock_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package nodelock
1818

1919
import (
2020
"context"
21+
"strings"
2122
"testing"
23+
"time"
2224

2325
corev1 "k8s.io/api/core/v1"
2426
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -235,3 +237,51 @@ func TestReleaseNodeLock(t *testing.T) {
235237
})
236238
}
237239
}
240+
func TestGenerateNodeLockKeyByPod(t *testing.T) {
241+
tests := []struct {
242+
name string
243+
pods *corev1.Pod
244+
}{
245+
{
246+
name: "nil pod",
247+
pods: nil,
248+
},
249+
{
250+
name: "valid pod",
251+
pods: &corev1.Pod{
252+
ObjectMeta: metav1.ObjectMeta{
253+
UID: "12345",
254+
Name: "hami",
255+
Namespace: "hami-ns",
256+
},
257+
},
258+
},
259+
}
260+
for _, tt := range tests {
261+
t.Run(tt.name, func(t *testing.T) {
262+
got := GenerateNodeLockKeyByPod(tt.pods)
263+
if tt.pods == nil {
264+
if _, err := time.Parse(time.RFC3339, got); err != nil {
265+
t.Errorf("GenerateNodeLockKeyByPod() = %v, want valid RFC3339 time", got)
266+
}
267+
} else {
268+
parts := strings.Split(got, "|")
269+
if len(parts) != 4 {
270+
t.Errorf("GenerateNodeLockKeyByPod() = %v, want 4 parts", got)
271+
}
272+
if _, err := time.Parse(time.RFC3339Nano, parts[0]); err != nil {
273+
t.Errorf("GenerateNodeLockKeyByPod() = %v, want valid RFC3339Nano time", got)
274+
}
275+
if parts[1] != string(tt.pods.UID) {
276+
t.Errorf("GenerateNodeLockKeyByPod() = %v, want UID %v", got, tt.pods.UID)
277+
}
278+
if parts[2] != tt.pods.Namespace {
279+
t.Errorf("GenerateNodeLockKeyByPod() = %v, want Namespace %v", got, tt.pods.Namespace)
280+
}
281+
if parts[3] != tt.pods.Name {
282+
t.Errorf("GenerateNodeLockKeyByPod() = %v, want Name %v", got, tt.pods.Name)
283+
}
284+
}
285+
})
286+
}
287+
}

0 commit comments

Comments
 (0)