diff --git a/builder/deploy/deployer.go b/builder/deploy/deployer.go
index 86b2a524a..edb6437bf 100644
--- a/builder/deploy/deployer.go
+++ b/builder/deploy/deployer.go
@@ -1058,11 +1058,38 @@ func (d *deployer) startAcctMeteringRequest(resMap map[string]string, clusterMap
 		return
 	}
 
+	var hardware types.HardWare
+	if err := json.Unmarshal([]byte(deploy.Hardware), &hardware); err != nil {
+		slog.Error("Deploy hardware is invalid format", "hardware", deploy.Hardware, "deploy_id", deploy.ID, "error", err)
+		return
+	}
+
+	// Get the replica count so the metered value can be multiplied by the instance count.
+	replicaCount := 1
+	// Only look up replicas for single-node deploys; multi-node deploys don't support replicas.
+	if hardware.Replicas < 2 {
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		// Namespace and Name are not used in GetReplica call chain, only SvcName and ClusterID are needed
+		dr := types.DeployRepo{
+			DeployID:  deploy.ID,
+			SpaceID:   deploy.SpaceID,
+			SvcName:   deploy.SvcName,
+			ClusterID: deploy.ClusterID,
+		}
+		actualReplica, _, _, err := d.GetReplica(ctx, dr)
+		if err != nil {
+			slog.Warn("fail to get deploy replica for metering", slog.Any("deploy_id", deploy.ID), slog.Any("error", err))
+		} else if actualReplica > 0 {
+			replicaCount = actualReplica
+		}
+	}
+
 	extra := startAcctRequestFeeExtra(deploy, d.deployConfig.UniqueServiceName)
 	event := types.MeteringEvent{
 		Uuid:      uuid.New(), //v4
 		UserUUID:  deploy.UserUUID,
-		Value:     int64(d.eventPub.SyncInterval),
+		Value:     int64(d.eventPub.SyncInterval) * int64(replicaCount),
 		ValueType: types.TimeDurationMinType,
 		Scene:     int(sceneType),
 		OpUID:     "",
diff --git a/builder/deploy/deployer_test.go b/builder/deploy/deployer_test.go
index 478806236..3ce3cc758 100644
--- a/builder/deploy/deployer_test.go
+++ b/builder/deploy/deployer_test.go
@@ -18,9 +18,11 @@ import (
 	mockbuilder "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/imagebuilder"
 	mockrunner "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/imagerunner"
 	mockScheduler "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/scheduler"
+	mockmq "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/mq"
 	mockdb "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/store/database"
 	"opencsg.com/csghub-server/builder/deploy/common"
 	"opencsg.com/csghub-server/builder/deploy/scheduler"
+	"opencsg.com/csghub-server/builder/event"
 	"opencsg.com/csghub-server/builder/loki"
 	"opencsg.com/csghub-server/builder/store/database"
 	"opencsg.com/csghub-server/common/tests"
@@ -1071,3 +1073,188 @@ func TestDeployer_GetWorkflowLogsNonStream(t *testing.T) {
 	require.Nil(t, err)
 	require.NotNil(t, resp)
 }
+
+func TestDeployer_startAcctMeteringRequest(t *testing.T) {
+	now := time.Now()
+	eventTime := now
+	resMap := map[string]string{"sku1": "resource1"}
+	deployConfig := common.DeployConfig{
+		HeartBeatTimeInSec: 300, // 5 minutes
+		UniqueServiceName:  "test-service",
+	}
+
+	// runningClusters returns a cluster map holding one running cluster whose
+	// UpdatedAt is the given time.
+	runningClusters := func(updatedAt time.Time) map[string]database.ClusterInfo {
+		cluster := database.ClusterInfo{
+			ClusterID: "cluster1",
+			Status:    types.ClusterStatusRunning,
+		}
+		cluster.UpdatedAt = updatedAt
+		return map[string]database.ClusterInfo{"cluster1": cluster}
+	}
+
+	// baseDeploy returns a fresh inference deploy on cluster1 whose SKU is
+	// present in resMap; each subtest overrides only the fields it exercises.
+	baseDeploy := func() database.Deploy {
+		return database.Deploy{
+			ID:        1,
+			ClusterID: "cluster1",
+			SKU:       "sku1",
+			Type:      types.InferenceType,
+		}
+	}
+
+	// newSkipDeployer returns a deployer with no runner and no publisher.
+	// The heartbeat window is set on every skip case so that a cluster updated
+	// "now" is not at risk of being rejected by the heartbeat check before the
+	// branch named by the subtest is reached.
+	// NOTE(review): assumes the heartbeat check runs before the SKU, scene and
+	// hardware checks; that part of the function is outside this hunk.
+	newSkipDeployer := func() *deployer {
+		return &deployer{deployConfig: deployConfig}
+	}
+
+	// newPublishingDeployer returns a deployer whose event publisher is backed
+	// by a mock queue with a Publish expectation registered against t.
+	// NOTE(review): Publish is matched with mock.Anything, so the replica-scaled
+	// Value is not asserted; match the payload once its encoding is confirmed.
+	newPublishingDeployer := func(t *testing.T) *deployer {
+		mockMQ := mockmq.NewMockMessageQueue(t)
+		mockMQ.EXPECT().Publish(mock.Anything, mock.Anything).Return(nil)
+		return &deployer{
+			eventPub: &event.EventPublisher{
+				SyncInterval: 5, // 5 minutes
+				MQ:           mockMQ,
+			},
+			deployConfig: deployConfig,
+		}
+	}
+
+	t.Run("skip when cluster does not exist", func(t *testing.T) {
+		d := newSkipDeployer()
+		d.startAcctMeteringRequest(resMap, map[string]database.ClusterInfo{}, baseDeploy(), eventTime)
+		// Should return early without error
+	})
+
+	t.Run("skip when cluster is not running", func(t *testing.T) {
+		clusterMap := map[string]database.ClusterInfo{
+			"cluster1": {
+				ClusterID: "cluster1",
+				Status:    types.ClusterStatusUnavailable,
+			},
+		}
+		d := newSkipDeployer()
+		d.startAcctMeteringRequest(resMap, clusterMap, baseDeploy(), eventTime)
+		// Should return early without error
+	})
+
+	t.Run("skip when cluster heartbeat timeout", func(t *testing.T) {
+		// Last update is 30 minutes old, well past the 5 minute window.
+		clusterMap := runningClusters(now.Add(-30 * time.Minute))
+		d := newSkipDeployer()
+		d.startAcctMeteringRequest(resMap, clusterMap, baseDeploy(), eventTime)
+		// Should return early without error
+	})
+
+	// Each case below keeps the cluster healthy and breaks exactly one deploy
+	// property.
+	skipCases := []struct {
+		name   string
+		mutate func(deploy *database.Deploy)
+	}{
+		{
+			name:   "skip when deploy has no SKU",
+			mutate: func(deploy *database.Deploy) { deploy.SKU = "" },
+		},
+		{
+			name:   "skip when SKU not found in resMap",
+			mutate: func(deploy *database.Deploy) { deploy.SKU = "unknown_sku" },
+		},
+		{
+			name:   "skip when invalid deploy type",
+			mutate: func(deploy *database.Deploy) { deploy.Type = -1 },
+		},
+		{
+			name: "skip when ModelServerless scene",
+			mutate: func(deploy *database.Deploy) {
+				deploy.Type = types.ServerlessType
+				deploy.Hardware = `{"replicas":1}`
+			},
+		},
+		{
+			name:   "skip when hardware format is invalid",
+			mutate: func(deploy *database.Deploy) { deploy.Hardware = "invalid json" },
+		},
+	}
+	for _, tc := range skipCases {
+		t.Run(tc.name, func(t *testing.T) {
+			deploy := baseDeploy()
+			tc.mutate(&deploy)
+			d := newSkipDeployer()
+			d.startAcctMeteringRequest(resMap, runningClusters(now), deploy, eventTime)
+			// Should return early without error
+		})
+	}
+
+	// Success cases publish one metering event. setupRunner is nil when
+	// GetReplica must not be called; imageRunner then stays nil.
+	publishCases := []struct {
+		name        string
+		hardware    string
+		setupRunner func(runner *mockrunner.MockRunner)
+	}{
+		{
+			name:     "success with single node deploy and replica count",
+			hardware: `{"replicas":1}`,
+			setupRunner: func(runner *mockrunner.MockRunner) {
+				runner.EXPECT().GetReplica(mock.Anything, mock.Anything).
+					Return(&types.ReplicaResponse{
+						ActualReplica:  3,
+						DesiredReplica: 3,
+					}, nil)
+			},
+		},
+		{
+			// Multi-node deploys (replicas >= 2) skip the GetReplica lookup.
+			name:     "success with multi-node deploy (replicas >= 2)",
+			hardware: `{"replicas":2}`,
+		},
+		{
+			name:     "success with GetReplica returning multiple replicas",
+			hardware: `{"replicas":1}`,
+			setupRunner: func(runner *mockrunner.MockRunner) {
+				runner.EXPECT().GetReplica(mock.Anything, mock.Anything).
+					Return(&types.ReplicaResponse{
+						ActualReplica:  2,
+						DesiredReplica: 2,
+					}, nil)
+			},
+		},
+		{
+			// A lookup failure falls back to the default replicaCount of 1.
+			name:     "handle GetReplica error gracefully",
+			hardware: `{"replicas":1}`,
+			setupRunner: func(runner *mockrunner.MockRunner) {
+				runner.EXPECT().GetReplica(mock.Anything, mock.Anything).
+					Return(nil, errors.New("replica error"))
+			},
+		},
+	}
+	for _, tc := range publishCases {
+		t.Run(tc.name, func(t *testing.T) {
+			deploy := baseDeploy()
+			deploy.Hardware = tc.hardware
+			deploy.SvcName = "test-svc"
+			deploy.UserUUID = "user1"
+
+			d := newPublishingDeployer(t)
+			if tc.setupRunner != nil {
+				mockRunner := mockrunner.NewMockRunner(t)
+				tc.setupRunner(mockRunner)
+				d.imageRunner = mockRunner
+			}
+			d.startAcctMeteringRequest(resMap, runningClusters(now), deploy, eventTime)
+		})
+	}
+}