pkg/amdgpu/gpuagent/gpuagent.go: 36 changes (29 additions, 7 deletions)
@@ -203,15 +203,17 @@ func (ga *GPUAgentClient) Init() error {
 	ga.Lock()
 	defer ga.Unlock()
 	ga.initializeContext()
-	err := ga.initclients()
-	if err != nil {
-		logger.Log.Printf("gpu client init failure err :%v", err)
-		return err
+
+	// Initialize scheduler first - it is file-based and independent of gRPC.
+	// This ensures Slurm job data is available even when the GPU agent gRPC
+	// service is temporarily unavailable (e.g. during reconnect cycles).
+	if err := ga.initalizeScheduler(); err != nil {
+		logger.Log.Printf("gpu client scheduler init failure err :%v", err)
 	}
 
-	err = ga.initalizeScheduler()
+	err := ga.initclients()
 	if err != nil {
-		logger.Log.Printf("gpu client scheduler init failure err :%v", err)
+		logger.Log.Printf("gpu client init failure err :%v", err)
 		return err
 	}
 
@@ -272,8 +274,28 @@ func (ga *GPUAgentClient) initializeContext() {
 }
 
 func (ga *GPUAgentClient) reconnect() error {
+	// Preserve Slurm scheduler across reconnects since it is file-based
+	// and independent of the gRPC connection to the GPU agent service.
+	ga.Lock()
+	savedSlurm := ga.slurmScheduler
+	ga.slurmScheduler = nil // prevent Close() from closing it
+	ga.Unlock()
+
 	ga.Close()
-	return ga.Init()
+	err := ga.Init()
+
+	// If Init() did not create a new Slurm scheduler (e.g. because
+	// initalizeScheduler failed), restore the saved one so that
+	// accumulated workload data is not lost.
+	ga.Lock()
+	if ga.slurmScheduler == nil && savedSlurm != nil {
+		ga.slurmScheduler = savedSlurm
+	} else if savedSlurm != nil {
+		// Init() created a new scheduler, close the old one
+		savedSlurm.Close()
+	}
+	ga.Unlock()
+	return err
 }
 
 func (ga *GPUAgentClient) isActive() bool {
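For context, here is a minimal, self-contained sketch of the preserve/restore handoff that the reconnect() hunk above implements: detach the scheduler before Close() so it is not torn down, re-run Init(), then either restore the saved scheduler (when Init() did not create one) or discard it. The `agent` and `scheduler` types below are illustrative stand-ins, not the exporter's real ones.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// scheduler stands in for the file-based Slurm scheduler.
type scheduler struct{ jobs int }

func (s *scheduler) Close() { fmt.Println("scheduler closed") }

// agent stands in for GPUAgentClient.
type agent struct {
	sync.Mutex
	sched *scheduler
}

// close tears down everything the agent owns, including the scheduler
// if one is still attached.
func (a *agent) close() {
	a.Lock()
	defer a.Unlock()
	if a.sched != nil {
		a.sched.Close()
		a.sched = nil
	}
}

// init may fail (e.g. gRPC unavailable) before a scheduler is created.
func (a *agent) init(fail bool) error {
	if fail {
		return errors.New("grpc unavailable")
	}
	a.Lock()
	a.sched = &scheduler{}
	a.Unlock()
	return nil
}

// reconnect mirrors the structure of the patched GPUAgentClient.reconnect():
// detach the scheduler so close() cannot destroy it, re-init, then either
// restore the saved scheduler or drop it if init() already made a fresh one.
func (a *agent) reconnect(initFails bool) error {
	a.Lock()
	saved := a.sched
	a.sched = nil // prevent close() from closing it
	a.Unlock()

	a.close()
	err := a.init(initFails)

	a.Lock()
	if a.sched == nil && saved != nil {
		a.sched = saved // init() failed: keep accumulated state
	} else if saved != nil {
		saved.Close() // init() made a new one: drop the old
	}
	a.Unlock()
	return err
}

func main() {
	a := &agent{sched: &scheduler{jobs: 2}}
	_ = a.reconnect(true) // init fails: scheduler survives
	fmt.Println("scheduler survives:", a.sched != nil, "jobs:", a.sched.jobs)
}
```

Nil-ing the field before Close() is what prevents a double Close(); the second locked section then decides which of the two schedulers survives.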
pkg/amdgpu/gpuagent/gpuagent_test.go: 95 changes (95 additions, 0 deletions)
@@ -439,3 +439,98 @@ func TestMetricFieldMappingMI2xxEmptyBusyInst(t *testing.T) {
 	t.Logf("✓ All MI2xx BusyInst empty fields correctly marked as unsupported")
 	t.Logf("✓ Validated correct behavior for platforms without BusyInst support")
 }
+
+// TestSlurmLabelsInPrometheusMetrics verifies that when a Slurm scheduler is
+// configured with job workloads, the Prometheus labels produced by
+// populateLabelsFromGPU contain the Slurm job_id, job_user, and job_partition.
+func TestSlurmLabelsInPrometheusMetrics(t *testing.T) {
+	teardownSuite := setupTest(t)
+	defer teardownSuite(t)
+
+	ga := getNewAgent(t)
+	defer ga.Close()
+
+	// Set up Slurm scheduler BEFORE metrics collection so that the
+	// workload lookup can match GPU indices to Slurm job data.
+	ga.slurmScheduler = newSlurmMockClient()
+
+	err := ga.InitConfigs()
+	assert.Assert(t, err == nil, "expecting success config init")
+
+	// Build workload map the same way getMetricsAll does
+	wls, err := ga.ListWorkloads()
+	assert.Assert(t, err == nil, "expecting success workload list")
+	assert.Assert(t, len(wls) == 2, "expecting 2 Slurm workloads")
+
+	// Create a GPU with Index=0 which should match Slurm workload key "0"
+	gpu := &amdgpu.GPU{
+		Spec: &amdgpu.GPUSpec{
+			Id: []byte(uuid.New().String()),
+		},
+		Status: &amdgpu.GPUStatus{
+			Index: 0,
+			SerialNum: "test-serial",
+			CardModel: "test-model",
+			PCIeStatus: &amdgpu.GPUPCIeStatus{
+				PCIeBusId: "0000:01:00.0",
+			},
+		},
+		Stats: &amdgpu.GPUStats{
+			PackagePower: 100,
+		},
+	}
+
+	// Call populateLabelsFromGPU and check Slurm labels are populated
+	for _, client := range ga.clients {
+		if client.GetDeviceType() == globals.GPUDevice {
+			gpuclient := client.(*GPUAgentGPUClient)
+			labels := gpuclient.populateLabelsFromGPU(wls, gpu, nil)
+
+			t.Logf("labels: %+v", labels)
+
+			// Verify Slurm job labels are populated (not empty)
+			jobID := labels["job_id"]
+			jobUser := labels["job_user"]
+			jobPartition := labels["job_partition"]
+
+			assert.Assert(t, jobID == "SLURM_JOB_ID0",
+				"expected job_id=SLURM_JOB_ID0, got %q", jobID)
+			assert.Assert(t, jobUser == "SLURM_JOB_USER0",
+				"expected job_user=SLURM_JOB_USER0, got %q", jobUser)
+			assert.Assert(t, jobPartition == "SLURM_JOB_PARTITION0",
+				"expected job_partition=SLURM_JOB_PARTITION0, got %q", jobPartition)
+
+			t.Logf("✓ Slurm labels correctly populated in Prometheus labels")
+			break
+		}
+	}
+}
+
+// TestSlurmSchedulerSurvivesReconnect verifies that the Slurm scheduler
+// is preserved across gRPC reconnect cycles. This tests the fix for the
+// issue where reconnect() would destroy the Slurm scheduler and
+// initclients() failure would prevent it from being recreated.
+func TestSlurmSchedulerSurvivesReconnect(t *testing.T) {
+	teardownSuite := setupTest(t)
+	defer teardownSuite(t)
+
+	ga := getNewAgent(t)
+
+	// Set up Slurm scheduler
+	ga.slurmScheduler = newSlurmMockClient()
+
+	// Verify workloads are available
+	wls, err := ga.ListWorkloads()
+	assert.Assert(t, err == nil, "expecting success workload list before reconnect")
+	assert.Assert(t, len(wls) == 2, "expecting 2 Slurm workloads before reconnect")
+
+	// Simulate reconnect - this should preserve the Slurm scheduler
+	_ = ga.reconnect()
+
+	// After reconnect, Slurm scheduler should still be available
+	// (either the saved one was restored or a new one was created)
+	assert.Assert(t, ga.slurmScheduler != nil,
+		"Slurm scheduler should not be nil after reconnect")
+
+	ga.Close()
+}