From 8d0980723a834ecb5e5da29b6133856d003fe83c Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 31 Dec 2025 09:04:26 +0000 Subject: [PATCH] refactor: use clusterRes instead of clusterResp --- .mockery.yaml | 1 + .tool-versions | 7 +- .../builder/deploy/imagerunner/mock_Runner.go | 28 +-- .../runner/component/mock_ClusterComponent.go | 162 ++++++++++++++++++ builder/deploy/deploy_ce_test.go | 20 +-- builder/deploy/deployer.go | 49 ++++-- builder/deploy/deployer_ce.go | 8 +- builder/deploy/deployer_test.go | 18 +- builder/deploy/imagerunner/local_runner.go | 4 +- builder/deploy/imagerunner/remote_runner.go | 20 +-- .../deploy/imagerunner/remote_runner_test.go | 4 +- builder/deploy/imagerunner/runner.go | 4 +- common/types/cluster.go | 17 +- runner/handler/cluster.go | 6 +- runner/handler/cluster_test.go | 90 ++++++++++ 15 files changed, 348 insertions(+), 90 deletions(-) create mode 100644 _mocks/opencsg.com/csghub-server/runner/component/mock_ClusterComponent.go create mode 100644 runner/handler/cluster_test.go diff --git a/.mockery.yaml b/.mockery.yaml index 45e21a71c..7db4ad999 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -289,6 +289,7 @@ packages: ImagebuilderComponent: WorkFlowComponent: ServiceComponent: + ClusterComponent: opencsg.com/csghub-server/logcollector/component: config: all: true \ No newline at end of file diff --git a/.tool-versions b/.tool-versions index bd2bbb89d..9c24a0d45 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,5 @@ -golang 1.24.6 -mockery 2.53.0 \ No newline at end of file +golang 1.25.5 +mockery 2.53.5 +minikube 1.34.0 +kubectl 1.28.3 +argo 3.6.10 diff --git a/_mocks/opencsg.com/csghub-server/builder/deploy/imagerunner/mock_Runner.go b/_mocks/opencsg.com/csghub-server/builder/deploy/imagerunner/mock_Runner.go index c34f75c0b..e2d7d5de8 100644 --- a/_mocks/opencsg.com/csghub-server/builder/deploy/imagerunner/mock_Runner.go +++ b/_mocks/opencsg.com/csghub-server/builder/deploy/imagerunner/mock_Runner.go @@ -240,23 +240,23 @@ func (_c *MockRunner_Exist_Call) RunAndReturn(run func(context.Context, *types.C } // GetClusterById provides a mock function with given fields: ctx, clusterId -func (_m *MockRunner) GetClusterById(ctx context.Context, clusterId string) (*types.ClusterResponse, error) { +func (_m *MockRunner) GetClusterById(ctx context.Context, clusterId string) (*types.ClusterRes, error) { ret := _m.Called(ctx, clusterId) if len(ret) == 0 { panic("no return value specified for GetClusterById") } - var r0 *types.ClusterResponse + var r0 *types.ClusterRes var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (*types.ClusterResponse, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, string) (*types.ClusterRes, error)); ok { return rf(ctx, clusterId) } - if rf, ok := ret.Get(0).(func(context.Context, string) *types.ClusterResponse); ok { + if rf, ok := ret.Get(0).(func(context.Context, string) *types.ClusterRes); ok { r0 = rf(ctx, clusterId) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.ClusterResponse) + r0 = ret.Get(0).(*types.ClusterRes) } } @@ -288,12 +288,12 @@ func (_c *MockRunner_GetClusterById_Call) Run(run func(ctx context.Context, clus return _c } -func (_c *MockRunner_GetClusterById_Call) Return(_a0 *types.ClusterResponse, _a1 error) *MockRunner_GetClusterById_Call { +func (_c *MockRunner_GetClusterById_Call) Return(_a0 *types.ClusterRes, _a1 error) *MockRunner_GetClusterById_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockRunner_GetClusterById_Call) RunAndReturn(run func(context.Context, string) (*types.ClusterResponse, error)) *MockRunner_GetClusterById_Call { +func (_c *MockRunner_GetClusterById_Call) RunAndReturn(run func(context.Context, string) (*types.ClusterRes, error)) *MockRunner_GetClusterById_Call { _c.Call.Return(run) return _c } @@ -476,23 +476,23 @@ func (_c *MockRunner_InstanceLogs_Call) RunAndReturn(run func(context.Context, * } // ListCluster provides a mock function with given fields: ctx -func (_m *MockRunner) ListCluster(ctx context.Context) ([]types.ClusterResponse, error) { +func (_m *MockRunner) ListCluster(ctx context.Context) ([]types.ClusterRes, error) { ret := _m.Called(ctx) if len(ret) == 0 { panic("no return value specified for ListCluster") } - var r0 []types.ClusterResponse + var r0 []types.ClusterRes var r1 error - if rf, ok := ret.Get(0).(func(context.Context) ([]types.ClusterResponse, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context) ([]types.ClusterRes, error)); ok { return rf(ctx) } - if rf, ok := ret.Get(0).(func(context.Context) []types.ClusterResponse); ok { + if rf, ok := ret.Get(0).(func(context.Context) []types.ClusterRes); ok { r0 = rf(ctx) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.ClusterResponse) + r0 = ret.Get(0).([]types.ClusterRes) } } @@ -523,12 +523,12 @@ func (_c *MockRunner_ListCluster_Call) Run(run func(ctx context.Context)) *MockR return _c } -func (_c *MockRunner_ListCluster_Call) Return(_a0 []types.ClusterResponse, _a1 error) *MockRunner_ListCluster_Call { +func (_c *MockRunner_ListCluster_Call) Return(_a0 []types.ClusterRes, _a1 error) *MockRunner_ListCluster_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockRunner_ListCluster_Call) RunAndReturn(run func(context.Context) ([]types.ClusterResponse, error)) *MockRunner_ListCluster_Call { +func (_c *MockRunner_ListCluster_Call) RunAndReturn(run func(context.Context) ([]types.ClusterRes, error)) *MockRunner_ListCluster_Call { _c.Call.Return(run) return _c } diff --git a/_mocks/opencsg.com/csghub-server/runner/component/mock_ClusterComponent.go b/_mocks/opencsg.com/csghub-server/runner/component/mock_ClusterComponent.go new file mode 100644 index 000000000..884c11caa --- /dev/null +++ b/_mocks/opencsg.com/csghub-server/runner/component/mock_ClusterComponent.go @@ -0,0 +1,162 @@ +// Code generated by mockery v2.53.5. DO NOT EDIT. + +package component + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + database "opencsg.com/csghub-server/builder/store/database" + + types "opencsg.com/csghub-server/common/types" +) + +// MockClusterComponent is an autogenerated mock type for the ClusterComponent type +type MockClusterComponent struct { + mock.Mock +} + +type MockClusterComponent_Expecter struct { + mock *mock.Mock +} + +func (_m *MockClusterComponent) EXPECT() *MockClusterComponent_Expecter { + return &MockClusterComponent_Expecter{mock: &_m.Mock} +} + +// ByClusterID provides a mock function with given fields: ctx, clusterId +func (_m *MockClusterComponent) ByClusterID(ctx context.Context, clusterId string) (database.ClusterInfo, error) { + ret := _m.Called(ctx, clusterId) + + if len(ret) == 0 { + panic("no return value specified for ByClusterID") + } + + var r0 database.ClusterInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (database.ClusterInfo, error)); ok { + return rf(ctx, clusterId) + } + if rf, ok := ret.Get(0).(func(context.Context, string) database.ClusterInfo); ok { + r0 = rf(ctx, clusterId) + } else { + r0 = ret.Get(0).(database.ClusterInfo) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, clusterId) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockClusterComponent_ByClusterID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ByClusterID' +type MockClusterComponent_ByClusterID_Call struct { + *mock.Call +} + +// ByClusterID is a helper method to define mock.On call +// - ctx context.Context +// - clusterId string +func (_e *MockClusterComponent_Expecter) ByClusterID(ctx interface{}, clusterId interface{}) *MockClusterComponent_ByClusterID_Call { + return &MockClusterComponent_ByClusterID_Call{Call: _e.mock.On("ByClusterID", ctx, clusterId)} +} + +func (_c *MockClusterComponent_ByClusterID_Call) Run(run func(ctx context.Context, clusterId string)) *MockClusterComponent_ByClusterID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockClusterComponent_ByClusterID_Call) Return(clusterInfo database.ClusterInfo, err error) *MockClusterComponent_ByClusterID_Call { + _c.Call.Return(clusterInfo, err) + return _c +} + +func (_c *MockClusterComponent_ByClusterID_Call) RunAndReturn(run func(context.Context, string) (database.ClusterInfo, error)) *MockClusterComponent_ByClusterID_Call { + _c.Call.Return(run) + return _c +} + +// GetResourceByID provides a mock function with given fields: ctx, clusterId +func (_m *MockClusterComponent) GetResourceByID(ctx context.Context, clusterId string) (types.ResourceStatus, map[string]types.NodeResourceInfo, error) { + ret := _m.Called(ctx, clusterId) + + if len(ret) == 0 { + panic("no return value specified for GetResourceByID") + } + + var r0 types.ResourceStatus + var r1 map[string]types.NodeResourceInfo + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string) (types.ResourceStatus, map[string]types.NodeResourceInfo, error)); ok { + return rf(ctx, clusterId) + } + if rf, ok := ret.Get(0).(func(context.Context, string) types.ResourceStatus); ok { + r0 = rf(ctx, clusterId) + } else { + r0 = ret.Get(0).(types.ResourceStatus) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) map[string]types.NodeResourceInfo); ok { + r1 = rf(ctx, clusterId) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(map[string]types.NodeResourceInfo) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { + r2 = rf(ctx, clusterId) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockClusterComponent_GetResourceByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetResourceByID' +type MockClusterComponent_GetResourceByID_Call struct { + *mock.Call +} + +// GetResourceByID is a helper method to define mock.On call +// - ctx context.Context +// - clusterId string +func (_e *MockClusterComponent_Expecter) GetResourceByID(ctx interface{}, clusterId interface{}) *MockClusterComponent_GetResourceByID_Call { + return &MockClusterComponent_GetResourceByID_Call{Call: _e.mock.On("GetResourceByID", ctx, clusterId)} +} + +func (_c *MockClusterComponent_GetResourceByID_Call) Run(run func(ctx context.Context, clusterId string)) *MockClusterComponent_GetResourceByID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockClusterComponent_GetResourceByID_Call) Return(_a0 types.ResourceStatus, _a1 map[string]types.NodeResourceInfo, _a2 error) *MockClusterComponent_GetResourceByID_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockClusterComponent_GetResourceByID_Call) RunAndReturn(run func(context.Context, string) (types.ResourceStatus, map[string]types.NodeResourceInfo, error)) *MockClusterComponent_GetResourceByID_Call { + _c.Call.Return(run) + return _c +} + +// NewMockClusterComponent creates a new instance of MockClusterComponent. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockClusterComponent(t interface { + mock.TestingT + Cleanup(func()) +}) *MockClusterComponent { + mock := &MockClusterComponent{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/builder/deploy/deploy_ce_test.go b/builder/deploy/deploy_ce_test.go index c0436a73d..4a7d217bd 100644 --- a/builder/deploy/deploy_ce_test.go +++ b/builder/deploy/deploy_ce_test.go @@ -101,12 +101,12 @@ func TestDeployer_CheckResourceAvailable(t *testing.T) { tester := newTestDeployer(t) ctx := context.TODO() - tester.mocks.runner.EXPECT().ListCluster(ctx).Return([]types.ClusterResponse{ + tester.mocks.runner.EXPECT().ListCluster(ctx).Return([]types.ClusterRes{ {ClusterID: "c1"}, }, nil) - tester.mocks.runner.EXPECT().GetClusterById(ctx, "c1").Return(&types.ClusterResponse{ - Nodes: map[string]types.NodeResourceInfo{ - "n1": {AvailableMem: 100}, + tester.mocks.runner.EXPECT().GetClusterById(ctx, "c1").Return(&types.ClusterRes{ + Resources: []types.NodeResourceInfo{ + {AvailableMem: 100}, }, }, nil) @@ -209,17 +209,17 @@ func TestDeployer_GetClusterById(t *testing.T) { tester := newTestDeployer(t) t.Run("success", func(t *testing.T) { ctx := context.TODO() - tester.mocks.runner.EXPECT().GetClusterById(ctx, "1").Once().Return(&types.ClusterResponse{ + tester.mocks.runner.EXPECT().GetClusterById(ctx, "1").Once().Return(&types.ClusterRes{ ClusterID: "1", Region: "test-region", Zone: "test-zone", Enable: true, - Nodes: map[string]types.NodeResourceInfo{ - "1": { + Resources: []types.NodeResourceInfo{ + { AvailableCPU: 1, AvailableMem: 3, }, - "2": { + { AvailableCPU: 2, AvailableMem: 5, AvailableXPU: 4, @@ -234,9 +234,9 @@ func TestDeployer_GetClusterById(t *testing.T) { }) t.Run("empty nodes", func(t *testing.T) { ctx := context.TODO() - tester.mocks.runner.EXPECT().GetClusterById(ctx, "1").Once().Return(&types.ClusterResponse{ + tester.mocks.runner.EXPECT().GetClusterById(ctx, "1").Once().Return(&types.ClusterRes{ ClusterID: "1", - Nodes: nil, + Resources: nil, }, nil) clusterRes, err := tester.GetClusterById(ctx, "1") require.Nil(t, err) diff --git a/builder/deploy/deployer.go b/builder/deploy/deployer.go index edb6437bf..93452c671 100644 --- a/builder/deploy/deployer.go +++ b/builder/deploy/deployer.go @@ -17,7 +17,6 @@ import ( "github.com/google/uuid" "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/builder/loki" "opencsg.com/csghub-server/builder/redis" "opencsg.com/csghub-server/builder/store/database" @@ -214,7 +213,7 @@ func (d *deployer) Deploy(ctx context.Context, dr types.DeployRepo) (int64, erro imgStrLen := len(strings.Trim(deploy.ImageID, " ")) slog.Debug("do deployer.Deploy check image", slog.Any("deploy.ImageID", deploy.ImageID), slog.Any("imgStrLen", imgStrLen)) if imgStrLen > 0 { - bldTaskStatus = scheduler.BuildSkip + bldTaskStatus = common.TaskStatusBuildSkip bldTaskMsg = "Skip" } slog.Debug("create build task", slog.Any("bldTaskStatus", bldTaskStatus), slog.Any("bldTaskMsg", bldTaskMsg)) @@ -237,8 +236,10 @@ func (d *deployer) Deploy(ctx context.Context, dr types.DeployRepo) (int64, erro return -1, fmt.Errorf("create deploy task failed: %w", err) } + // go func() { _ = d.scheduler.Queue(buildTask.ID) }() buildTask.Deploy = deploy runTask.Deploy = deploy + go DeployWorkflow(buildTask, runTask) d.logReporter.Report(types.LogEntry{ @@ -548,15 +549,21 @@ func (d *deployer) InstanceLogs(ctx context.Context, dr types.DeployRepo) (*Mult types.LogLabelTypeKey: types.LogLabelDeploy, types.StreamKeyDeployID: fmt.Sprintf("%d", deploy.ID), } + if dr.CommitID != "" { + labels[types.StreamKeyDeployCommitID] = dr.CommitID + } + if dr.InstanceName != "" { labels[types.StreamKeyInstanceName] = dr.InstanceName } deployId := fmt.Sprintf("%d", deploy.ID) + var startTime = deploy.CreatedAt if dr.Since != "" { startTime = parseSinceTime(dr.Since) } + runLog, err := d.readLogsFromLoki(ctx, types.ReadLogRequest{ DeployID: deployId, StartTime: startTime, @@ -577,17 +584,13 @@ func (d *deployer) ListCluster(ctx context.Context) ([]types.ClusterRes, error) } var result []types.ClusterRes for _, c := range resp { - resources := make([]types.NodeResourceInfo, 0) - for _, node := range c.Nodes { - resources = append(resources, node) - } result = append(result, types.ClusterRes{ ClusterID: c.ClusterID, Region: c.Region, Zone: c.Zone, Provider: c.Provider, - Resources: resources, - LastUpdateTime: c.UpdatedAt.Unix(), + Resources: c.Resources, + LastUpdateTime: c.LastUpdateTime, }) } return result, err @@ -646,7 +649,7 @@ func (d *deployer) GetClusterUsageById(ctx context.Context, clusterId string) (* } var vendorSet = make(map[string]struct{}, 0) var modelsSet = make(map[string]struct{}, 0) - for _, node := range resp.Nodes { + for _, node := range resp.Resources { res.TotalCPU += node.TotalCPU res.AvailableCPU += node.AvailableCPU res.TotalMem += float64(node.TotalMem) @@ -680,10 +683,16 @@ func (d *deployer) GetClusterUsageById(ctx context.Context, clusterId string) (* res.AvailableCPU = math.Floor(res.AvailableCPU) res.TotalMem = math.Floor(res.TotalMem) res.AvailableMem = math.Floor(res.AvailableMem) - res.NodeNumber = len(resp.Nodes) - res.CPUUsage = math.Round((res.TotalCPU-res.AvailableCPU)/res.TotalCPU*100) / 100 - res.MemUsage = math.Round((res.TotalMem-res.AvailableMem)/res.TotalMem*100) / 100 - res.GPUUsage = math.Round(float64(res.TotalGPU-res.AvailableGPU)/float64(res.TotalGPU)*100) / 100 + res.NodeNumber = len(resp.Resources) + if res.TotalCPU != 0 { + res.CPUUsage = math.Round((res.TotalCPU-res.AvailableCPU)/res.TotalCPU*100) / 100 + } + if res.TotalMem != 0 { + res.MemUsage = math.Round((res.TotalMem-res.AvailableMem)/res.TotalMem*100) / 100 + } + if res.TotalGPU != 0 { + res.GPUUsage = math.Round(float64(res.TotalGPU-res.AvailableGPU)/float64(res.TotalGPU)*100) / 100 + } return &res, err } @@ -828,8 +837,10 @@ func (d *deployer) StartDeploy(ctx context.Context, deploy *database.Deploy) err return fmt.Errorf("create deploy task failed: %w", err) } + // go func() { _ = d.scheduler.Queue(runTask.ID) }() runTask.Deploy = deploy go DeployWorkflow(nil, runTask) // runTask is the only task + // update resource if it's a order case err = d.updateUserResourceByOrder(ctx, deploy) if err != nil { @@ -1134,13 +1145,19 @@ func (d *deployer) CheckResourceAvailable(ctx context.Context, clusterId string, } if clusterResources.Status == types.ClusterStatusUnavailable { - return false, fmt.Errorf("failed to check cluster available resource due to cluster %s status is %s", + err := fmt.Errorf("failed to check cluster available resource due to cluster %s status is %s", clusterId, clusterResources.Status) + return false, errorx.ClusterUnavailable(err, errorx.Ctx(). + Set("cluster ID", clusterId). + Set("region", clusterResources.Region)) } if clusterResources.ResourceStatus != types.StatusUncertain && !CheckResource(clusterResources, hardWare) { - return false, fmt.Errorf("required resource on cluster %s is not enough with resource status %s", + err := fmt.Errorf("required resource on cluster %s is not enough with resource status %s", clusterId, clusterResources.ResourceStatus) + return false, errorx.NotEnoughResource(err, errorx.Ctx(). + Set("cluster ID", clusterId). + Set("region", clusterResources.Region)) } return true, nil @@ -1151,7 +1168,7 @@ func CheckResource(clusterResources *types.ClusterRes, hardware *types.HardWare) slog.Error("hardware is empty for check resource", slog.Any("clusterResources", clusterResources)) return false } - mem, err := strconv.Atoi(strings.Replace(hardware.Memory, "Gi", "", -1)) + mem, err := strconv.Atoi(strings.ReplaceAll(hardware.Memory, "Gi", "")) if err != nil { slog.Error("failed to parse hardware memory ", slog.Any("error", err)) return false diff --git a/builder/deploy/deployer_ce.go b/builder/deploy/deployer_ce.go index 6d4fb7b2d..275d5d200 100644 --- a/builder/deploy/deployer_ce.go +++ b/builder/deploy/deployer_ce.go @@ -146,12 +146,8 @@ func checkNodeResource(node types.NodeResourceInfo, hardware *types.HardWare) bo return true } -func (d *deployer) getResources(ctx context.Context, clusterId string, clusterResponse *types.ClusterResponse) ([]types.NodeResourceInfo, error) { - resources := make([]types.NodeResourceInfo, 0) - for _, node := range clusterResponse.Nodes { - resources = append(resources, node) - } - return resources, nil +func (d *deployer) getResources(ctx context.Context, clusterId string, clusterResponse *types.ClusterRes) ([]types.NodeResourceInfo, error) { + return clusterResponse.Resources, nil } func startAcctRequestFeeExtra(deploy database.Deploy, source string) string { diff --git a/builder/deploy/deployer_test.go b/builder/deploy/deployer_test.go index 3ce3cc758..daa1a82ba 100644 --- a/builder/deploy/deployer_test.go +++ b/builder/deploy/deployer_test.go @@ -598,15 +598,15 @@ func TestDeployer_InstanceLogs(t *testing.T) { func TestDeployer_ListCluster(t *testing.T) { - clusterResp := []types.ClusterResponse{ + clusterResp := []types.ClusterRes{ { ClusterID: "cluster1", Region: "us-east-1", Zone: "us-east-1a", Provider: "aws", Enable: false, - Nodes: map[string]types.NodeResourceInfo{ - "node1": { + Resources: []types.NodeResourceInfo{ + { NodeName: "node1", XPUModel: "", TotalCPU: 1, @@ -888,13 +888,13 @@ func TestDeployer_GetClusterUsageById(t *testing.T) { t.Run("success", func(t *testing.T) { clusterID := "test_cluster" mockRunner := mockrunner.NewMockRunner(t) - mockRunner.EXPECT().GetClusterById(mock.Anything, clusterID).Return(&types.ClusterResponse{ + mockRunner.EXPECT().GetClusterById(mock.Anything, clusterID).Return(&types.ClusterRes{ ClusterID: clusterID, Region: "test_region", Zone: "test_zone", Provider: "test_provider", - Nodes: map[string]types.NodeResourceInfo{ - "node1": { + Resources: []types.NodeResourceInfo{ + { TotalCPU: 4, AvailableCPU: 2, TotalMem: 8192, @@ -902,7 +902,7 @@ func TestDeployer_GetClusterUsageById(t *testing.T) { TotalXPU: 1, AvailableXPU: 0, }, - "node2": { + { TotalCPU: 8, AvailableCPU: 1.5, TotalMem: 16384, @@ -943,9 +943,9 @@ func TestDeployer_GetClusterUsageById(t *testing.T) { t.Run("no nodes", func(t *testing.T) { clusterID := "test_cluster_no_nodes" mockRunner := mockrunner.NewMockRunner(t) - mockRunner.EXPECT().GetClusterById(mock.Anything, clusterID).Return(&types.ClusterResponse{ + mockRunner.EXPECT().GetClusterById(mock.Anything, clusterID).Return(&types.ClusterRes{ ClusterID: clusterID, - Nodes: map[string]types.NodeResourceInfo{}, + Resources: []types.NodeResourceInfo{}, }, nil) d := &deployer{ diff --git a/builder/deploy/imagerunner/local_runner.go b/builder/deploy/imagerunner/local_runner.go index e41a1bfae..158a16fda 100644 --- a/builder/deploy/imagerunner/local_runner.go +++ b/builder/deploy/imagerunner/local_runner.go @@ -71,11 +71,11 @@ func (r *LocalRunner) Purge(ctx context.Context, req *types.PurgeRequest) (*type return nil, nil } -func (h *LocalRunner) ListCluster(ctx context.Context) ([]types.ClusterResponse, error) { +func (h *LocalRunner) ListCluster(ctx context.Context) ([]types.ClusterRes, error) { return nil, nil } -func (h *LocalRunner) GetClusterById(ctx context.Context, clusterId string) (*types.ClusterResponse, error) { +func (h *LocalRunner) GetClusterById(ctx context.Context, clusterId string) (*types.ClusterRes, error) { return nil, nil } diff --git a/builder/deploy/imagerunner/remote_runner.go b/builder/deploy/imagerunner/remote_runner.go index f3ee5f726..000777a5b 100644 --- a/builder/deploy/imagerunner/remote_runner.go +++ b/builder/deploy/imagerunner/remote_runner.go @@ -322,29 +322,29 @@ func (h *RemoteRunner) InstanceLogs(ctx context.Context, req *types.InstanceLogs return h.readToChannel(rc), nil } -func (h *RemoteRunner) ListCluster(ctx context.Context) ([]types.ClusterResponse, error) { +func (h *RemoteRunner) ListCluster(ctx context.Context) ([]types.ClusterRes, error) { clusters, err := h.clusterStore.List(ctx) if err != nil { slog.Error("failed to list clusters from db", slog.Any("error", err)) return nil, err } - var resp []types.ClusterResponse + var resp []types.ClusterRes for _, cluster := range clusters { if !cluster.Enable { continue } - resp = append(resp, types.ClusterResponse{ - ClusterID: cluster.ClusterID, - Region: cluster.Region, - Zone: cluster.Zone, - Provider: cluster.Provider, - UpdatedAt: cluster.UpdatedAt, + resp = append(resp, types.ClusterRes{ + ClusterID: cluster.ClusterID, + Region: cluster.Region, + Zone: cluster.Zone, + Provider: cluster.Provider, + LastUpdateTime: cluster.UpdatedAt.Unix(), }) } return resp, nil } -func (h *RemoteRunner) GetClusterById(ctx context.Context, clusterId string) (*types.ClusterResponse, error) { +func (h *RemoteRunner) GetClusterById(ctx context.Context, clusterId string) (*types.ClusterRes, error) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -359,7 +359,7 @@ func (h *RemoteRunner) GetClusterById(ctx context.Context, clusterId string) (*t return nil, err } defer response.Body.Close() - var resp types.ClusterResponse + var resp types.ClusterRes if err := json.NewDecoder(response.Body).Decode(&resp); err != nil { return nil, errorx.InternalServerError(err, nil) } diff --git a/builder/deploy/imagerunner/remote_runner_test.go b/builder/deploy/imagerunner/remote_runner_test.go index 56fb0d5b9..5f590fea2 100644 --- a/builder/deploy/imagerunner/remote_runner_test.go +++ b/builder/deploy/imagerunner/remote_runner_test.go @@ -198,7 +198,7 @@ func TestRemoteRunner_GetRemoteRunnerHost(t *testing.T) { func TestRemoteRunner_GetClusterById(t *testing.T) { clusterID := "test-cluster" - expectedCluster := &types.ClusterResponse{ + expectedCluster := &types.ClusterRes{ ClusterID: clusterID, Region: "test-region", Zone: "test-zone", @@ -270,7 +270,7 @@ func TestRemoteRunner_GetClusterById_ServerError(t *testing.T) { func TestRemoteRunner_GetClusterById_OutsideCluster(t *testing.T) { clusterID := "test-cluster" - expectedCluster := &types.ClusterResponse{ + expectedCluster := &types.ClusterRes{ ClusterID: clusterID, Region: "test-region", Zone: "test-zone", diff --git a/builder/deploy/imagerunner/runner.go b/builder/deploy/imagerunner/runner.go index d02c6f330..606bd3e9e 100644 --- a/builder/deploy/imagerunner/runner.go +++ b/builder/deploy/imagerunner/runner.go @@ -16,8 +16,8 @@ type Runner interface { Exist(context.Context, *types.CheckRequest) (*types.StatusResponse, error) GetReplica(context.Context, *types.StatusRequest) (*types.ReplicaResponse, error) InstanceLogs(context.Context, *types.InstanceLogsRequest) (<-chan string, error) - ListCluster(ctx context.Context) ([]types.ClusterResponse, error) - GetClusterById(ctx context.Context, clusterId string) (*types.ClusterResponse, error) + ListCluster(ctx context.Context) ([]types.ClusterRes, error) + GetClusterById(ctx context.Context, clusterId string) (*types.ClusterRes, error) UpdateCluster(ctx context.Context, data *types.ClusterRequest) (*types.UpdateClusterResponse, error) SubmitWorkFlow(context.Context, *types.ArgoWorkFlowReq) (*types.ArgoWorkFlowRes, error) DeleteWorkFlow(context.Context, types.ArgoWorkFlowDeleteReq) (*httpbase.R, error) diff --git a/common/types/cluster.go b/common/types/cluster.go index 7b57a47cf..61b58b54a 100644 --- a/common/types/cluster.go +++ b/common/types/cluster.go @@ -2,21 +2,6 @@ package types import "time" -type ClusterResponse struct { - ClusterID string `json:"cluster_id"` - Region string `json:"region"` - Zone string `json:"zone"` //cn-beijing - Provider string `json:"provider"` //ali - Enable bool `json:"enable"` - Nodes map[string]NodeResourceInfo `json:"nodes"` - - ClusterName string `json:"cluster_name"` - StorageClass string `json:"storage_class"` - ResourceStatus ResourceStatus `json:"resource_status"` - - UpdatedAt time.Time `json:"updated_at"` -} - type ClusterRequest struct { ClusterID string `json:"cluster_id"` ClusterConfig string `json:"cluster_config"` @@ -67,6 +52,8 @@ type ClusterRes struct { LastUpdateTime int64 `json:"last_update_time"` XPUVendors string `json:"xpu_vendors"` // NVIDIA, AMD XPUModels string `json:"xpu_models"` // A10(32 GB),H100(80 GB) + + Enable bool `json:"enable"` } type DeployRes struct { ClusterID string `json:"cluster_id"` diff --git a/runner/handler/cluster.go b/runner/handler/cluster.go index 9e689b436..fb2e59ff5 100644 --- a/runner/handler/cluster.go +++ b/runner/handler/cluster.go @@ -29,7 +29,7 @@ func NewClusterHandler(config *config.Config, clusterPool *cluster.ClusterPool) func (s *ClusterHandler) GetClusterInfoByID(c *gin.Context) { clusterId := c.Params.ByName("id") cInfo, _ := s.clusterComponent.ByClusterID(c.Request.Context(), clusterId) - clusterInfo := types.ClusterResponse{} + clusterInfo := types.ClusterRes{} clusterInfo.Region = cInfo.Region clusterInfo.Zone = cInfo.Zone clusterInfo.Provider = cInfo.Provider @@ -41,7 +41,9 @@ func (s *ClusterHandler) GetClusterInfoByID(c *gin.Context) { c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) return } - clusterInfo.Nodes = resourceAvaliable + for _, v := range resourceAvaliable { + clusterInfo.Resources = append(clusterInfo.Resources, v) + } clusterInfo.ResourceStatus = availabilityStatus c.JSON(http.StatusOK, clusterInfo) } diff --git a/runner/handler/cluster_test.go b/runner/handler/cluster_test.go new file mode 100644 index 000000000..f5ec46c74 --- /dev/null +++ b/runner/handler/cluster_test.go @@ -0,0 +1,90 @@ +package handler + +import ( + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/mock" + mockcomponent "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/runner/component" + "opencsg.com/csghub-server/builder/store/database" + "opencsg.com/csghub-server/common/types" +) + +func TestGetClusterInfoByID_Success(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + req := httptest.NewRequest(http.MethodGet, "/clusters/c1", nil) + c.Request = req + c.Params = gin.Params{{Key: "id", Value: "c1"}} + + cluster := mockcomponent.NewMockClusterComponent(t) + + h := &ClusterHandler{clusterComponent: cluster} + cluster.EXPECT().ByClusterID(mock.Anything, "c1").Return(database.ClusterInfo{ + ClusterID: "c1", + Region: "region1", + Zone: "zone1", + Provider: "provider1", + StorageClass: "standard", + }, nil) + cluster.EXPECT().GetResourceByID(mock.Anything, "c1").Return(types.StatusClusterWide, map[string]types.NodeResourceInfo{ + "node1": { + TotalCPU: 4, + AvailableCPU: 2, + TotalMem: 8192, + AvailableMem: 4096, + TotalXPU: 1, + AvailableXPU: 0, + }, + }, nil) + h.GetClusterInfoByID(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d, body: %s", w.Code, w.Body.String()) + } + + var resp types.ClusterRes + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to unmarshal response: %v", err) + } + + if resp.ClusterID != "c1" { + t.Fatalf("expected cluster id c1, got %s", resp.ClusterID) + } + if len(resp.Resources) != 1 { + t.Fatalf("expected 1 resource entry, got %d", len(resp.Resources)) + } + if resp.ResourceStatus != types.StatusClusterWide { + t.Fatalf("unexpected resource status: %v", resp.ResourceStatus) + } +} + +func TestGetClusterInfoByID_GetResourceError(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + req := httptest.NewRequest(http.MethodGet, "/clusters/c2", nil) + c.Request = req + c.Params = gin.Params{{Key: "id", Value: "c2"}} + + cluster := mockcomponent.NewMockClusterComponent(t) + cluster.EXPECT().ByClusterID(mock.Anything, "c2").Return(database.ClusterInfo{ + ClusterID: "c2", + Region: "region2", + Zone: "zone2", + Provider: "provider2", + StorageClass: "standard", + }, nil) + cluster.EXPECT().GetResourceByID(mock.Anything, "c2").Return(types.StatusUncertain, nil, errors.New("cluster not found")) + h := &ClusterHandler{clusterComponent: cluster} + h.GetClusterInfoByID(c) + + if w.Code != http.StatusNotFound { + t.Fatalf("expected status 404, got %d, body: %s", w.Code, w.Body.String()) + } +}