diff --git a/api/workflow/activity/deploy_activity.go b/api/workflow/activity/deploy_activity.go index 71ff7814f..c64d59984 100644 --- a/api/workflow/activity/deploy_activity.go +++ b/api/workflow/activity/deploy_activity.go @@ -17,7 +17,6 @@ import ( "opencsg.com/csghub-server/builder/deploy/common" "opencsg.com/csghub-server/builder/deploy/imagebuilder" "opencsg.com/csghub-server/builder/deploy/imagerunner" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/builder/git/gitserver" "opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/common/errorx" @@ -32,12 +31,12 @@ const ( ) const ( - DeployStatusPending = 0 - DeployStatusDeploying = 1 - DeployStatusFailed = 2 - DeployStatusStartUp = 3 - DeployStatusRunning = 4 - DeployStatusRunTimeError = 5 + DeployStatusPending = common.TaskStatusDeployPending + DeployStatusDeploying = common.TaskStatusDeploying + DeployStatusFailed = common.TaskStatusDeployFailed + DeployStatusStartUp = common.TaskStatusDeployStartUp + DeployStatusRunning = common.TaskStatusDeployRunning + DeployStatusRunTimeError = common.TaskStatusDeployRunTimeError ) type DeployActivity struct { @@ -145,7 +144,7 @@ func (a *DeployActivity) Build(ctx context.Context, taskId int64) error { if err != nil { return fmt.Errorf("failed to get deploy task: %w", err) } - if task.Status == scheduler.BuildSkip { + if task.Status == common.TaskStatusBuildSkip { return nil } repoInfo, err := a.getRepositoryInfo(ctx, task) @@ -173,7 +172,7 @@ func (a *DeployActivity) getLogger(ctx context.Context) log.Logger { } // pollBuildStatus -func (a *DeployActivity) pollBuildStatus(ctx context.Context, task *database.DeployTask, repoInfo scheduler.RepoInfo, buildRequest *types.ImageBuilderRequest) error { +func (a *DeployActivity) pollBuildStatus(ctx context.Context, task *database.DeployTask, repoInfo common.RepoInfo, buildRequest *types.ImageBuilderRequest) error { continueLoop, err := a.checkBuildStatus(ctx, task, buildRequest) if err != nil { return err @@ -219,23 +218,28 @@ func (a *DeployActivity) checkBuildStatus(ctx context.Context, task *database.De } switch { - case updatedTask.Status == scheduler.BuildPending: + case updatedTask.Status == common.TaskStatusBuildPending: if err := a.ib.Build(ctx, buildRequest); err != nil { if herr := a.handleBuildError(task, err); herr != nil { + a.getLogger(ctx).Error("Build failed", "task_id", task.ID, "error", err) return false, herr } + a.getLogger(ctx).Error("Build failed", "task_id", task.ID, "error", err) a.reportLog(types.BuildFailed.String()+": \n"+err.Error(), types.StepBuildFailed, task) return false, fmt.Errorf("build failed: %w", err) } if err := a.handleBuildTaskToBuildInQueue(task); err != nil { + a.getLogger(ctx).Error("Failed to handle build task to build in queue", "task_id", task.ID, "error", err) return false, err } a.reportLog(types.BuildInProgress.String(), types.StepBuildInProgress, task) return true, nil - case updatedTask.Status == scheduler.BuildFailed: + case updatedTask.Status == common.TaskStatusBuildFailed: + a.getLogger(ctx).Info("Build task failed", "task_id", task.ID, "status", updatedTask.Status) return false, fmt.Errorf("build task failed: %s", updatedTask.Message) - case updatedTask.Status == scheduler.BuildSucceed: + case updatedTask.Status == common.TaskStatusBuildSucceed: + a.getLogger(ctx).Info("Build task succeed", "task_id", task.ID, "status", updatedTask.Status) return false, nil case a.isTaskTimedOut(updatedTask): a.reportLog("build task timeout", types.StepBuildFailed, task) @@ -264,8 +268,8 @@ func (a *DeployActivity) isTaskTimedOut(task *database.DeployTask) bool { } // getRepositoryInfo -func (a *DeployActivity) getRepositoryInfo(ctx context.Context, task *database.DeployTask) (scheduler.RepoInfo, error) { - var repoInfo scheduler.RepoInfo +func (a *DeployActivity) getRepositoryInfo(ctx context.Context, task *database.DeployTask) (common.RepoInfo, error) { + var repoInfo common.RepoInfo if task.Deploy.SpaceID > 0 { space, err := a.ss.ByID(ctx, task.Deploy.SpaceID) @@ -288,10 +292,10 @@ func (a *DeployActivity) getRepositoryInfo(ctx context.Context, task *database.D } // createSpaceRepoInfo -func (a *DeployActivity) createSpaceRepoInfo(space *database.Space, deployID int64) scheduler.RepoInfo { +func (a *DeployActivity) createSpaceRepoInfo(space *database.Space, deployID int64) common.RepoInfo { cloneInfo := utilcommon.BuildCloneInfoByDomain(a.cfg.PublicDomain, a.cfg.SSHDomain, space.Repository) - return scheduler.RepoInfo{ + return common.RepoInfo{ Path: space.Repository.Path, Name: space.Repository.Name, Sdk: space.Sdk, @@ -308,8 +312,8 @@ func (a *DeployActivity) createSpaceRepoInfo(space *database.Space, deployID int } // createModelRepoInfo -func (a *DeployActivity) createModelRepoInfo(model *database.Model, deployID int64) scheduler.RepoInfo { - return scheduler.RepoInfo{ +func (a *DeployActivity) createModelRepoInfo(model *database.Model, deployID int64) common.RepoInfo { + return common.RepoInfo{ Path: model.Repository.Path, Name: model.Repository.Name, ModelID: model.ID, @@ -358,7 +362,7 @@ func (a *DeployActivity) updateTaskStatus(task *database.DeployTask) error { // handleRepositoryNotFound func (a *DeployActivity) handleRepositoryNotFound(task *database.DeployTask) error { - task.Status = scheduler.BuildFailed + task.Status = common.TaskStatusBuildFailed task.Message = "repository not found, please check the repository path" task.Deploy.Status = common.BuildFailed if err := a.updateTaskStatus(task); err != nil { @@ -370,7 +374,7 @@ func (a *DeployActivity) handleRepositoryNotFound(task *database.DeployTask) err func (a *DeployActivity) handleBuildCancelled(task *database.DeployTask) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(5)) defer cancel() - task.Status = scheduler.Cancelled + task.Status = common.TaskStatusCancelled task.Message = "Cancelled" if err := a.ds.UpdateDeployTask(ctx, task); err != nil { return fmt.Errorf("handleBuildCancelled failed to update deploy task status: %w", err) @@ -380,7 +384,7 @@ func (a *DeployActivity) handleBuildCancelled(task *database.DeployTask) error { } func (a *DeployActivity) handleBuildTaskTimeout(task *database.DeployTask) error { - task.Status = scheduler.BuildFailed + task.Status = common.TaskStatusBuildFailed task.Message = "build task timeout" task.Deploy.Status = common.BuildFailed @@ -393,7 +397,7 @@ func (a *DeployActivity) handleBuildTaskTimeout(task *database.DeployTask) error // handleBuildError func (a *DeployActivity) handleBuildError(task *database.DeployTask, err error) error { - task.Status = scheduler.BuildFailed + task.Status = common.TaskStatusBuildFailed task.Message = fmt.Sprintf("build task failed: %s", err.Error()) task.Deploy.Status = common.BuildFailed @@ -405,7 +409,7 @@ func (a *DeployActivity) handleBuildError(task *database.DeployTask, err error) // updateTaskStatusToBuildInQueue func (a *DeployActivity) handleBuildTaskToBuildInQueue(task *database.DeployTask) error { - task.Status = scheduler.BuildInQueue + task.Status = common.TaskStatusBuildInQueue task.Message = "build in queue" task.Deploy.Status = common.BuildInQueue @@ -455,7 +459,7 @@ func (a *DeployActivity) reportLog(message string, step types.Step, task *databa } // createBuildRequest -func (a *DeployActivity) createBuildRequest(ctx context.Context, task *database.DeployTask, repoInfo scheduler.RepoInfo) (*types.ImageBuilderRequest, error) { +func (a *DeployActivity) createBuildRequest(ctx context.Context, task *database.DeployTask, repoInfo common.RepoInfo) (*types.ImageBuilderRequest, error) { accessToken, err := a.ts.FindByUID(ctx, task.Deploy.UserID) if err != nil { return nil, fmt.Errorf("failed to get git access token: %w", err) @@ -501,7 +505,7 @@ func (a *DeployActivity) createBuildRequest(ctx context.Context, task *database. } // createDeployRequest -func (a *DeployActivity) createDeployRequest(ctx context.Context, task *database.DeployTask, repoInfo scheduler.RepoInfo) (*types.RunRequest, error) { +func (a *DeployActivity) createDeployRequest(ctx context.Context, task *database.DeployTask, repoInfo common.RepoInfo) (*types.RunRequest, error) { logger := a.getLogger(ctx) accessToken, err := a.ts.FindByUID(ctx, task.Deploy.UserID) @@ -584,7 +588,7 @@ func (a *DeployActivity) createDeployRequest(ctx context.Context, task *database }, nil } -func (a *DeployActivity) determineSDKVersion(repoInfo scheduler.RepoInfo) string { +func (a *DeployActivity) determineSDKVersion(repoInfo common.RepoInfo) string { if repoInfo.SdkVersion != "" { return repoInfo.SdkVersion } @@ -606,7 +610,7 @@ func (a *DeployActivity) parseHardware(input string) string { return "cpu" } -func (a *DeployActivity) stopBuild(buildTask *database.DeployTask, repoInfo scheduler.RepoInfo) { +func (a *DeployActivity) stopBuild(buildTask *database.DeployTask, repoInfo common.RepoInfo) { stopCtx, stopCancel := context.WithTimeout(context.Background(), 5*time.Second) defer stopCancel() paths := strings.Split(repoInfo.Path, "/") @@ -624,7 +628,7 @@ func (a *DeployActivity) stopBuild(buildTask *database.DeployTask, repoInfo sche } // makeDeployEnv -func (a *DeployActivity) makeDeployEnv(ctx context.Context, hardware types.HardWare, accessToken *database.AccessToken, deployInfo *database.Deploy, engineArgsTemplates []types.EngineArg, toolCallParsers map[string]string, repoInfo scheduler.RepoInfo) (map[string]string, error) { +func (a *DeployActivity) makeDeployEnv(ctx context.Context, hardware types.HardWare, accessToken *database.AccessToken, deployInfo *database.Deploy, engineArgsTemplates []types.EngineArg, toolCallParsers map[string]string, repoInfo common.RepoInfo) (map[string]string, error) { logger := a.getLogger(ctx) envMap, err := utilcommon.JsonStrToMap(deployInfo.Env) diff --git a/api/workflow/activity/deploy_activity_test.go b/api/workflow/activity/deploy_activity_test.go index 0b515b4d4..c9f5012f8 100644 --- a/api/workflow/activity/deploy_activity_test.go +++ b/api/workflow/activity/deploy_activity_test.go @@ -7,7 +7,6 @@ import ( "time" "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/common/config" "opencsg.com/csghub-server/common/types" @@ -43,6 +42,7 @@ type testEnv struct { func setupTest(t *testing.T) *testEnv { ctx := context.Background() + ctx = context.WithValue(ctx, "test", "test") // Create mock dependencies mockDeployTaskStore := mockdb.NewMockDeployTaskStore(t) @@ -98,12 +98,12 @@ func TestActivities_determineSDKVersion(t *testing.T) { // Test cases testCases := []struct { name string - repoInfo scheduler.RepoInfo + repoInfo common.RepoInfo expectedSDK string }{ { name: "With explicit SDK version", - repoInfo: scheduler.RepoInfo{ + repoInfo: common.RepoInfo{ SdkVersion: "custom-version", Sdk: "some-other-sdk", }, @@ -111,7 +111,7 @@ func TestActivities_determineSDKVersion(t *testing.T) { }, { name: "With GRADIO SDK", - repoInfo: scheduler.RepoInfo{ + repoInfo: common.RepoInfo{ SdkVersion: "", Sdk: types.GRADIO.Name, }, @@ -119,7 +119,7 @@ func TestActivities_determineSDKVersion(t *testing.T) { }, { name: "With STREAMLIT SDK", - repoInfo: scheduler.RepoInfo{ + repoInfo: common.RepoInfo{ SdkVersion: "", Sdk: types.STREAMLIT.Name, }, @@ -127,7 +127,7 @@ func TestActivities_determineSDKVersion(t *testing.T) { }, { name: "With unknown SDK", - repoInfo: scheduler.RepoInfo{ + repoInfo: common.RepoInfo{ SdkVersion: "", Sdk: "unknown-sdk", }, diff --git a/api/workflow/deployer_test.go b/api/workflow/deployer_test.go index 0c51c06b7..de2717525 100644 --- a/api/workflow/deployer_test.go +++ b/api/workflow/deployer_test.go @@ -15,7 +15,6 @@ import ( "go.temporal.io/sdk/testsuite" "opencsg.com/csghub-server/api/workflow/activity" "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/common/config" "opencsg.com/csghub-server/common/types" @@ -120,7 +119,7 @@ func TestDeployWorkflowSuccess(t *testing.T) { ID: 1, DeployID: deploy.ID, Deploy: deploy, - Status: scheduler.BuildSkip, + Status: common.TaskStatusBuildSkip, } runTask := &database.DeployTask{ @@ -137,7 +136,7 @@ func TestDeployWorkflowSuccess(t *testing.T) { }, nil) mockDeployTaskStore.EXPECT().GetDeployTask(mock.Anything, buildTask.ID).Return(buildTask, nil) - buildTask.Status = scheduler.BuildSucceed + buildTask.Status = common.TaskStatusBuildSucceed // deploy mockLogReporter.EXPECT().Report(mock.Anything).Return().Maybe() diff --git a/builder/deploy/scheduler/config.go b/builder/deploy/common/config.go similarity index 95% rename from builder/deploy/scheduler/config.go rename to builder/deploy/common/config.go index fff9167b7..55b35ff25 100644 --- a/builder/deploy/scheduler/config.go +++ b/builder/deploy/common/config.go @@ -1,4 +1,4 @@ -package scheduler +package common type Config struct { ImageBuilderURL string `json:"image_builder_url"` diff --git a/builder/deploy/common/status.go b/builder/deploy/common/status.go index 66a656bd9..a325105b2 100644 --- a/builder/deploy/common/status.go +++ b/builder/deploy/common/status.go @@ -1,7 +1,27 @@ package common // sub build task status +const ( + TaskStatusCancelled = -1 + TaskStatusBuildPending = 0 + TaskStatusBuildInProgress = 1 + TaskStatusBuildFailed = 2 + TaskStatusBuildSucceed = 3 + TaskStatusBuildSkip = 4 // export for other package + TaskStatusBuildInQueue = 5 +) + +// sub deploy task status +const ( + TaskStatusDeployPending = 0 + TaskStatusDeploying = 1 + TaskStatusDeployFailed = 2 + TaskStatusDeployStartUp = 3 + TaskStatusDeployRunning = 4 + TaskStatusDeployRunTimeError = 5 +) +// deploy status const ( Pending = 0 // step one: build diff --git a/builder/deploy/deploy_ce_test.go b/builder/deploy/deploy_ce_test.go index c0436a73d..46395464b 100644 --- a/builder/deploy/deploy_ce_test.go +++ b/builder/deploy/deploy_ce_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" mockbuilder "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/imagebuilder" mockrunner "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/imagerunner" - mockScheduler "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/scheduler" mockdb "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/builder/deploy/common" "opencsg.com/csghub-server/builder/store/database" @@ -36,8 +35,6 @@ func newTestDeployer(t *testing.T) *testDepolyerWithMocks { }, } s.mocks.stores = mockStores - s.mocks.scheduler = mockScheduler.NewMockScheduler(t) - s.scheduler = s.mocks.scheduler s.mocks.builder = mockbuilder.NewMockBuilder(t) s.imageBuilder = s.mocks.builder s.mocks.runner = mockrunner.NewMockRunner(t) diff --git a/builder/deploy/deployer.go b/builder/deploy/deployer.go index e3bbbdc3a..4320d3d24 100644 --- a/builder/deploy/deployer.go +++ b/builder/deploy/deployer.go @@ -17,7 +17,6 @@ import ( "github.com/google/uuid" "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/builder/loki" "opencsg.com/csghub-server/builder/redis" "opencsg.com/csghub-server/builder/store/database" @@ -214,7 +213,7 @@ func (d *deployer) Deploy(ctx context.Context, dr types.DeployRepo) (int64, erro imgStrLen := len(strings.Trim(deploy.ImageID, " ")) slog.Debug("do deployer.Deploy check image", slog.Any("deploy.ImageID", deploy.ImageID), slog.Any("imgStrLen", imgStrLen)) if imgStrLen > 0 { - bldTaskStatus = scheduler.BuildSkip + bldTaskStatus = common.TaskStatusBuildSkip bldTaskMsg = "Skip" } slog.Debug("create build task", slog.Any("bldTaskStatus", bldTaskStatus), slog.Any("bldTaskMsg", bldTaskMsg)) diff --git a/builder/deploy/deployer_ce.go b/builder/deploy/deployer_ce.go index 6d4fb7b2d..f6ec8a7de 100644 --- a/builder/deploy/deployer_ce.go +++ b/builder/deploy/deployer_ce.go @@ -16,7 +16,6 @@ import ( "opencsg.com/csghub-server/builder/deploy/common" "opencsg.com/csghub-server/builder/deploy/imagebuilder" "opencsg.com/csghub-server/builder/deploy/imagerunner" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/builder/event" "opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/common/config" @@ -25,7 +24,6 @@ import ( ) type deployer struct { - scheduler scheduler.Scheduler imageBuilder imagebuilder.Builder imageRunner imagerunner.Runner @@ -45,7 +43,7 @@ type deployer struct { logReporter reporter.LogCollector } -func newDeployer(s scheduler.Scheduler, ib imagebuilder.Builder, ir imagerunner.Runner, c common.DeployConfig, logReporter reporter.LogCollector, cfg *config.Config, startJobs bool) (*deployer, error) { +func newDeployer(ib imagebuilder.Builder, ir imagerunner.Runner, c common.DeployConfig, logReporter reporter.LogCollector, cfg *config.Config, startJobs bool) (*deployer, error) { store := database.NewDeployTaskStore() node, err := snowflake.NewNode(1) @@ -55,7 +53,6 @@ func newDeployer(s scheduler.Scheduler, ib imagebuilder.Builder, ir imagerunner. } d := &deployer{ - scheduler: s, imageBuilder: ib, imageRunner: ir, deployTaskStore: store, diff --git a/builder/deploy/deployer_test.go b/builder/deploy/deployer_test.go index 3ce3cc758..2016cdc63 100644 --- a/builder/deploy/deployer_test.go +++ b/builder/deploy/deployer_test.go @@ -21,7 +21,6 @@ import ( mockmq "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/mq" mockdb "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/builder/event" "opencsg.com/csghub-server/builder/loki" "opencsg.com/csghub-server/builder/store/database" @@ -193,7 +192,7 @@ func TestDeployer_Deploy(t *testing.T) { buildTask := database.DeployTask{ TaskType: 0, - Status: scheduler.BuildSkip, + Status: common.TaskStatusBuildSkip, Message: "Skip", } runTask := database.DeployTask{ diff --git a/builder/deploy/init.go b/builder/deploy/init.go index 63bb7204d..2a4104174 100644 --- a/builder/deploy/init.go +++ b/builder/deploy/init.go @@ -10,12 +10,10 @@ import ( "opencsg.com/csghub-server/builder/deploy/common" "opencsg.com/csghub-server/builder/deploy/imagebuilder" "opencsg.com/csghub-server/builder/deploy/imagerunner" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/common/config" ) var ( - fifoScheduler scheduler.Scheduler defaultDeployer Deployer ) @@ -35,12 +33,7 @@ func Init(c common.DeployConfig, config *config.Config, startJobs bool) error { return fmt.Errorf("failed to create log reporter:%w", err) } - fifoScheduler, err = scheduler.NewFIFOScheduler(ib, ir, c, logReporter) - if err != nil { - return fmt.Errorf("failed to create scheduler:%w", err) - } - - deployer, err := newDeployer(fifoScheduler, ib, ir, c, logReporter, config, startJobs) + deployer, err := newDeployer(ib, ir, c, logReporter, config, startJobs) if err != nil { return fmt.Errorf("failed to create deployer:%w", err) } diff --git a/builder/deploy/scheduler/builder_runner.go b/builder/deploy/scheduler/builder_runner.go deleted file mode 100644 index a57543ed2..000000000 --- a/builder/deploy/scheduler/builder_runner.go +++ /dev/null @@ -1,213 +0,0 @@ -package scheduler - -import ( - "context" - "fmt" - "log/slog" - "strconv" - "strings" - "time" - - "opencsg.com/csghub-server/component/reporter" - - "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/imagebuilder" - "opencsg.com/csghub-server/builder/git/gitserver" - "opencsg.com/csghub-server/builder/store/database" - "opencsg.com/csghub-server/common/types" -) - -// BuilderRunner defines a docker image building task -type BuilderRunner struct { - repo *RepoInfo - task *database.DeployTask - ib imagebuilder.Builder - deployStore database.DeployTaskStore - tokenStore database.AccessTokenStore - logReporter reporter.LogCollector - git gitserver.GitServer - timeout int -} - -func NewBuildRunner(gc gitserver.GitServer, b imagebuilder.Builder, r *RepoInfo, t *database.DeployTask, logReporter reporter.LogCollector, timeout int) (Runner, error) { - return &BuilderRunner{ - repo: r, - task: t, - ib: b, - deployStore: database.NewDeployTaskStore(), - tokenStore: database.NewAccessTokenStore(), - git: gc, - logReporter: logReporter, - timeout: timeout, - }, nil -} - -func (t *BuilderRunner) makeBuildRequest() (*types.ImageBuilderRequest, error) { - token, err := t.tokenStore.FindByUID(context.Background(), t.task.Deploy.UserID) - if err != nil { - return nil, fmt.Errorf("cant get git access token:%w", err) - } - fields := strings.Split(t.repo.Path, "/") - sdkVer := "" - if t.repo.SdkVersion == "" { - slog.Debug("Use SDK default version", slog.Any("repository path", t.repo.Path)) - if t.repo.Sdk == types.GRADIO.Name { - sdkVer = types.GRADIO.Version - } else if t.repo.Sdk == types.STREAMLIT.Name { - sdkVer = types.STREAMLIT.Version - } - } else { - sdkVer = t.repo.SdkVersion - } - - commit, err := t.git.GetRepoLastCommit(context.Background(), gitserver.GetRepoLastCommitReq{ - RepoType: types.RepositoryType(t.repo.RepoType), - Namespace: fields[0], - Name: fields[1], - Ref: t.task.Deploy.GitBranch, - }) - - if err != nil { - return nil, fmt.Errorf("get repo last commit failed: %w", err) - } - - return &types.ImageBuilderRequest{ - OrgName: fields[0], - SpaceName: fields[1], - Hardware: t.parseHardware(t.task.Deploy.Hardware), - // PythonVersion: t.space.PythonVersion, - PythonVersion: "3.10", - // SDKType: "gradio", - // SDKVersion: "3.37.0", - Sdk: t.repo.Sdk, - DriverVersion: t.repo.DriverVersion, - Sdk_version: sdkVer, - SpaceURL: t.repo.HTTPCloneURL, - GitRef: t.task.Deploy.GitBranch, - UserId: token.User.Username, - GitAccessToken: token.Token, - DeployId: strconv.FormatInt(t.task.DeployID, 10), - FactoryBuild: false, - ClusterID: t.task.Deploy.ClusterID, - LastCommitID: commit.ID, - TaskId: t.task.ID, - }, nil -} - -func (t *BuilderRunner) parseHardware(intput string) string { - if strings.Contains(intput, "GPU") || strings.Contains(intput, "NVIDIA") { - return "gpu" - } - - return "cpu" -} - -// Run call image builder service to build a docker image -func (t *BuilderRunner) Run(ctx context.Context) error { - slog.Info("run image build task", slog.Int64("deplopy_task_id", t.task.ID)) - t.reporterLog(types.BuildInProgress.String(), types.StepBuildInProgress) - - if t.task.Status == BuildPending { - req, err := t.makeBuildRequest() - if err != nil { - return fmt.Errorf("make build request failed: %w", err) - } - slog.Debug("make build request", slog.Any("req", req)) - err = t.ib.Build(ctx, req) - if err != nil { - // TODO:return retryable error - return fmt.Errorf("call image builder failed: %w", err) - } - - t.buildInQueue() - } - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - - case <-ticker.C: - task, err := t.deployStore.GetDeployTask(ctx, t.task.ID) - if err != nil { - return fmt.Errorf("get deploy task failed: %w", err) - } - - if task.Status == BuildFailed || task.Status == BuildSucceed { - return nil - } - - if task.CreatedAt.Add(time.Duration(t.timeout) * time.Second * 10).Before(time.Now()) { - t.buildFailed("build task timeout") - } - } - } -} - -func (t *BuilderRunner) buildInQueue() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - deploy, err := t.deployStore.GetDeployByID(ctx, t.task.DeployID) - if err != nil { - slog.Error("failed to get deploy info when build in queue", "deploy_id", t.task.DeployID, "error", err) - return - } - if deploy.Status == common.Building { - //"deploy status is already building, skip setting build in progress status" - return - } - t.task.Status = BuildInQueue - t.task.Message = "build in queue" - // change to building status - t.task.Deploy.Status = common.BuildInQueue - if err := t.deployStore.UpdateInTx(ctx, []string{"status"}, []string{"status", "message"}, t.task.Deploy, t.task); err != nil { - slog.Error("failed to change deploy status to `Building`", "error", err) - } -} - -func (t *BuilderRunner) buildFailed(msg string) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - deploy, err := t.deployStore.GetDeployByID(ctx, t.task.DeployID) - if err != nil { - slog.Error("failed to get deploy info when build failed", "error", err) - return - } - if deploy.Status != common.Building { - slog.Warn("deploy status is not building, skip setting build failed status in builder_runner", - slog.Any("deploy_id", deploy.ID), slog.Any("deploy_status", deploy.Status)) - return - } - - t.task.Status = BuildFailed - t.task.Message = msg - // change to building status - t.task.Deploy.Status = common.BuildFailed - if err := t.deployStore.UpdateInTx(ctx, []string{"status"}, []string{"status", "message"}, t.task.Deploy, t.task); err != nil { - slog.Error("failed to change deploy status to `BuildFailed`", "error", err) - } - t.reporterLog(msg, types.StepBuildFailed) -} - -func (t *BuilderRunner) WatchID() int64 { return t.task.ID } - -func (t *BuilderRunner) reporterLog(msg string, step types.Step) { - logEntry := types.LogEntry{ - Message: msg, - Stage: types.StageBuild, - Step: step, - DeployID: strconv.FormatInt(t.task.DeployID, 10), - Labels: map[string]string{ - types.LogLabelTypeKey: types.LogLabelImageBuilder, - }, - } - if nil != t.task.Deploy { - logEntry.Labels[types.StreamKeyDeployType] = strconv.Itoa(t.task.Deploy.Type) - logEntry.Labels[types.StreamKeyDeployTypeID] = strconv.FormatInt(t.task.ID, 10) - logEntry.Labels[types.StreamKeyDeployTaskID] = strconv.FormatInt(t.task.ID, 10) - } - t.logReporter.Report(logEntry) -} diff --git a/builder/deploy/scheduler/deploy_runner.go b/builder/deploy/scheduler/deploy_runner.go deleted file mode 100644 index cf7b6eb12..000000000 --- a/builder/deploy/scheduler/deploy_runner.go +++ /dev/null @@ -1,521 +0,0 @@ -package scheduler - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "net/url" - "strconv" - "strings" - "time" - - "opencsg.com/csghub-server/component/reporter" - - "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/imagerunner" - "opencsg.com/csghub-server/builder/store/database" - "opencsg.com/csghub-server/common/types" - hubcom "opencsg.com/csghub-server/common/utils/common" -) - -// DeployRunner defines a k8s image running task -type DeployRunner struct { - repo *RepoInfo - task *database.DeployTask - ir imagerunner.Runner - store database.DeployTaskStore - tokenStore database.AccessTokenStore - urs database.UserResourcesStore - deployStartTime time.Time - deployCfg common.DeployConfig - runtimeFrameworksStore database.RuntimeFrameworksStore - logReporter reporter.LogCollector - metadataStore database.MetadataStore -} - -func NewDeployRunner(ir imagerunner.Runner, r *RepoInfo, t *database.DeployTask, deployCfg common.DeployConfig, logReporter reporter.LogCollector) Runner { - return &DeployRunner{ - repo: r, - task: t, - ir: ir, - store: database.NewDeployTaskStore(), - deployStartTime: time.Now(), - tokenStore: database.NewAccessTokenStore(), - urs: database.NewUserResourcesStore(), - deployCfg: deployCfg, - runtimeFrameworksStore: database.NewRuntimeFrameworksStore(), - logReporter: logReporter, - metadataStore: database.NewMetadataStore(), - } -} - -// Run call k8s image runner service to run a docker image -func (t *DeployRunner) Run(ctx context.Context) error { - slog.Info("run image deploy task", slog.Int64("deplopy_task_id", t.task.ID)) - t.reporterLog(types.DeployInProgress.String(), types.StepDeploying) - - // keep checking deploy status - for { - if t.task.Status == deployPending { - req, err := t.makeDeployRequest() - if err != nil { - return fmt.Errorf("fail to make deploy request: %w", err) - } - // check if the building sub-task of deploy is finished and waiting until the image is ready to run - if req.ImageID == "" { - time.Sleep(5 * time.Second) - continue - } - slog.Debug("After build deploy request", slog.Any("req", req)) - resp, err := t.ir.Run(ctx, req) - if err != nil { - // TODO:return retryable error - return fmt.Errorf("call image runner failed: %w", err) - } - // record deploy id to user resource if deploy has order id - if req.OrderDetailID != 0 { - ur, err := t.urs.FindUserResourcesByOrderDetailId(ctx, req.UserID, req.OrderDetailID) - if err != nil { - return fmt.Errorf("fail to find user resource by order detail id: %w", err) - } - ur.DeployId = req.DeployID - err = t.urs.UpdateDeployId(ctx, ur) - if err != nil { - return fmt.Errorf("fail to update deploy id for user resource: %w", err) - } - } - - t.deployInProgress(resp.Message) - // record time of create knative service - t.deployStartTime = time.Now() - } - //wait svc to be created in k8s - time.Sleep(10 * time.Second) - - fields := strings.Split(t.repo.Path, "/") - - targetID := t.task.Deploy.SpaceID - if t.task.Deploy.SpaceID == 0 { - targetID = t.task.Deploy.ID // support model deploy with multi-instance - } - req := &types.StatusRequest{ - ID: targetID, - OrgName: fields[0], - RepoName: fields[1], - SvcName: t.task.Deploy.SvcName, - ClusterID: t.task.Deploy.ClusterID, - NeedDetails: true, // check status of both knative and its pods - } - timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - resp, err := t.ir.Status(timeoutCtx, req) - cancel() - if err != nil { - // return -1, fmt.Errorf("failed to call builder status api,%w", err) - slog.Error("failed to call runner status api", slog.Any("error", err), slog.Any("task", t.task)) - // wait before next check - time.Sleep(10 * time.Second) - continue - } - - if resp.DeployID > t.task.DeployID { - t.deployFailed(fmt.Sprintf("cancel by new deploy:%d", resp.DeployID)) - return nil - } - slog.Debug("[deploy_runner_get_status]", slog.Any("resp", resp)) - switch resp.Code { - case common.Deploying: - isCancel, reason := t.shouldForceCancelDeploy(fields[0], fields[1], resp) - if isCancel { - return t.cancelDeploy(ctx, fields[0], fields[1], reason) - } - t.deployInProgress("") - // waitting for check next time - time.Sleep(10 * time.Second) - case common.DeployFailed: - slog.Error("image deploy failed", slog.String("repo_name", t.repo.Name), slog.Any("deplopy_task_id", t.task.ID), slog.Any("resp", resp)) - t.deployFailed(resp.Message) - - return fmt.Errorf("deploy failed, resp msg:%s", resp.Message) - case common.Startup: - slog.Info("image deploy success", slog.String("repo_name", t.repo.Name), slog.Any("deplopy_task_id", t.task.ID)) - t.deploySuccess() - // wait before next check - time.Sleep(10 * time.Second) - - case common.Running: - slog.Info("image running", slog.String("repo_name", t.repo.Name), slog.Any("deplopy_task_id", t.task.ID)) - t.running(resp.Endpoint) - - return nil - case common.RunTimeError: - slog.Error("image runtime erro", slog.String("repo_name", t.repo.Name), slog.Any("deplopy_task_id", t.task.ID)) - t.runtimeError(resp.Message) - - return fmt.Errorf("runtime error, resp msg:%s", resp.Message) - default: - slog.Error("unknown deploy status", slog.String("repo_name", t.repo.Name), slog.Any("deplopy_task_id", t.task.ID), - slog.Int("status", resp.Code)) - return fmt.Errorf("unknown deploy status, resp-code: %d, resp-msg: %s", resp.Code, resp.Message) - } - } -} - -func (t *DeployRunner) shouldForceCancelDeploy(orgName, repoName string, resp *types.StatusResponse) (bool, string) { - duration := time.Since(t.deployStartTime).Minutes() - limitTime := t.deployCfg.SpaceDeployTimeoutInMin - if t.task.Deploy.SpaceID == 0 && t.task.Deploy.ModelID > 0 { - limitTime = t.deployCfg.ModelDeployTimeoutInMin - } - if duration >= float64(limitTime) { - // space or model deploy duration is greater than timeout defined in env (default is 30 mins) - reason := "This Space/Model has been cancelled automatically by the system due to deployment timeout." - slog.Warn(reason, slog.Any("duration", duration), slog.Any("timeout", limitTime), slog.Any("namespace", orgName), slog.Any("repoName", repoName)) - return true, reason - } - - // Todo: check if pod is pending for too long due to not enough hardware resources - // if t.task.Deploy.SpaceID > 0 && len(resp.Instances) > 0 && resp.Instances[0].Status == string(corev1.PodPending) { - // reason := "The deployment has been cancelled because it took too long to acquire the necessary hardware resources." - // slog.Warn(reason, slog.Any("namespace", orgName), slog.Any("repoName", repoName)) - // return true, reason - // } - - return false, "" -} - -func (t *DeployRunner) WatchID() int64 { return t.task.ID } - -func (t *DeployRunner) deployInProgress(svcName string) { - t.task.Status = deploying - t.task.Message = "deploy in progress" - // change to building status - t.task.Deploy.Status = common.Deploying - if len(svcName) > 0 { - t.task.Deploy.SvcName = svcName - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := t.store.UpdateInTx(ctx, []string{"status", "svc_name"}, []string{"status", "message"}, t.task.Deploy, t.task); err != nil { - slog.Error("failed to change deploy status to `Deploying`", "error", err) - } -} - -func (t *DeployRunner) deploySuccess() { - t.task.Status = deployStartUp - t.task.Message = "deploy succeeded, wati for startup" - // change to building status - t.task.Deploy.Status = common.Startup - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := t.store.UpdateInTx(ctx, []string{"status"}, []string{"status", "message"}, t.task.Deploy, t.task); err != nil { - slog.Error("failed to change deploy status to `Startup`", "error", err) - } - t.reporterLog(types.DeployStarting.String(), types.StepDeployStartUp) -} - -func (t *DeployRunner) deployFailed(msg string) { - t.task.Status = deployFailed - t.task.Message = msg - // change to building status - t.task.Deploy.Status = common.DeployFailed - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := t.store.UpdateInTx(ctx, []string{"status"}, []string{"status", "message"}, t.task.Deploy, t.task); err != nil { - slog.Error("failed to change deploy status to `DeployFailed`", "error", err) - } - t.reporterLog(types.DeployFailed.String(), types.StepDeployFailed) -} - -func (t *DeployRunner) running(endpoint string) { - t.task.Status = deployRunning - t.task.Message = "running" - // change to building status - t.task.Deploy.Status = common.Running - t.task.Deploy.Endpoint = endpoint - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := t.store.UpdateInTx(ctx, []string{"status", "endpoint"}, []string{"status", "message"}, t.task.Deploy, t.task); err != nil { - slog.Error("failed to change deploy status to `Running`", "error", err) - } -} - -func (t *DeployRunner) runtimeError(msg string) { - t.task.Status = deployRunTimeError - t.task.Message = msg - // change to building status - t.task.Deploy.Status = common.RunTimeError - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := t.store.UpdateInTx(ctx, []string{"status"}, []string{"status", "message"}, t.task.Deploy, t.task); err != nil { - slog.Error("failed to change deploy status to `RunTimeError`", "error", err) - } - t.reporterLog(fmt.Sprintf("%s error: %s", msg, types.DeployError.String()), types.StepDeployRunTimeError) - -} - -func (t *DeployRunner) makeDeployRequest() (*types.RunRequest, error) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - token, err := t.tokenStore.FindByUID(ctx, t.task.Deploy.UserID) - if err != nil { - return nil, fmt.Errorf("cant get git access token:%w", err) - } - fields := strings.Split(t.repo.Path, "/") - deploy, err := t.store.GetDeployByID(ctx, t.task.DeployID) - if err != nil { - return nil, fmt.Errorf("fail to get deploy with error: %w", err) - } - - var engineArgsTemplate []types.EngineArg - var toolCallParsers map[string]string - if len(deploy.RuntimeFramework) > 0 { - frame, err := t.runtimeFrameworksStore.FindEnabledByName(ctx, deploy.RuntimeFramework) - if err != nil { - return nil, fmt.Errorf("get runtime framework by name %s error: %w", deploy.RuntimeFramework, err) - } - if len(strings.TrimSpace(frame.EngineArgs)) > 0 { - err = json.Unmarshal([]byte(frame.EngineArgs), &engineArgsTemplate) - if err != nil { - return nil, fmt.Errorf("unmarshal engine args error: %w", err) - } - } - if len(strings.TrimSpace(frame.ToolCallParsers)) > 0 { - err = json.Unmarshal([]byte(frame.ToolCallParsers), &toolCallParsers) - if err != nil { - slog.Error("unmarshal tool call parsers error", slog.Any("error", err)) - } - } - } - - annoMap, err := hubcom.JsonStrToMap(deploy.Annotation) - if err != nil { - slog.Error("deploy annotation is invalid json data", slog.Any("Annotation", deploy.Annotation)) - return nil, err - } - annoMap[types.ResDeployID] = fmt.Sprintf("%v", deploy.ID) - - var hardware = types.HardWare{} - err = json.Unmarshal([]byte(deploy.Hardware), &hardware) - if err != nil { - slog.Error("deploy hardware is invalid format", slog.Any("hardware", deploy.Hardware)) - return nil, err - } - - envMap := t.makeDeployEnv(hardware, token, deploy, engineArgsTemplate, toolCallParsers) - - targetID := deploy.SpaceID - // deployID is unique for space and model - if deploy.SpaceID == 0 && deploy.ModelID > 0 { - targetID = deploy.ID // support model deploy with multi-instance - } - - return &types.RunRequest{ - ID: targetID, - OrgName: fields[0], - RepoName: fields[1], - RepoType: t.repo.RepoType, - UserName: t.repo.UserName, - Annotation: annoMap, - Hardware: hardware, - Env: envMap, - GitPath: deploy.GitPath, - GitRef: deploy.GitBranch, - ImageID: deploy.ImageID, - DeployID: deploy.ID, - MinReplica: deploy.MinReplica, - MaxReplica: deploy.MaxReplica, - Accesstoken: token.Token, - ClusterID: deploy.ClusterID, - SvcName: deploy.SvcName, - DeployType: deploy.Type, - UserID: deploy.UserUUID, - Sku: deploy.SKU, - OrderDetailID: deploy.OrderDetailID, - TaskId: t.task.ID, - }, nil -} - -func (t *DeployRunner) makeDeployEnv( - hardware types.HardWare, - token *database.AccessToken, - deploy *database.Deploy, - engineArgsTemplate []types.EngineArg, - toolCallParsers map[string]string, -) map[string]string { - envMap, err := hubcom.JsonStrToMap(deploy.Env) - if err != nil { - slog.Error("deploy env is invalid json data", slog.Any("deploy", deploy)) - } - - varMap, err := hubcom.JsonStrToMap(deploy.Variables) - if err != nil { - slog.Error("deploy variables is invalid json data", slog.Any("deploy", deploy)) - } else { - for key, value := range varMap { - envMap[key] = value - } - } - - // for space and models - envMap["S3_INTERNAL"] = fmt.Sprintf("%v", t.deployCfg.S3Internal) - envMap["HTTPCloneURL"] = t.getHttpCloneURLWithToken(t.repo.HTTPCloneURL, token.User.Username, token.Token) - envMap["ACCESS_TOKEN"] = token.Token - envMap["REPO_ID"] = t.repo.Path // "namespace/name" - envMap["REVISION"] = deploy.GitBranch // branch - if len(engineArgsTemplate) > 0 { - ENGINE_ARGS := "" - argValuesMap, err := hubcom.JsonStrToMap(deploy.EngineArgs) - if err != nil { - slog.Error("deploy engine args is invalid json data", slog.Any("deploy", *deploy), slog.Any("error", err)) - } else { - for _, arg := range engineArgsTemplate { - if value, ok := argValuesMap[arg.Name]; ok { - ENGINE_ARGS += " " + fmt.Sprintf(arg.Format, value) - } - } - } - - // Process tool-calling arguments - if strings.Contains(ENGINE_ARGS, types.EngineArgEnableToolCalling) && len(toolCallParsers) > 0 { - modelArch := t.getModelArchitecture(deploy.RepoID) - if modelArch != "" { - if parser, ok := toolCallParsers[modelArch]; ok { - ENGINE_ARGS = strings.Replace(ENGINE_ARGS, types.EngineArgEnableToolCalling, types.EngineArgEnableToolCalling+" --tool-call-parser "+parser, 1) - slog.Info("Added tool-call-parser", slog.String("architecture", modelArch), slog.String("parser", parser)) - } else { - slog.Warn("No tool-call-parser found for architecture, removing "+types.EngineArgEnableToolCalling, slog.String("architecture", modelArch)) - ENGINE_ARGS = strings.ReplaceAll(ENGINE_ARGS, types.EngineArgEnableToolCalling, "") - } - } else { - slog.Warn("Model architecture not found, removing "+types.EngineArgEnableToolCalling) - ENGINE_ARGS = strings.ReplaceAll(ENGINE_ARGS, types.EngineArgEnableToolCalling, "") - } - } - - slog.Debug("makeDeployEnv", slog.Any("ENGINE_ARGS", ENGINE_ARGS)) - envMap["ENGINE_ARGS"] = ENGINE_ARGS - } - - common.UpdateEvaluationEnvHardware(envMap, hardware) - - if deploy.SpaceID > 0 { - // sdk port for space - switch t.repo.Sdk { - case types.GRADIO.Name: - envMap["port"] = strconv.Itoa(types.GRADIO.Port) - case types.STREAMLIT.Name: - envMap["port"] = strconv.Itoa(types.STREAMLIT.Port) - case types.NGINX.Name: - envMap["port"] = strconv.Itoa(types.NGINX.Port) - case types.DOCKER.Name: - envMap["port"] = strconv.Itoa(deploy.ContainerPort) - envMap["HF_ENDPOINT"] = t.deployCfg.ModelDownloadEndpoint - case types.MCPSERVER.Name: - envMap["port"] = strconv.Itoa(types.MCPSERVER.Port) - default: - envMap["port"] = strconv.Itoa(types.DefaultContainerPort) - } - } - - if deploy.Type == types.InferenceType || deploy.Type == types.ServerlessType { - // runtime framework port for model - envMap["port"] = strconv.Itoa(deploy.ContainerPort) - envMap["HF_ENDPOINT"] = t.deployCfg.ModelDownloadEndpoint // "https://hub.opencsg-stg.com/" - envMap["HF_HUB_OFFLINE"] = "1" - envMap["HF_TASK"] = string(deploy.Task) - } - - if deploy.Type == types.FinetuneType { - envMap["port"] = strconv.Itoa(deploy.ContainerPort) - envMap["HF_ENDPOINT"], _ = url.JoinPath(t.deployCfg.ModelDownloadEndpoint, "csg") - envMap["HF_TOKEN"] = token.Token - envMap["USE_CSGHUB_MODEL"] = "1" - envMap["USE_CSGHUB_DATASET"] = "1" - envMap["JUPYTER_ENABLE_LAB"] = "yes" - envMap["TERM"] = "xterm-256color" - } - if deploy.Type == types.NotebookType { - envMap["port"] = strconv.Itoa(deploy.ContainerPort) - } - - if t.deployCfg.PublicRootDomain == "" { - if deploy.Type == types.FinetuneType { - envMap["CONTEXT_PATH"] = "/endpoint/" + deploy.SvcName - } - if deploy.Type == types.SpaceType { - envMap["GRADIO_ROOT_PATH"] = "/endpoint/" + deploy.SvcName - envMap["STREAMLIT_SERVER_BASE_URL_PATH"] = "/endpoint/" + deploy.SvcName - } - } - - return envMap -} - -func (t *DeployRunner) cancelDeploy(ctx context.Context, orgName, repoName, reason string) error { - t.reporterLog(types.DeployCancelled.String(), types.StepCancelled) - targetID := t.task.Deploy.SpaceID - if t.task.Deploy.SpaceID == 0 { - // support model deploy with multi-instance - targetID = t.task.Deploy.ID - } - stopReq := &types.StopRequest{ - ID: targetID, - OrgName: orgName, - RepoName: repoName, - SvcName: t.task.Deploy.SvcName, - } - _, err := t.ir.Stop(ctx, stopReq) - if err != nil { - return fmt.Errorf("fail to undeploy space/model with err: %v", err) - } - t.deployFailed(reason) - return nil -} - -func (t *DeployRunner) getHttpCloneURLWithToken(httpCloneUrl, username, token string) string { - num := strings.Index(httpCloneUrl, "://") - if num > -1 { - return fmt.Sprintf("%s%s:%s@%s", httpCloneUrl[0:num+3], username, token, httpCloneUrl[num+3:]) - } - return httpCloneUrl -} - -func (t *DeployRunner) reporterLog(msg string, step types.Step) { - logEntry := types.LogEntry{ - Message: fmt.Sprintf("%s, task statues: %d", msg, t.task.Status), - Stage: types.StageDeploy, - Step: step, - DeployID: strconv.FormatInt(t.task.DeployID, 10), - Labels: map[string]string{ - types.LogLabelTypeKey: types.LogLabelDeploy, - }, - } - if nil != t.task.Deploy { - logEntry.Labels[types.StreamKeyDeployType] = strconv.Itoa(t.task.Deploy.Type) - logEntry.Labels[types.StreamKeyDeployTypeID] = strconv.FormatInt(t.task.ID, 10) - logEntry.Labels[types.StreamKeyDeployTaskID] = strconv.FormatInt(t.task.ID, 10) - } - t.logReporter.Report(logEntry) -} - -// getModelArchitecture reads the model architecture from metadata -func (t *DeployRunner) getModelArchitecture(repoID int64) string { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Get metadata from database - metadata, err := t.metadataStore.FindByRepoID(ctx, repoID) - if err != nil { - slog.Warn("Failed to get metadata from database", slog.String("error", err.Error()), slog.Int64("repo_id", repoID)) - return "" - } - - if metadata.Architecture != "" { - return metadata.Architecture - } - - slog.Warn("Model architecture not found in metadata", slog.Int64("repo_id", repoID)) - return "" -} diff --git a/builder/deploy/scheduler/deploy_runner_test.go b/builder/deploy/scheduler/deploy_runner_test.go deleted file mode 100644 index ef97a3c40..000000000 --- a/builder/deploy/scheduler/deploy_runner_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package scheduler - -import ( - mockReporter "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/component/reporter" - "testing" - "time" - - "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/imagerunner" - "opencsg.com/csghub-server/builder/store/database" -) - -func TestDeployRunner_getHttpCloneURLWithToken(t *testing.T) { - type fields struct { - repo *RepoInfo - task *database.DeployTask - ir imagerunner.Runner - store database.DeployTaskStore - tokenStore database.AccessTokenStore - urs database.UserResourcesStore - deployStartTime time.Time - deployCfg common.DeployConfig - } - type args struct { - httpCloneUrl string - username string - token string - } - tests := []struct { - name string - fields fields - args args - want string - }{ - {name: "test with username and token", fields: fields{}, args: args{httpCloneUrl: "https://opencsg.com/abc/def.git", username: "username", token: "token"}, want: "https://username:token@opencsg.com/abc/def.git"}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tr := &DeployRunner{ - repo: tt.fields.repo, - task: tt.fields.task, - ir: tt.fields.ir, - store: tt.fields.store, - tokenStore: tt.fields.tokenStore, - urs: tt.fields.urs, - deployStartTime: tt.fields.deployStartTime, - deployCfg: tt.fields.deployCfg, - logReporter: mockReporter.NewMockLogCollector(t), - } - if got := tr.getHttpCloneURLWithToken(tt.args.httpCloneUrl, tt.args.username, tt.args.token); got != tt.want { - t.Errorf("DeployRunner.getHttpCloneURLWithToken() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/builder/deploy/scheduler/runner.go b/builder/deploy/scheduler/runner.go deleted file mode 100644 index 4f9429b08..000000000 --- a/builder/deploy/scheduler/runner.go +++ /dev/null @@ -1,50 +0,0 @@ -package scheduler - -import ( - "context" - "log/slog" - "time" -) - -type Runner interface { - Run(context.Context) error - // WatchID is the unique ID for monitor service to watch the running progress - WatchID() int64 -} - -var ( - _ Runner = (*BuilderRunner)(nil) - _ Runner = (*DeployRunner)(nil) -) - -type sleepTask struct { - du time.Duration -} - -func (t *sleepTask) Run(ctx context.Context) error { - slog.Debug("sleeping task running", slog.Duration("time", t.du)) - time.Sleep(t.du) - return nil -} -func (t *sleepTask) WatchID() int64 { return 0 } - -const Cancelled = -1 - -const ( - BuildPending = 0 - BuildInQueue = 5 - BuildInProgress = 1 - BuildFailed = 2 - BuildSucceed = 3 - BuildSkip = 4 // export for other package -) - -// sub deploy task status -const ( - deployPending = 0 - deploying = 1 - deployFailed = 2 - deployStartUp = 3 - deployRunning = 4 - deployRunTimeError = 5 -) diff --git a/builder/deploy/scheduler/scheduler.go b/builder/deploy/scheduler/scheduler.go deleted file mode 100644 index 2360730fd..000000000 --- a/builder/deploy/scheduler/scheduler.go +++ /dev/null @@ -1,317 +0,0 @@ -package scheduler - -import ( - "context" - "database/sql" - "errors" - "fmt" - "log/slog" - "strconv" - "sync" - "time" - - "opencsg.com/csghub-server/component/reporter" - - "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/imagebuilder" - "opencsg.com/csghub-server/builder/deploy/imagerunner" - "opencsg.com/csghub-server/builder/git" - "opencsg.com/csghub-server/builder/git/gitserver" - "opencsg.com/csghub-server/builder/redis" - "opencsg.com/csghub-server/builder/store/database" - "opencsg.com/csghub-server/common/config" - "opencsg.com/csghub-server/common/types" - utilcommon "opencsg.com/csghub-server/common/utils/common" -) - -type Scheduler interface { - Run() error - Queue(deployTaskID int64) error -} - -// a Scheduler will run tasks in their arrival order -type FIFOScheduler struct { - timeout time.Duration - // parallel running tasks - tasks chan Runner - last *database.DeployTask - - store database.DeployTaskStore - spaceStore database.SpaceStore - modelStore database.ModelStore - spaceResourcesStore database.SpaceResourceStore - ib imagebuilder.Builder - ir imagerunner.Runner - - nextLock *sync.Mutex - deployCfg common.DeployConfig - config *config.Config - redisLocker *redis.DistributedLocker - logReporter reporter.LogCollector - git gitserver.GitServer -} - -func NewFIFOScheduler(ib imagebuilder.Builder, ir imagerunner.Runner, c common.DeployConfig, logReporter reporter.LogCollector) (Scheduler, error) { - s := &FIFOScheduler{} - // TODO:allow config - s.timeout = 30 * time.Minute - s.store = database.NewDeployTaskStore() - s.spaceStore = database.NewSpaceStore() - s.modelStore = database.NewModelStore() - s.spaceResourcesStore = database.NewSpaceResourceStore() - // allow concurrent deployment tasks - s.tasks = make(chan Runner, 100) - // s.ib = imagebuilder.NewLocalBuilder() - // s.ir = imagerunner.NewLocalRunner() - s.ib = ib - s.ir = ir - s.nextLock = &sync.Mutex{} - s.deployCfg = c - //TODO: avoid load config, use config from params - s.config, _ = config.LoadConfig() - s.redisLocker = c.RedisLocker - s.logReporter = logReporter - - gc, err := git.NewGitServer(s.config) - if err != nil { - newError := fmt.Errorf("fail to create git server,error:%w", err) - slog.Error(newError.Error()) - return nil, newError - } - - s.git = gc - - return s, nil -} - -// Run will load tasks and run them currently -func (rs *FIFOScheduler) Run() error { - slog.Info("FIFOScheduler run started") - - go func() { - for count := 0; count <= cap(rs.tasks); count++ { - _, err := rs.next() - if err != nil { - slog.Error("failed to get next task", "error", err) - continue - } - } - }() - - slog.Debug("scheduler try to loop through tasks channel") - for t := range rs.tasks { - go func(t Runner) { - slog.Debug("dequeue a task to run", slog.Any("task", t.WatchID())) - ctx, cancel := context.WithTimeout(context.Background(), rs.timeout) - defer cancel() - - if err := t.Run(ctx); err != nil { - slog.Error("failed to run task", slog.Any("error", err), slog.Any("task", t.WatchID())) - rs.failDeployFollowingTasks(t.WatchID(), err.Error()) - } - - _, _ = rs.next() - }(t) - } - - return nil -} - -func (rs *FIFOScheduler) Queue(deployTaskID int64) error { - // simply trigger next task - _, err := rs.next() - - return err -} - -// run next task -func (rs *FIFOScheduler) next() (Runner, error) { - var ( - deployTask *database.DeployTask - t Runner - err error - ) - // get lock here to prevent concurrent access to the same task - errLock := rs.redisLocker.GetDeployTaskSchedulerLock() - if errLock != nil && errors.Is(errLock, redis.ErrLockAcquire) { - slog.Debug("skip schedule deploy task due to fail getting lock", slog.Any("error", errLock)) - t = &sleepTask{ - du: 5 * time.Second, - } - rs.tasks <- t - return t, nil - } - defer func() { - if errLock != nil { - slog.Warn("fail to getting lock in deploy task scheduler", slog.Any("error", errLock)) - } else { - ok, err := rs.redisLocker.ReleaseDeployTaskSchedulerLock() - if !ok || err != nil { - slog.Error("failed to release deploy task scheduler lock", slog.Any("success", ok), slog.Any("error", err)) - } - } - }() - - rs.nextLock.Lock() - slog.Debug("FIFOScheduler try to get next task", slog.Any("last", rs.last)) - defer rs.nextLock.Unlock() - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if rs.last == nil { - deployTask, err = rs.store.GetNewTaskFirst(ctx) - slog.Debug("GetNewTaskFirst", slog.Any("deploy_task", deployTask), slog.Any("error", err)) - } else { - deployTask, err = rs.store.GetNewTaskAfter(ctx, rs.last.ID) - slog.Debug("GetNewTaskAfter", slog.Any("deploy_task", deployTask), slog.Any("last", rs.last.ID), slog.Any("error", err)) - } - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - slog.Debug("no more tasks to run, schedule a sleeping task") - // using a sleep task to pause the scheduler - } else { - slog.Error("FIFOScheduler cannot get next task by db error", slog.Any("error", err)) - } - - t = &sleepTask{ - du: 5 * time.Second, - } - rs.tasks <- t - return t, nil - } - - var repo RepoInfo - - if deployTask.Deploy.SpaceID > 0 { - // handle space - var s *database.Space - s, err = rs.spaceStore.ByID(ctx, deployTask.Deploy.SpaceID) - if err == nil { - repoCloneInfo := utilcommon.BuildCloneInfo(rs.config, s.Repository) - repo.Path = s.Repository.Path - repo.Name = s.Repository.Name - repo.Sdk = s.Sdk - repo.SdkVersion = s.SdkVersion - repo.DriverVersion = s.DriverVersion - repo.HTTPCloneURL = repoCloneInfo.HTTPCloneURL - repo.SpaceID = s.ID - repo.RepoID = s.Repository.ID - repo.UserName = s.Repository.User.Username - repo.DeployID = deployTask.Deploy.ID - repo.ModelID = 0 - repo.RepoType = string(types.SpaceRepo) - } - } else if deployTask.Deploy.ModelID > 0 { - // handle model - var m *database.Model - m, err = rs.modelStore.ByID(ctx, deployTask.Deploy.ModelID) - if err == nil { - repo.Path = m.Repository.Path - repo.Name = m.Repository.Name - repo.ModelID = m.ID - repo.RepoID = m.Repository.ID - repo.UserName = m.Repository.User.Username - repo.DeployID = deployTask.Deploy.ID - repo.SpaceID = 0 - repo.RepoType = string(types.ModelRepo) - } - } else { - // for no repo case, e.g. notebook deploy - repo.Path = "/" - } - - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - slog.Warn("cancel deploy task as repo not found", slog.Any("deploy_task", deployTask)) - // mark task as cancelled - deployTask.Status = Cancelled - deployTask.Message = "repo not found" - err = rs.store.UpdateDeployTask(ctx, deployTask) - if err != nil { - slog.Error("update deploy task failed", "error", err) - } - } - t = &sleepTask{ - du: 5 * time.Second, - } - rs.last = deployTask - rs.tasks <- t - return t, nil - } - // for build task - if deployTask.TaskType == 0 { - t, err = NewBuildRunner(rs.git, rs.ib, &repo, deployTask, rs.logReporter, rs.config.Runner.HearBeatIntervalInSec) - if err != nil { - slog.Error("failed to create build runner", slog.Any("error", err)) - return nil, err - } - } else { - t = NewDeployRunner(rs.ir, &repo, deployTask, rs.deployCfg, rs.logReporter) - } - - rs.last = deployTask - rs.tasks <- t - slog.Info("enqueue next task", slog.Any("task", t.WatchID())) - return t, err -} - -func (rs *FIFOScheduler) failDeployFollowingTasks(deploytaskID int64, reason string) { - slog.Info("scheduler fail following tasks", slog.Any("deploy_task_id", deploytaskID)) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - t, _ := rs.store.GetDeployTask(ctx, deploytaskID) - - dps, err := rs.store.GetDeployTasksOfDeploy(ctx, t.DeployID) - if err != nil { - slog.Error("failed to get tasks of deploy when check build status", slog.Any("error", err), - slog.Int64("deploy_id", t.DeployID)) - return - } - - // update following tasks to be failed to stop scheduler to run it - for _, dp := range dps { - // fail current task - if dp.ID == t.ID { - dp.Status = BuildFailed - dp.Message = reason - continue - } - // tasks after current task - if dp.ID > t.ID { - dp.Status = Cancelled - dp.Message = "cancel as previous task failed" - } else { - dp.Status = deployFailed - dp.Message = reason - } - } - if err := rs.store.UpdateInTx(ctx, nil, []string{"status", "message"}, nil, dps...); err != nil { - slog.Error("failed update deploy status to `BuildFailed`", slog.Int64("deploy_task_id", t.ID), "error", err) - return - } - - deploy, err := rs.store.GetDeployByID(ctx, t.DeployID) - if err != nil { - slog.Error("failed to get deploy when check build status", slog.Any("error", err), - slog.Int64("deploy_id", t.DeployID)) - return - } - deploy.Status = common.DeployFailed - deploy.Message = reason - rs.logReporter.Report(types.LogEntry{ - Message: fmt.Sprintf("%s, task status: %d", reason, deploy.Status), - Stage: types.StageDeploy, - Step: types.StepDeployFailed, - DeployID: strconv.FormatInt(t.DeployID, 10), - Labels: map[string]string{ - types.LogLabelTypeKey: types.LogLabelDeploy, - types.StreamKeyDeployType: strconv.Itoa(deploy.Type), - types.StreamKeyDeployTaskID: strconv.FormatInt(deploytaskID, 10), - }, - }) - if err := rs.store.UpdateDeploy(ctx, deploy); err != nil { - slog.Error("failed update deploy status to `DeployFailed`", slog.Int64("deploy_id", t.DeployID), "error", err) - return - } - -} diff --git a/component/executors/webhook_executor_imagebuilder.go b/component/executors/webhook_executor_imagebuilder.go index bf3260c0b..7fbea4246 100644 --- a/component/executors/webhook_executor_imagebuilder.go +++ b/component/executors/webhook_executor_imagebuilder.go @@ -8,7 +8,6 @@ import ( "time" "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/common/config" "opencsg.com/csghub-server/common/types" @@ -85,14 +84,14 @@ func (h *imagebuilderExecutorImpl) ProcessEvent(ctx context.Context, event *type if task.Deploy.Status != common.BuildInQueue { return nil } - status = scheduler.BuildInProgress + status = common.TaskStatusBuildInProgress message = "build in progress" task.Deploy.Status = common.Building case string(v1alpha1.WorkflowSucceeded): if task.Deploy.Status != common.Building { return nil } - status = scheduler.BuildSucceed + status = common.TaskStatusBuildSucceed message = fmt.Sprintf("build success, image path: %s", data.ImagetPath) task.Deploy.ImageID = data.ImagetPath task.Deploy.Status = common.BuildSuccess @@ -100,27 +99,27 @@ func (h *imagebuilderExecutorImpl) ProcessEvent(ctx context.Context, event *type if task.Deploy.Status != common.Building { return nil } - status = scheduler.BuildFailed + status = common.TaskStatusBuildFailed task.Deploy.Status = common.BuildFailed case string(v1alpha1.WorkflowError): if task.Deploy.Status != common.Building { return nil } - status = scheduler.BuildFailed + status = common.TaskStatusBuildFailed task.Deploy.Status = common.BuildFailed default: return nil } - if task.Status == scheduler.Cancelled { + if task.Status == common.TaskStatusCancelled { return nil } - if task.Status == scheduler.BuildFailed { + if task.Status == common.TaskStatusBuildFailed { return nil } - if task.Status != scheduler.BuildInQueue && status <= task.Status { + if task.Status != common.TaskStatusBuildInQueue && status <= task.Status { return nil } diff --git a/component/executors/webhook_executor_imagebuilder_test.go b/component/executors/webhook_executor_imagebuilder_test.go index 949431947..facd50dca 100644 --- a/component/executors/webhook_executor_imagebuilder_test.go +++ b/component/executors/webhook_executor_imagebuilder_test.go @@ -10,7 +10,6 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/stretchr/testify/require" "opencsg.com/csghub-server/builder/deploy/common" - "opencsg.com/csghub-server/builder/deploy/scheduler" "opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/common/tests" "opencsg.com/csghub-server/common/types" @@ -35,7 +34,7 @@ func TestImageBuilderExecutor_ProcessEvent(t *testing.T) { err = mockDb.CreateDeployTask(context.Background(), &database.DeployTask{ ID: taskId, DeployID: deployId, - Status: scheduler.BuildPending, + Status: common.TaskStatusBuildPending, }) require.Nil(t, err) data := types.ImageBuilderEvent{ @@ -61,7 +60,7 @@ func TestImageBuilderExecutor_ProcessEvent(t *testing.T) { task, err := mockDb.GetDeployTask(context.Background(), taskId) require.Nil(t, err) - require.Equal(t, scheduler.BuildPending, task.Status) + require.Equal(t, common.TaskStatusBuildPending, task.Status) }) t.Run("WorkflowRunning", func(t *testing.T) { @@ -79,7 +78,7 @@ func TestImageBuilderExecutor_ProcessEvent(t *testing.T) { err = mockDb.CreateDeployTask(context.Background(), &database.DeployTask{ ID: taskId, DeployID: deployId, - Status: scheduler.BuildInQueue, + Status: common.TaskStatusBuildInQueue, }) require.Nil(t, err) data := types.ImageBuilderEvent{ @@ -105,7 +104,7 @@ func TestImageBuilderExecutor_ProcessEvent(t *testing.T) { task, err := mockDb.GetDeployTask(context.Background(), taskId) require.Nil(t, err) - require.Equal(t, scheduler.BuildInProgress, task.Status) + require.Equal(t, common.TaskStatusBuildInProgress, task.Status) }) t.Run("WorkflowSucceeded", func(t *testing.T) { @@ -148,7 +147,7 @@ func TestImageBuilderExecutor_ProcessEvent(t *testing.T) { task, err := mockDb.GetDeployTask(context.Background(), taskId) require.Nil(t, err) - require.Equal(t, scheduler.BuildSucceed, task.Status) + require.Equal(t, common.TaskStatusBuildSucceed, task.Status) require.Equal(t, common.BuildSuccess, task.Deploy.Status) }) @@ -192,7 +191,7 @@ func TestImageBuilderExecutor_ProcessEvent(t *testing.T) { task, err := mockDb.GetDeployTask(context.Background(), taskId) require.Nil(t, err) - require.Equal(t, scheduler.BuildFailed, task.Status) + require.Equal(t, common.TaskStatusBuildFailed, task.Status) require.Equal(t, common.BuildFailed, task.Deploy.Status) }) @@ -235,7 +234,7 @@ func TestImageBuilderExecutor_ProcessEvent(t *testing.T) { task, err := mockDb.GetDeployTask(context.Background(), taskId) require.Nil(t, err) - require.NotEqual(t, scheduler.BuildSucceed, task.Status) + require.NotEqual(t, common.TaskStatusBuildSucceed, task.Status) require.NotEqual(t, common.BuildSuccess, task.Deploy.Status) }) @@ -278,7 +277,7 @@ func TestImageBuilderExecutor_ProcessEvent(t *testing.T) { task, err := mockDb.GetDeployTask(context.Background(), taskId) require.Nil(t, err) - require.NotEqual(t, scheduler.BuildFailed, task.Status) + require.NotEqual(t, common.TaskStatusBuildFailed, task.Status) require.NotEqual(t, common.BuildFailed, task.Deploy.Status) })