Skip to content

Commit 96b56dd

Browse files
authored
fix: unit test bug, init/reconcile gpu store order issue (#231)
* fix: unit test bug, init/reconcile gpu store order issue * fix: mock db won't require close() function
1 parent 4c58287 commit 96b56dd

File tree

2 files changed

+40
-31
lines changed

2 files changed

+40
-31
lines changed

internal/alert/evaluator_test.go

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
)
1717

1818
// Mock data for testing
19-
func setupMockDB(t *testing.T) (*sql.DB, sqlmock.Sqlmock, *metrics.TimeSeriesDB) {
19+
func setupMockDB(t *testing.T) (sqlmock.Sqlmock, *metrics.TimeSeriesDB) {
2020
db, mock, err := sqlmock.New()
2121
require.NoError(t, err)
2222

@@ -27,7 +27,7 @@ func setupMockDB(t *testing.T) (*sql.DB, sqlmock.Sqlmock, *metrics.TimeSeriesDB)
2727
require.NoError(t, err)
2828

2929
tsdb := &metrics.TimeSeriesDB{DB: gormDB}
30-
return db, mock, tsdb
30+
return mock, tsdb
3131
}
3232

3333
func createTestRule(name string) *Rule {
@@ -48,12 +48,7 @@ func createTestRule(name string) *Rule {
4848
}
4949

5050
func TestAlertEvaluator(t *testing.T) {
51-
db, mock, tsdb := setupMockDB(t)
52-
defer func() {
53-
if err := db.Close(); err != nil {
54-
t.Error("failed to close db", err)
55-
}
56-
}()
51+
mock, tsdb := setupMockDB(t)
5752

5853
evaluator := newAlertEvaluator(tsdb)
5954

@@ -155,13 +150,7 @@ func TestAlertEvaluator(t *testing.T) {
155150
})
156151

157152
t.Run("process query results empty results", func(t *testing.T) {
158-
db, mock, tsdb := setupMockDB(t)
159-
defer func() {
160-
if err := db.Close(); err != nil {
161-
t.Error("failed to close db", err)
162-
}
163-
}()
164-
153+
mock, tsdb := setupMockDB(t)
165154
rule := createTestRule("test-rule")
166155
evaluator := newAlertEvaluator(tsdb)
167156

@@ -218,20 +207,14 @@ func TestAlertEvaluator(t *testing.T) {
218207

219208
// Test edge cases
220209
t.Run("evaluate database error", func(t *testing.T) {
221-
db, mock, tsdb := setupMockDB(t)
222-
defer func() {
223-
if err := db.Close(); err != nil {
224-
t.Error("failed to close db", err)
225-
}
226-
}()
227-
210+
mock, tsdb := setupMockDB(t)
228211
rule := createTestRule("test-rule")
229212

230213
evaluator := newAlertEvaluator(tsdb)
231214

232215
// Mock database error
233216
mock.ExpectQuery("SELECT value, instance, job FROM metrics").
234-
WillReturnError(sql.ErrConnDone)
217+
WillReturnError(sql.ErrNoRows)
235218

236219
alerts, err := evaluator.evaluate(rule)
237220
assert.Error(t, err)

internal/gpuallocator/gpuallocator.go

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -327,12 +327,13 @@ func (s *GpuAllocator) SetupWithManager(ctx context.Context, mgr manager.Manager
327327
log.Error(err, "Failed to initialize GPU store")
328328
return err
329329
}
330-
readyCh <- struct{}{}
330+
close(readyCh)
331331
return nil
332332
}))
333333

334334
go func() {
335335
<-mgr.Elected()
336+
<-readyCh
336337
// reconcile allocation state based on existing workers, run only when it's elected as leader
337338
// and only if it's leader, it will start allocating resources to workers, and start sync loop here
338339
s.reconcileAllocationState(ctx)
@@ -435,12 +436,37 @@ func (s *GpuAllocator) syncToK8s(ctx context.Context) {
435436
}
436437

437438
for nodeName := range dirtyNodes {
438-
// Refer https://datatracker.ietf.org/doc/html/rfc6901#section-3 encode `/` as `~1`
439-
patch := []byte(`[{
440-
"op": "add",
441-
"path": "/metadata/annotations/` + strings.ReplaceAll(constants.GPULastReportTimeAnnotationKey, "/", "~1") + `",
442-
"value": "` + time.Now().Format(time.RFC3339) + `"
443-
}]`)
439+
// First, get the current node to check if annotations exist
440+
node := &tfv1.GPUNode{}
441+
nodeKey := client.ObjectKey{Name: nodeName}
442+
if err := s.Get(ctx, nodeKey, node); err != nil {
443+
log.Error(err, "Failed to get GPU node for updating last report time", "node", nodeName)
444+
continue
445+
}
446+
447+
var patch []byte
448+
timeValue := time.Now().Format(time.RFC3339)
449+
encodedKey := strings.ReplaceAll(constants.GPULastReportTimeAnnotationKey, "/", "~1")
450+
451+
// Check if annotations already exist
452+
if node.Annotations == nil {
453+
// Create annotations if they don't exist
454+
patch = []byte(`[{
455+
"op": "add",
456+
"path": "/metadata/annotations",
457+
"value": {
458+
"` + constants.GPULastReportTimeAnnotationKey + `": "` + timeValue + `"
459+
}
460+
}]`)
461+
} else {
462+
// Add to existing annotations
463+
patch = []byte(`[{
464+
"op": "add",
465+
"path": "/metadata/annotations/` + encodedKey + `",
466+
"value": "` + timeValue + `"
467+
}]`)
468+
}
469+
444470
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
445471
return s.Patch(ctx, &tfv1.GPUNode{
446472
ObjectMeta: metav1.ObjectMeta{
@@ -449,7 +475,7 @@ func (s *GpuAllocator) syncToK8s(ctx context.Context) {
449475
}, client.RawPatch(types.JSONPatchType, patch))
450476
})
451477
if err != nil {
452-
log.Error(err, "Failed to update GPU node last report time, will retry later", "node", nodeName)
478+
log.Error(err, "Failed to update GPU node last report time, allocation state may be inconsistent", "node", nodeName)
453479
}
454480
}
455481
}

0 commit comments

Comments
 (0)