Skip to content

Commit 5bb5c83

Browse files
Merge pull request #235 from swanchain/feature/expose-metrics-endpoints
Expose metrics endpoints
2 parents 859f196 + 51f18d6 commit 5bb5c83

File tree

8 files changed

+212
-34
lines changed

8 files changed

+212
-34
lines changed

build/version.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ const UBITaskImageIntelCpu = "swanhub/ubi-worker-cpu-intel:latest"
1616
const UBITaskImageIntelGpu = "swanhub/ubi-worker-gpu-intel:latest"
1717
const UBITaskImageAmdCpu = "swanhub/ubi-worker-cpu-amd:latest"
1818
const UBITaskImageAmdGpu = "swanhub/ubi-worker-gpu-amd:latest"
19-
const UBIResourceExporterDockerImage = "swanhub/resource-exporter:v12.0.0"
19+
const UBIResourceExporterDockerImage = "swanhub/resource-exporter:v13.0.0"
2020
const TraefikServerDockerImage = "traefik:v2.10"
2121

22-
const ResourceExporterVersion = "v12.0.0"
22+
const ResourceExporterVersion = "v13.0.0"
2323

2424
func UserVersion() string {
2525
return BuildVersion + "+" + NetWorkTag + CurrentCommit

cmd/computing-provider/run.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,18 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"os"
7+
"path/filepath"
8+
"regexp"
9+
"strconv"
10+
"strings"
11+
"time"
12+
613
"github.com/ethereum/go-ethereum/common"
714
"github.com/filswan/go-mcs-sdk/mcs/api/common/logs"
815
"github.com/gin-contrib/pprof"
916
"github.com/gin-gonic/gin"
10-
"github.com/itsjamie/gin-cors"
17+
cors "github.com/itsjamie/gin-cors"
1118
"github.com/olekukonko/tablewriter"
1219
"github.com/swanchain/go-computing-provider/build"
1320
"github.com/swanchain/go-computing-provider/conf"
@@ -21,12 +28,6 @@ import (
2128
"github.com/swanchain/go-computing-provider/util"
2229
"github.com/swanchain/go-computing-provider/wallet"
2330
"github.com/urfave/cli/v2"
24-
"os"
25-
"path/filepath"
26-
"regexp"
27-
"strconv"
28-
"strings"
29-
"time"
3031
)
3132

3233
var runCmd = &cli.Command{
@@ -76,6 +77,7 @@ var runCmd = &cli.Command{
7677
func cpManager(router *gin.RouterGroup) {
7778
router.GET("/cp", computing.StatisticalSources)
7879
router.GET("/host/info", computing.GetServiceProviderInfo)
80+
router.GET("/cp/metrics", computing.GetResourceExporterMetrics)
7981
router.POST("/lagrange/jobs", computing.ReceiveJob)
8082
router.DELETE("/lagrange/jobs", computing.CancelJob)
8183
router.POST("/lagrange/jobs/renew", computing.ReNewJob)

cmd/computing-provider/ubi.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ package main
33
import (
44
_ "embed"
55
"fmt"
6+
"os"
7+
"path/filepath"
8+
"strconv"
9+
"time"
10+
611
"github.com/filswan/go-mcs-sdk/mcs/api/common/logs"
712
"github.com/gin-contrib/pprof"
813
"github.com/gin-gonic/gin"
@@ -14,10 +19,6 @@ import (
1419
"github.com/swanchain/go-computing-provider/internal/models"
1520
"github.com/swanchain/go-computing-provider/util"
1621
"github.com/urfave/cli/v2"
17-
"os"
18-
"path/filepath"
19-
"strconv"
20-
"time"
2122
)
2223

2324
var ubiTaskCmd = &cli.Command{
@@ -219,6 +220,7 @@ var daemonCmd = &cli.Command{
219220

220221
router := r.Group("/api/v1/computing")
221222
router.GET("/cp", computing.GetCpResource)
223+
router.GET("/cp/metrics", computing.GetUbiResourceExporterMetrics)
222224
router.POST("/cp/ubi", computing.DoUbiTaskForDocker)
223225
router.POST("/cp/docker/receive/ubi", computing.ReceiveUbiProof)
224226

internal/computing/deploy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ func (d *Deploy) ModelInferenceToK8s() error {
527527
func (d *Deploy) configureSshAccess(k8sService *K8sService, podName string) error {
528528

529529
sshkeyCmd := []string{"sh", "-c", fmt.Sprintf("echo '%s' > /root/.ssh/authorized_keys", d.sshKey)}
530-
if err := k8sService.PodDoCommand(d.k8sNameSpace, podName, "", sshkeyCmd); err != nil {
530+
if _, _, err := k8sService.PodDoCommand(d.k8sNameSpace, podName, "", sshkeyCmd); err != nil {
531531
return fmt.Errorf("failed to add sshkey, job_uuid: %s error: %v", d.jobUuid, err)
532532
}
533533

internal/computing/docker_service.go

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ import (
99
"encoding/json"
1010
"errors"
1111
"fmt"
12+
"io"
13+
"os"
14+
"path/filepath"
15+
"regexp"
16+
"strings"
17+
"time"
18+
1219
"github.com/containerd/containerd/namespaces"
1320
"github.com/docker/docker/api/types/container"
1421
"github.com/docker/docker/api/types/filters"
@@ -18,12 +25,6 @@ import (
1825
"github.com/filswan/go-mcs-sdk/mcs/api/common/logs"
1926
"github.com/swanchain/go-computing-provider/build"
2027
"github.com/swanchain/go-computing-provider/constants"
21-
"io"
22-
"os"
23-
"path/filepath"
24-
"regexp"
25-
"strings"
26-
"time"
2728

2829
"github.com/swanchain/go-computing-provider/conf"
2930

@@ -396,6 +397,41 @@ func (ds *DockerService) ContainerLogs(containerName string) (string, error) {
396397
}
397398
}
398399

400+
func (ds *DockerService) ContainerExec(containerID string, cmd []string) (string, error) {
401+
ctx := context.Background()
402+
execConfig := container.ExecOptions{
403+
AttachStdout: true,
404+
AttachStderr: true,
405+
Cmd: cmd,
406+
}
407+
execIDResponse, err := ds.c.ContainerExecCreate(ctx, containerID, execConfig)
408+
if err != nil {
409+
return "", fmt.Errorf("failed to create exec command: %w", err)
410+
}
411+
412+
if execIDResponse.ID == "" {
413+
return "", fmt.Errorf("exec ID is empty")
414+
}
415+
416+
resp, err := ds.c.ContainerExecAttach(ctx, execIDResponse.ID, container.ExecAttachOptions{})
417+
if err != nil {
418+
return "", fmt.Errorf("failed to attach to exec command: %w", err)
419+
}
420+
defer resp.Close()
421+
422+
var outBuf, errBuf bytes.Buffer
423+
_, err = io.Copy(&outBuf, resp.Reader)
424+
if err != nil {
425+
return "", fmt.Errorf("failed to read exec output: %w", err)
426+
}
427+
428+
if errBuf.Len() > 0 {
429+
return "", fmt.Errorf("exec command stderr: %s", errBuf.String())
430+
}
431+
432+
return outBuf.String(), nil
433+
}
434+
399435
func (ds *DockerService) GetContainerLogStream(ctx context.Context, containerName string) (io.ReadCloser, error) {
400436
return ds.c.ContainerLogs(ctx, containerName, container.LogsOptions{
401437
ShowStdout: true,

internal/computing/k8s_service.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package computing
22

33
import "C"
44
import (
5+
"bytes"
56
"context"
67
"encoding/json"
78
"errors"
@@ -556,6 +557,7 @@ func (s *K8sService) GetResourceExporterVersion() (string, error) {
556557
}
557558
return version, nil
558559
}
560+
559561
func (s *K8sService) GetPodLogByPodName(namespace, podName string, podLogOptions *coreV1.PodLogOptions) (string, error) {
560562
req := s.k8sClient.CoreV1().Pods(namespace).GetLogs(podName, podLogOptions)
561563
buf, err := readLog(req)
@@ -675,8 +677,8 @@ func (s *K8sService) WaitForPodRunningByTcp(namespace, jobUuid string, labelSele
675677
return podName, nil
676678
}
677679

678-
func (s *K8sService) PodDoCommand(namespace, podName, containerName string, podCmd []string) error {
679-
reader, writer := io.Pipe()
680+
func (s *K8sService) PodDoCommand(namespace, podName, containerName string, podCmd []string) (string, string, error) {
681+
var stdout, stderr bytes.Buffer
680682
req := s.k8sClient.CoreV1().RESTClient().
681683
Post().
682684
Resource("pods").
@@ -686,28 +688,30 @@ func (s *K8sService) PodDoCommand(namespace, podName, containerName string, podC
686688
VersionedParams(&coreV1.PodExecOptions{
687689
Container: containerName,
688690
Command: podCmd,
689-
Stdin: true,
691+
Stdin: false,
690692
Stdout: true,
691693
Stderr: true,
692-
TTY: true,
694+
TTY: false,
693695
}, scheme.ParameterCodec)
694696

695697
executor, err := remotecommand.NewSPDYExecutor(s.config, "POST", req.URL())
696698
if err != nil {
697-
return fmt.Errorf("failed to create spdy client: %w", err)
699+
logs.GetLogger().Errorf("Failed to create spdy client for pod %s: %v", podName, err)
700+
return "", "", fmt.Errorf("failed to create spdy client: %w", err)
698701
}
699702

700-
err = executor.Stream(remotecommand.StreamOptions{
701-
Stdin: reader,
702-
Stdout: writer,
703-
Stderr: writer,
704-
Tty: true,
703+
err = executor.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
704+
Stdin: nil,
705+
Stdout: &stdout,
706+
Stderr: &stderr,
707+
Tty: false,
705708
})
706709
if err != nil {
707-
return fmt.Errorf("failed to create stream: %w", err)
710+
logs.GetLogger().Errorf("Failed to create stream for pod %s: %v", podName, err)
711+
return "", "", fmt.Errorf("failed to create stream: %w", err)
708712
}
709713

710-
return nil
714+
return stdout.String(), stderr.String(), nil
711715
}
712716

713717
type GpuData struct {

internal/computing/space_service.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package computing
22

33
import (
44
"bufio"
5+
"bytes"
56
"context"
67
"encoding/base64"
78
"encoding/json"
@@ -295,6 +296,75 @@ func ReceiveJob(c *gin.Context) {
295296
c.JSON(http.StatusOK, util.CreateSuccessResponse(jobData))
296297
}
297298

299+
func GetResourceExporterMetrics(c *gin.Context) {
300+
logs.GetLogger().Info("Starting GetResourceExporterMetrics function.")
301+
k8sService := NewK8sService()
302+
if k8sService == nil {
303+
logs.GetLogger().Info("Failed to create k8s service client.")
304+
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.ServerError, "failed to create k8s service client"))
305+
return
306+
}
307+
logs.GetLogger().Info("Successfully created k8s service client.")
308+
309+
logs.GetLogger().Info("Attempting to list resource-exporter pods in kube-system namespace.")
310+
podList, err := k8sService.k8sClient.CoreV1().Pods("kube-system").List(context.Background(), metaV1.ListOptions{
311+
LabelSelector: "app=resource-exporter",
312+
})
313+
if err != nil {
314+
logs.GetLogger().Errorf("Failed to list resource-exporter pods, error: %v", err)
315+
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.ServerError, fmt.Sprintf("failed to list resource-exporter pods: %v", err)))
316+
return
317+
}
318+
logs.GetLogger().Infof("Successfully listed %d resource-exporter pods.", len(podList.Items))
319+
320+
if len(podList.Items) == 0 {
321+
logs.GetLogger().Info("No resource-exporter pods found.")
322+
c.JSON(http.StatusNotFound, util.CreateErrorResponse(util.ServerError, "resource-exporter pod not found"))
323+
return
324+
}
325+
logs.GetLogger().Info("Resource-exporter pod found.")
326+
327+
// Assuming we only need metrics from one resource-exporter pod, pick the first one
328+
resourceExporterPodName := podList.Items[0].Name
329+
if resourceExporterPodName == "" {
330+
logs.GetLogger().Info("Resource-exporter pod name is empty.")
331+
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.ServerError, "resource-exporter pod name not found"))
332+
return
333+
}
334+
logs.GetLogger().Infof("Resource-exporter pod name: %s", resourceExporterPodName)
335+
336+
var mergedMetrics bytes.Buffer
337+
338+
// Get /node/metrics using PodDoCommand
339+
logs.GetLogger().Infof("Attempting to get /node/metrics from resource-exporter pod %s.", resourceExporterPodName)
340+
// wget -q -O - localhost:9000/node/metrics
341+
nodeMetricsCmd := []string{"wget", "-q", "-O", "-", "localhost:9000/node/metrics"}
342+
nodeMetricsStdout, _, err := k8sService.PodDoCommand("kube-system", resourceExporterPodName, "", nodeMetricsCmd)
343+
if err != nil {
344+
logs.GetLogger().Errorf("Failed to get /node/metrics from resource-exporter pod %s: %v", resourceExporterPodName, err)
345+
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.ServerError, fmt.Sprintf("failed to get node metrics: %v", err)))
346+
return
347+
}
348+
logs.GetLogger().Info("Successfully retrieved /node/metrics.")
349+
mergedMetrics.WriteString(nodeMetricsStdout)
350+
mergedMetrics.WriteString("\n")
351+
352+
// Get /dcgm/metrics using PodDoCommand
353+
logs.GetLogger().Infof("Attempting to get /dcgm/metrics from resource-exporter pod %s.", resourceExporterPodName)
354+
dcgmMetricsCmd := []string{"wget", "-q", "-O", "-", "localhost:9000/dcgm/metrics"}
355+
dcgmMetricsStdout, _, err := k8sService.PodDoCommand("kube-system", resourceExporterPodName, "", dcgmMetricsCmd)
356+
if err != nil {
357+
logs.GetLogger().Errorf("Failed to get /dcgm/metrics from resource-exporter pod %s: %v", resourceExporterPodName, err)
358+
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.ServerError, fmt.Sprintf("failed to get dcgm metrics: %v", err)))
359+
return
360+
}
361+
logs.GetLogger().Info("Successfully retrieved /dcgm/metrics.")
362+
mergedMetrics.WriteString(dcgmMetricsStdout)
363+
364+
logs.GetLogger().Info("Returning merged resource-exporter metrics.")
365+
c.String(http.StatusOK, mergedMetrics.String())
366+
}
367+
298368
func submitJob(jobData *models.JobData) error {
299369
cpRepoPath, ok := os.LookupEnv("CP_PATH")
300370
if !ok {
@@ -1775,7 +1845,7 @@ func downloadModelUrl(namespace, jobUuid, serviceIp string, podCmd []string) {
17751845
return
17761846
}
17771847

1778-
if err = k8sService.PodDoCommand(namespace, podName, "", podCmd); err != nil {
1848+
if _, _, err = k8sService.PodDoCommand(namespace, podName, "", podCmd); err != nil {
17791849
logs.GetLogger().Error(err)
17801850
return
17811851
}

0 commit comments

Comments
 (0)