Skip to content

Commit 28e72ff

Browse files
author
Normalnoise
authored
Merge pull request #218 from swanchain/fix-fcp-zk-task
Add zk_task API
2 parents 08e2dab + 9d09690 commit 28e72ff

File tree

4 files changed

+817
-16
lines changed

4 files changed

+817
-16
lines changed

cmd/computing-provider/run.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func cpManager(router *gin.RouterGroup) {
9191

9292
router.POST("/cp/ubi", computing.DoUbiTaskForK8s)
9393
router.POST("/cp/receive/ubi", computing.ReceiveUbiProof)
94+
router.POST("/cp/zk_task", computing.DoZkTaskForK8s)
9495

9596
}
9697

internal/computing/space_service.go

Lines changed: 294 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"github.com/swanchain/go-computing-provider/internal/contract/account"
1010
"github.com/swanchain/go-computing-provider/internal/yaml"
1111
"io"
12+
coreV1 "k8s.io/api/core/v1"
13+
"k8s.io/apimachinery/pkg/api/resource"
1214
"math/rand"
1315
"net/http"
1416
"os"
@@ -968,7 +970,7 @@ func DeployImage(c *gin.Context) {
968970
}
969971
}
970972

971-
available, nodeName, gpuIndex, prepareGpu, noAvailableMsgs, err := checkResourceAvailableForImage(deployJob.Uuid, hardwareType, deployJob.Resource)
973+
_, available, nodeName, gpuIndex, prepareGpu, noAvailableMsgs, err := checkResourceAvailableForImage(deployJob.Uuid, hardwareType, deployJob.Resource)
972974
if err != nil {
973975
NewJobService().UpdateJobEntityStatusByJobUuid(jobEntity.JobUuid, models.JOB_FAILED_STATUS)
974976
logs.GetLogger().Errorf("failed to check job resource, error: %+v", err)
@@ -1000,6 +1002,280 @@ func DeployImage(c *gin.Context) {
10001002

10011003
}
10021004

1005+
func doMiningTaskForK8s(c *gin.Context, zkTask models.ZkTaskReq, taskEntity *models.TaskEntity) {
1006+
if strings.TrimSpace(zkTask.Uuid) == "" {
1007+
c.JSON(http.StatusBadRequest, util.CreateErrorResponse(util.UbiTaskParamError, "missing required field: [uuid]"))
1008+
return
1009+
}
1010+
if strings.TrimSpace(zkTask.Name) == "" {
1011+
c.JSON(http.StatusBadRequest, util.CreateErrorResponse(util.UbiTaskParamError, "missing required field: [name]"))
1012+
return
1013+
}
1014+
if zkTask.Resource == nil {
1015+
c.JSON(http.StatusBadRequest, util.CreateErrorResponse(util.UbiTaskParamError, "missing required field: [resource]"))
1016+
return
1017+
}
1018+
if zkTask.Image == "" {
1019+
c.JSON(http.StatusBadRequest, util.CreateErrorResponse(util.UbiTaskParamError, "missing required field: [image]"))
1020+
}
1021+
1022+
var hardwareType = "CPU"
1023+
if taskEntity.ResourceType == models.RESOURCE_TYPE_GPU {
1024+
hardwareType = "GPU"
1025+
}
1026+
1027+
var k8sResource models.K8sResourceForImage
1028+
k8sResource.Memory = formatGiB(zkTask.Resource.Memory)
1029+
k8sResource.Cpu = zkTask.Resource.CPU
1030+
k8sResource.Storage = formatGiB(zkTask.Resource.Storage)
1031+
var reqGpus []models.ReqGpu
1032+
for _, g := range zkTask.Resource.Gpus {
1033+
reqGpus = append(reqGpus, models.ReqGpu{
1034+
GpuModel: g.GPUModel,
1035+
GPU: g.GPU,
1036+
})
1037+
}
1038+
k8sResource.Gpus = reqGpus
1039+
1040+
_, available, nodeName, gpuIndex, prepareGpu, noAvailableMsgs, err := checkResourceAvailableForImage(zkTask.Uuid, hardwareType, k8sResource)
1041+
if err != nil {
1042+
taskEntity.Status = models.TASK_FAILED_STATUS
1043+
NewTaskService().SaveTaskEntity(taskEntity)
1044+
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.CheckResourcesError))
1045+
return
1046+
}
1047+
if !available {
1048+
taskEntity.Status = models.TASK_REJECTED_STATUS
1049+
NewTaskService().SaveTaskEntity(taskEntity)
1050+
logs.GetLogger().Warnf("job_uuid: %s, name: %s, msg: %s", zkTask.Uuid, zkTask.Name, strings.Join(noAvailableMsgs, ";"))
1051+
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.NoAvailableResourcesError, strings.Join(noAvailableMsgs, ";")))
1052+
return
1053+
}
1054+
1055+
needMemory := k8sResource.Memory
1056+
needStorage := k8sResource.Storage
1057+
memQuantity, err := resource.ParseQuantity(fmt.Sprintf("%.fGi", needMemory))
1058+
if err != nil {
1059+
taskEntity.Status = models.TASK_FAILED_STATUS
1060+
NewTaskService().SaveTaskEntity(taskEntity)
1061+
logs.GetLogger().Error("get memory failed, error: %+v", err)
1062+
return
1063+
}
1064+
1065+
storageQuantity, err := resource.ParseQuantity(fmt.Sprintf("%.fGi", needStorage))
1066+
if err != nil {
1067+
taskEntity.Status = models.TASK_FAILED_STATUS
1068+
NewTaskService().SaveTaskEntity(taskEntity)
1069+
logs.GetLogger().Error("get storage failed, error: %+v", err)
1070+
return
1071+
}
1072+
1073+
maxMemQuantity, err := resource.ParseQuantity(fmt.Sprintf("%.fGi", needMemory*2))
1074+
if err != nil {
1075+
taskEntity.Status = models.TASK_FAILED_STATUS
1076+
NewTaskService().SaveTaskEntity(taskEntity)
1077+
logs.GetLogger().Error("get memory failed, error: %+v", err)
1078+
return
1079+
}
1080+
1081+
maxStorageQuantity, err := resource.ParseQuantity(fmt.Sprintf("%.fGi", needStorage*2))
1082+
if err != nil {
1083+
taskEntity.Status = models.TASK_FAILED_STATUS
1084+
NewTaskService().SaveTaskEntity(taskEntity)
1085+
logs.GetLogger().Error("get storage failed, error: %+v", err)
1086+
return
1087+
}
1088+
1089+
var total int
1090+
for _, g := range zkTask.Resource.Gpus {
1091+
total += g.GPU
1092+
}
1093+
1094+
gpuResourceQuantity := resource.MustParse(fmt.Sprintf("%d", total))
1095+
resourceRequirements := coreV1.ResourceRequirements{
1096+
Limits: coreV1.ResourceList{
1097+
coreV1.ResourceCPU: *resource.NewQuantity(zkTask.Resource.CPU*2, resource.DecimalSI),
1098+
coreV1.ResourceMemory: maxMemQuantity,
1099+
coreV1.ResourceEphemeralStorage: maxStorageQuantity,
1100+
"nvidia.com/gpu": gpuResourceQuantity,
1101+
},
1102+
Requests: coreV1.ResourceList{
1103+
coreV1.ResourceCPU: *resource.NewQuantity(zkTask.Resource.CPU, resource.DecimalSI),
1104+
coreV1.ResourceMemory: memQuantity,
1105+
coreV1.ResourceEphemeralStorage: storageQuantity,
1106+
"nvidia.com/gpu": gpuResourceQuantity,
1107+
},
1108+
}
1109+
1110+
go func() {
1111+
var namespace = "ubi-task-" + taskEntity.Uuid
1112+
var err error
1113+
defer func() {
1114+
if err := recover(); err != nil {
1115+
logs.GetLogger().Errorf("do zk mining task painc, error: %+v", err)
1116+
return
1117+
}
1118+
1119+
ubiTaskRun, err := NewTaskService().GetTaskEntity(taskEntity.Id)
1120+
if err != nil {
1121+
logs.GetLogger().Errorf("get ubi task detail from db failed, ubiTaskId: %d, error: %+v", taskEntity.Id, err)
1122+
return
1123+
}
1124+
ubiTaskRun.Status = models.TASK_FAILED_STATUS
1125+
k8sService := NewK8sService()
1126+
k8sService.k8sClient.CoreV1().Namespaces().Delete(context.TODO(), namespace, metaV1.DeleteOptions{})
1127+
err = NewTaskService().SaveTaskEntity(ubiTaskRun)
1128+
}()
1129+
1130+
k8sService := NewK8sService()
1131+
if _, err = k8sService.GetNameSpace(context.TODO(), namespace, metaV1.GetOptions{}); err != nil {
1132+
if errors.IsNotFound(err) {
1133+
k8sNamespace := &v1.Namespace{
1134+
ObjectMeta: metaV1.ObjectMeta{
1135+
Name: namespace,
1136+
},
1137+
}
1138+
_, err = k8sService.CreateNameSpace(context.TODO(), k8sNamespace, metaV1.CreateOptions{})
1139+
if err != nil {
1140+
logs.GetLogger().Errorf("create namespace failed, error: %v", err)
1141+
return
1142+
}
1143+
}
1144+
}
1145+
1146+
JobName := strings.ToLower(models.UbiTaskTypeStr(zkTask.TaskType)) + "-" + zkTask.Uuid
1147+
1148+
var defaultEnv []coreV1.EnvVar
1149+
for k, v := range zkTask.Envs {
1150+
defaultEnv = append(defaultEnv, coreV1.EnvVar{
1151+
Name: k,
1152+
Value: v,
1153+
})
1154+
}
1155+
1156+
if len(gpuIndex) > 0 {
1157+
defaultEnv = append(defaultEnv, coreV1.EnvVar{
1158+
Name: "NVIDIA_VISIBLE_DEVICES",
1159+
Value: strings.Join(gpuIndex, ","),
1160+
})
1161+
}
1162+
1163+
job := &batchv1.Job{
1164+
ObjectMeta: metaV1.ObjectMeta{
1165+
Name: JobName,
1166+
Namespace: namespace,
1167+
Annotations: generateGpuAnnotation(prepareGpu),
1168+
},
1169+
Spec: batchv1.JobSpec{
1170+
Template: v1.PodTemplateSpec{
1171+
Spec: v1.PodSpec{
1172+
NodeName: nodeName,
1173+
NodeSelector: map[string]string{
1174+
"kubernetes.io/hostname": nodeName,
1175+
},
1176+
Containers: []v1.Container{
1177+
{
1178+
Name: JobName + generateString(5),
1179+
Image: zkTask.Image,
1180+
Env: defaultEnv,
1181+
Resources: resourceRequirements,
1182+
ImagePullPolicy: coreV1.PullIfNotPresent,
1183+
},
1184+
},
1185+
RestartPolicy: v1.RestartPolicyNever,
1186+
},
1187+
},
1188+
BackoffLimit: new(int32),
1189+
TTLSecondsAfterFinished: new(int32),
1190+
},
1191+
}
1192+
1193+
*job.Spec.BackoffLimit = 1
1194+
*job.Spec.TTLSecondsAfterFinished = 300
1195+
1196+
if _, err = k8sService.k8sClient.BatchV1().Jobs(namespace).Create(context.TODO(), job, metaV1.CreateOptions{}); err != nil {
1197+
logs.GetLogger().Errorf("Failed creating ubi task job: %v", err)
1198+
return
1199+
}
1200+
1201+
err = wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
1202+
pods, err := k8sService.k8sClient.CoreV1().Pods(namespace).List(context.TODO(), metaV1.ListOptions{
1203+
LabelSelector: fmt.Sprintf("job-name=%s", JobName),
1204+
})
1205+
if err != nil {
1206+
logs.GetLogger().Errorf("failed get pod, taskId: %d, error: %v,retrying", zkTask.Id, err)
1207+
return false, err
1208+
}
1209+
1210+
if len(pods.Items) == 0 {
1211+
return false, nil
1212+
}
1213+
1214+
for _, p := range pods.Items {
1215+
for _, condition := range p.Status.Conditions {
1216+
if condition.Type != coreV1.PodReady && condition.Status != coreV1.ConditionTrue {
1217+
return false, nil
1218+
}
1219+
}
1220+
}
1221+
return true, nil
1222+
})
1223+
if err != nil {
1224+
logs.GetLogger().Errorf("Failed waiting pods create: %v", err)
1225+
return
1226+
}
1227+
1228+
pods, err := k8sService.k8sClient.CoreV1().Pods(namespace).List(context.TODO(), metaV1.ListOptions{
1229+
LabelSelector: fmt.Sprintf("job-name=%s", JobName),
1230+
})
1231+
if err != nil {
1232+
logs.GetLogger().Errorf("Failed list ubi pods: %v", err)
1233+
return
1234+
}
1235+
1236+
var podName string
1237+
for _, pod := range pods.Items {
1238+
podName = pod.Name
1239+
break
1240+
}
1241+
if podName == "" {
1242+
logs.GetLogger().Errorf("failed get pod name, taskId: %d", zkTask.Id)
1243+
return
1244+
}
1245+
logs.GetLogger().Infof("successfully get pod name, taskId: %d, podName: %s", zkTask.Id, podName)
1246+
1247+
NewTaskService().UpdateTaskStatusById(zkTask.Id, models.TASK_RUNNING_STATUS)
1248+
req := k8sService.k8sClient.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{
1249+
Container: "",
1250+
Follow: true,
1251+
})
1252+
1253+
podLogs, err := req.Stream(context.Background())
1254+
if err != nil {
1255+
logs.GetLogger().Errorf("Error opening log stream: %v", err)
1256+
return
1257+
}
1258+
defer podLogs.Close()
1259+
1260+
cpRepoPath, _ := os.LookupEnv("CP_PATH")
1261+
ubiLogFileName := filepath.Join(cpRepoPath, "ubi-fcp.log")
1262+
logFile, err := os.OpenFile(ubiLogFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
1263+
if err != nil {
1264+
logs.GetLogger().Errorf("opening ubi-fcp log file failed, error: %v", err)
1265+
return
1266+
}
1267+
defer logFile.Close()
1268+
1269+
if _, err = io.Copy(logFile, podLogs); err != nil {
1270+
logs.GetLogger().Errorf("write ubi-fcp log to file failed, error: %v", err)
1271+
return
1272+
}
1273+
}()
1274+
1275+
c.JSON(http.StatusOK, util.CreateSuccessResponse("success"))
1276+
return
1277+
}
1278+
10031279
func handlePodEvent(conn *websocket.Conn, jobUuid string, walletAddress string) {
10041280
client := NewWsClient(conn)
10051281

@@ -1662,22 +1938,22 @@ func checkResourceAvailableForSpace(jobUuid string, jobType int, resourceConfig
16621938
}
16631939
}
16641940

1665-
func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourceConfig models.K8sResourceForImage) (bool, string, []string, []models.PodGpu, []string, error) {
1941+
func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourceConfig models.K8sResourceForImage) (string, bool, string, []string, []models.PodGpu, []string, error) {
16661942
k8sService := NewK8sService()
16671943
activePods, err := k8sService.GetAllActivePod(context.TODO())
16681944
if err != nil {
1669-
return false, "", nil, nil, nil, err
1945+
return "", false, "", nil, nil, nil, err
16701946
}
16711947

16721948
nodes, err := k8sService.k8sClient.CoreV1().Nodes().List(context.TODO(), metaV1.ListOptions{})
16731949
if err != nil {
1674-
return false, "", nil, nil, nil, err
1950+
return "", false, "", nil, nil, nil, err
16751951
}
16761952

16771953
nodeGpuSummary, nodeNameMachineId, err := k8sService.GetNodeGpuSummary(context.TODO())
16781954
if err != nil {
16791955
logs.GetLogger().Errorf("Failed collect k8s gpu, error: %+v", err)
1680-
return false, "", nil, nil, nil, err
1956+
return "", false, "", nil, nil, nil, err
16811957
}
16821958

16831959
var reqGpuMap = make(map[string]int)
@@ -1699,9 +1975,17 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
16991975
}
17001976

17011977
var noAvailableStrMap = make(map[string][]string)
1978+
var nodeName, architecture string
17021979
for _, node := range nodes.Items {
1980+
if _, ok := node.Labels[constants.CPU_INTEL]; ok {
1981+
architecture = constants.CPU_INTEL
1982+
}
1983+
if _, ok := node.Labels[constants.CPU_AMD]; ok {
1984+
architecture = constants.CPU_AMD
1985+
}
1986+
17031987
var noAvailableStr []string
1704-
var nodeName = node.Name
1988+
nodeName = node.Name
17051989
var nodeGpuInfo = nodeGpuSummary[nodeName]
17061990
nodeGpu, remainderResource, _ := GetNodeResource(activePods, &node)
17071991
remainderCpu := remainderResource[ResourceCpu]
@@ -1731,7 +2015,7 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
17312015

17322016
if hardwareType == "CPU" {
17332017
if len(noAvailableStr) == 0 {
1734-
return true, nodeName, nil, nil, nil, nil
2018+
return architecture, true, nodeName, nil, nil, nil, nil
17352019
} else {
17362020
noAvailableStrMap[nodeName] = noAvailableStr
17372021
logs.GetLogger().Warnf("the job_uuid: %s is not available for this node=%s resource. Reason: %s",
@@ -1778,7 +2062,7 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
17782062
}
17792063

17802064
if flag {
1781-
return true, nodeName, newGpuIndex, prepare, nil, nil
2065+
return architecture, true, nodeName, newGpuIndex, prepare, nil, nil
17822066
} else {
17832067
if gpuNoAvailableStr != nil {
17842068
noAvailableStr = append(gpuNoAvailableStr, gpuNoAvailableStr...)
@@ -1797,10 +2081,10 @@ func checkResourceAvailableForImage(jobUuid string, hardwareType string, resourc
17972081
noAvailableSummary = append(noAvailableSummary, fmt.Sprintf("needGpu: %v", reqGpuMap))
17982082
}
17992083
noAvailableSummary = append(noAvailableSummary, "not found available node")
1800-
return false, "", nil, nil, noAvailableSummary, nil
2084+
return architecture, false, "", nil, nil, noAvailableSummary, nil
18012085
} else {
18022086
nodeName := nodes.Items[0].Name
1803-
return false, "", nil, nil, noAvailableStrMap[nodeName], nil
2087+
return architecture, false, "", nil, nil, noAvailableStrMap[nodeName], nil
18042088
}
18052089
}
18062090

0 commit comments

Comments
 (0)