Skip to content

Commit 3314249

Browse files
author
Normalnoise
authored
Merge pull request #216 from swanchain/hotfix-check-resource-log
fix FCP failing to create the sampling task when K8s has multiple nodes
2 parents cae6f6e + 7bb0e08 commit 3314249

File tree

5 files changed

+73
-25
lines changed

internal/computing/k8s_service.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -717,15 +717,17 @@ type GpuData struct {
717717
UsedIndex []string
718718
}
719719

720-
func (s *K8sService) GetNodeGpuSummary(ctx context.Context) (map[string]map[string]GpuData, error) {
720+
func (s *K8sService) GetNodeGpuSummary(ctx context.Context) (map[string]map[string]GpuData, map[string]string, error) {
721721
nodeGpuInfoMap, err := s.GetResourceExporterPodLog(ctx)
722722
if err != nil {
723723
logs.GetLogger().Errorf("Collect cluster gpu info Failed, if have available gpu, please check resource-exporter. error: %+v", err)
724-
return nil, err
724+
return nil, nil, err
725725
}
726726

727+
var nodeMachineId = make(map[string]string)
727728
var nodeGpuSummary = make(map[string]map[string]GpuData)
728729
for nodeName, gpu := range nodeGpuInfoMap {
730+
nodeMachineId[nodeName] = gpu.MachineId + "&" + gpu.ProductUuid
729731
if gpu.Gpu.AttachedGpus > 0 {
730732
var nodeGpu = make(map[string]GpuData)
731733
for _, g := range gpu.Gpu.Details {
@@ -756,7 +758,7 @@ func (s *K8sService) GetNodeGpuSummary(ctx context.Context) (map[string]map[stri
756758
nodeGpuSummary[nodeName] = nodeGpu
757759
}
758760
}
759-
return nodeGpuSummary, nil
761+
return nodeGpuSummary, nodeMachineId, nil
760762
}
761763

762764
func (s *K8sService) GetAllActivePod(ctx context.Context) ([]coreV1.Pod, error) {

internal/computing/resources_sumary.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ func GetNodeResource(allPods []corev1.Pod, node *corev1.Node) (map[string]GpuDat
6161
for _, pod := range getPodsFromNode(allPods, node) {
6262
usedCpu += cpuInPod(&pod)
6363
usedMem += memInPod(&pod)
64-
usedStorage += storageInPod(&pod)
64+
podUsedStorage := storageInPod(&pod)
65+
logs.GetLogger().Infof("podNmae: %s, storage: %d", pod.Name, podUsedStorage)
66+
67+
usedStorage += podUsedStorage
6568

6669
for _, g := range gpuInPod(&pod) {
6770
if g.Gname == "kubernetes.io/os" || g.Gname == "" {
@@ -104,6 +107,7 @@ func GetNodeResource(allPods []corev1.Pod, node *corev1.Node) (map[string]GpuDat
104107
nodeResource.Storage.RemainderNum = freeStorage
105108
remainderResource[ResourceStorage] = freeStorage
106109

110+
logs.GetLogger().Infof("nodeName: %s, remainderResource: %v", node.Name, remainderResource)
107111
return nodeGpu, remainderResource, nodeResource
108112
}
109113

@@ -162,9 +166,44 @@ func gpuInPod(pod *corev1.Pod) []models.PodGpu {
162166
Gindex: gIndex,
163167
})
164168
}
169+
170+
if len(podGpus) == 0 {
171+
gpuName, gpuCount, indexs := gpuInPodByNodeSelector(pod)
172+
podGpus = append(podGpus, models.PodGpu{
173+
Gname: gpuName,
174+
Guse: gpuCount,
175+
Gindex: indexs,
176+
})
177+
}
165178
return podGpus
166179
}
167180

181+
func gpuInPodByNodeSelector(pod *corev1.Pod) (gpuName string, gpuCount int, indexs []string) {
182+
containers := pod.Spec.Containers
183+
for _, container := range containers {
184+
var gIndex string
185+
for _, envVaule := range container.Env {
186+
if envVaule.Name == "NVIDIA_VISIBLE_DEVICES" {
187+
gIndex = envVaule.Value
188+
indexs = append(indexs, strings.Split(gIndex, ",")...)
189+
break
190+
}
191+
}
192+
if gIndex != "" {
193+
gpuCount += len(strings.Split(gIndex, ","))
194+
}
195+
}
196+
197+
if pod.Spec.NodeSelector != nil {
198+
for k := range pod.Spec.NodeSelector {
199+
if k != "" {
200+
gpuName = k
201+
}
202+
}
203+
}
204+
return gpuName, gpuCount, indexs
205+
}
206+
168207
func checkClusterProviderStatus(nodeResources []*models.NodeResource) {
169208
var policy models.ResourcePolicy
170209
cpPath, _ := os.LookupEnv("CP_PATH")

internal/computing/space_service.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ func ReceiveJob(c *gin.Context) {
190190
}
191191

192192
if !available {
193+
NewJobService().UpdateJobEntityStatusByJobUuid(jobEntity.JobUuid, models.JOB_REJECTED_STATUS)
193194
logs.GetLogger().Warnf("job_uuid: %s, name: %s, msg: %s", jobData.UUID, jobData.Name, strings.Join(noAvailableMsgs, ";"))
194195
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.NoAvailableResourcesError, strings.Join(noAvailableMsgs, ";")))
195196
return
@@ -1566,7 +1567,7 @@ func checkResourceAvailableForSpace(jobUuid string, jobType int, resourceConfig
15661567
return false, "", nil, 0, nil, err
15671568
}
15681569

1569-
nodeGpuSummary, err := k8sService.GetNodeGpuSummary(context.TODO())
1570+
nodeGpuSummary, nodeNameMachineId, err := k8sService.GetNodeGpuSummary(context.TODO())
15701571
if err != nil {
15711572
logs.GetLogger().Errorf("Failed collect k8s gpu, error: %+v", err)
15721573
return false, "", nil, 0, nil, err
@@ -1577,16 +1578,17 @@ func checkResourceAvailableForSpace(jobUuid string, jobType int, resourceConfig
15771578
needCpu := hardwareDetail.Cpu.Quantity
15781579
needMemory := float64(hardwareDetail.Memory.Quantity)
15791580
needStorage := float64(hardwareDetail.Storage.Quantity)
1580-
logs.GetLogger().Infof("checkResourceForSpace: needCpu: %d, needMemory: %.2f, needStorage: %.2f, needGpu: %s, gpuNum: %d", needCpu, needMemory, needStorage, gpuName, gpuNum)
1581+
logs.GetLogger().Infof("job_uuid: %s, checkResourceForSpace: needCpu: %d, needMemory: %.2f, needStorage: %.2f, needGpu: %s, gpuNum: %d", jobUuid, needCpu, needMemory, needStorage, gpuName, gpuNum)
15811582

15821583
type gpuData struct {
15831584
Total int
15841585
Free int
15851586
FreeIndex []string
15861587
}
15871588

1588-
var noAvailableStr []string
1589+
var noAvailableStrMap = make(map[string][]string)
15891590
for _, node := range nodes.Items {
1591+
var noAvailableStr []string
15901592
var nodeName = node.Name
15911593
var nodeGpuInfo = nodeGpuSummary[nodeName]
15921594
nodeGpu, remainderResource, _ := GetNodeResource(activePods, &node)
@@ -1603,7 +1605,7 @@ func checkResourceAvailableForSpace(jobUuid string, jobType int, resourceConfig
16031605
}
16041606
}
16051607

1606-
logs.GetLogger().Infof("checkResourceForSpace: nodeName: %s,remainingCpu: %d, remainingMemory: %.2f, remainingStorage: %.2f, remainingGpu: %+v", node.Name, remainderCpu, remainderMemory, remainderStorage, freeGpuMap)
1608+
logs.GetLogger().Infof("nodeName: %s, machineId&productUuid: %s, remainingCpu: %d, remainingMemory: %.2f, remainingStorage: %.2f, remainingGpu: %+v", node.Name, nodeNameMachineId[node.Name], remainderCpu, remainderMemory, remainderStorage, freeGpuMap)
16071609

16081610
if remainderCpu < needCpu {
16091611
noAvailableStr = append(noAvailableStr, fmt.Sprintf("cpu need: %d, remainder: %d", needCpu, remainderCpu))
@@ -1619,6 +1621,7 @@ func checkResourceAvailableForSpace(jobUuid string, jobType int, resourceConfig
16191621
if len(noAvailableStr) == 0 {
16201622
return true, "", nil, 0, nil, nil
16211623
} else {
1624+
noAvailableStrMap[nodeName] = noAvailableStr
16221625
logs.GetLogger().Warnf("the job_uuid: %s is not available for this node=%s resource. Reason: %s",
16231626
jobUuid, node.Name, strings.Join(noAvailableStr, ";"))
16241627
}
@@ -1632,16 +1635,14 @@ func checkResourceAvailableForSpace(jobUuid string, jobType int, resourceConfig
16321635
if int64(len(remainingGpu)) < gpuNum {
16331636
noAvailableStr = append(noAvailableStr, fmt.Sprintf("gpu need name:%s, num:%d, remainder: %d", hardwareDetail.Gpu.Unit, hardwareDetail.Gpu.Quantity, len(remainingGpu)))
16341637
}
1635-
16361638
if len(noAvailableStr) == 0 {
16371639
return true, gpuName, remainingGpu, gpuNum, nil, nil
1638-
} else {
1639-
logs.GetLogger().Warnf("the job_uuid: %s is not available for this node=%s resource. Reason: %s",
1640-
jobUuid, node.Name, strings.Join(noAvailableStr, ";"))
16411640
}
16421641
}
16431642
}
1644-
continue
1643+
noAvailableStrMap[nodeName] = noAvailableStr
1644+
logs.GetLogger().Warnf("the job_uuid: %s is not available for this node=%s resource. Reason: %s",
1645+
jobUuid, node.Name, strings.Join(noAvailableStr, ";"))
16451646
}
16461647
}
16471648

@@ -1656,7 +1657,8 @@ func checkResourceAvailableForSpace(jobUuid string, jobType int, resourceConfig
16561657
noAvailableSummary = append(noAvailableSummary, "not found available node")
16571658
return false, "", nil, 0, noAvailableSummary, nil
16581659
} else {
1659-
return false, "", nil, 0, noAvailableStr, nil
1660+
nodeName := nodes.Items[0].Name
1661+
return false, "", nil, 0, noAvailableStrMap[nodeName], nil
16601662
}
16611663
}
16621664

@@ -1672,7 +1674,7 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
16721674
return false, "", nil, nil, nil, err
16731675
}
16741676

1675-
nodeGpuSummary, err := k8sService.GetNodeGpuSummary(context.TODO())
1677+
nodeGpuSummary, nodeNameMachineId, err := k8sService.GetNodeGpuSummary(context.TODO())
16761678
if err != nil {
16771679
logs.GetLogger().Errorf("Failed collect k8s gpu, error: %+v", err)
16781680
return false, "", nil, nil, nil, err
@@ -1696,8 +1698,9 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
16961698
FreeIndex []string
16971699
}
16981700

1699-
var noAvailableStr []string
1701+
var noAvailableStrMap = make(map[string][]string)
17001702
for _, node := range nodes.Items {
1703+
var noAvailableStr []string
17011704
var nodeName = node.Name
17021705
var nodeGpuInfo = nodeGpuSummary[nodeName]
17031706
nodeGpu, remainderResource, _ := GetNodeResource(activePods, &node)
@@ -1714,7 +1717,7 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
17141717
}
17151718
}
17161719

1717-
logs.GetLogger().Infof("checkResourceForSpace: nodeName: %s,remainingCpu: %d, remainingMemory: %.2f, remainingStorage: %.2f, remainingGpu: %+v", node.Name, remainderCpu, remainderMemory, remainderStorage, freeGpuMap)
1720+
logs.GetLogger().Infof("nodeName: %s, machineId&productUuid: %s, remainingCpu: %d, remainingMemory: %.2f, remainingStorage: %.2f, remainingGpu: %+v", node.Name, nodeNameMachineId[node.Name], remainderCpu, remainderMemory, remainderStorage, freeGpuMap)
17181721

17191722
if remainderCpu < needCpu {
17201723
noAvailableStr = append(noAvailableStr, fmt.Sprintf("cpu need: %d, remainder: %d", needCpu, remainderCpu))
@@ -1730,6 +1733,7 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
17301733
if len(noAvailableStr) == 0 {
17311734
return true, nodeName, nil, nil, nil, nil
17321735
} else {
1736+
noAvailableStrMap[nodeName] = noAvailableStr
17331737
logs.GetLogger().Warnf("the job_uuid: %s is not available for this node=%s resource. Reason: %s",
17341738
jobUuid, node.Name, strings.Join(noAvailableStr, ";"))
17351739
}
@@ -1778,6 +1782,7 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
17781782
} else {
17791783
if gpuNoAvailableStr != nil {
17801784
noAvailableStr = append(gpuNoAvailableStr, gpuNoAvailableStr...)
1785+
noAvailableStrMap[node.Name] = noAvailableStr
17811786
}
17821787
}
17831788
}
@@ -1794,7 +1799,8 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
17941799
noAvailableSummary = append(noAvailableSummary, "not found available node")
17951800
return false, "", nil, nil, noAvailableSummary, nil
17961801
} else {
1797-
return false, "", nil, nil, noAvailableStr, nil
1802+
nodeName := nodes.Items[0].Name
1803+
return false, "", nil, nil, noAvailableStrMap[nodeName], nil
17981804
}
17991805
}
18001806

@@ -1810,7 +1816,7 @@ func checkResourceAvailableForUbi(taskId, taskType int, gpuName string, resource
18101816
return "", "", 0, 0, 0, nil, nil, err
18111817
}
18121818

1813-
nodeGpuSummary, err := k8sService.GetNodeGpuSummary(context.TODO())
1819+
nodeGpuSummary, nodeNameMachineId, err := k8sService.GetNodeGpuSummary(context.TODO())
18141820
if err != nil {
18151821
logs.GetLogger().Errorf("Failed collect k8s gpu, error: %+v", err)
18161822
return "", "", 0, 0, 0, nil, nil, err
@@ -1842,7 +1848,7 @@ func checkResourceAvailableForUbi(taskId, taskType int, gpuName string, resource
18421848
remainderStorage := float64(remainderResource[ResourceStorage] / 1024 / 1024 / 1024)
18431849

18441850
logs.GetLogger().Infof("checkResourceAvailableForUbi: needCpu: %d, needMemory: %.2f, needStorage: %.2f", needCpu, needMemory, needStorage)
1845-
logs.GetLogger().Infof("checkResourceAvailableForUbi: remainingCpu: %d, remainingMemory: %.2f, remainingStorage: %.2f", remainderCpu, remainderMemory, remainderStorage)
1851+
logs.GetLogger().Infof("nodeName: %s, machineId&productUuid: %s, remainingCpu: %d, remainingMemory: %.2f, remainingStorage: %.2f", node.Name, nodeNameMachineId[node.Name], remainderCpu, remainderMemory, remainderStorage)
18461852

18471853
if remainderCpu < needCpu {
18481854
noAvailableStr = append(noAvailableStr, fmt.Sprintf("cpu need: %d, remainder: %d", needCpu, remainderCpu))

internal/models/resources.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ type NodeResource struct {
2323
}
2424

2525
type CollectNodeInfo struct {
26-
Gpu Gpu `json:"gpu"`
27-
CpuName string `json:"cpu_name"`
26+
Gpu Gpu `json:"gpu"`
27+
CpuName string `json:"cpu_name"`
28+
ProductUuid string `json:"product_uuid"`
29+
MachineId string `json:"machine_id"`
2830
}
2931
type Gpu struct {
3032
DriverVersion string `json:"driver_version"`

test/sequencer_service_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package test
22

33
import (
4-
"github.com/swanchain/go-computing-provider/internal/computing"
54
"testing"
65
)
76

87
func TestQueryTask(t *testing.T) {
9-
taskIds := []int64{35913, 35915, 35921}
10-
computing.NewSequencer().QueryTask(0, taskIds...)
8+
//taskIds := []int64{35913, 35915, 35921}
9+
//computing.NewSequencer().QueryTask(0, taskIds...)
1110
}

0 commit comments

Comments (0)