Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions api/workflow/activity/deploy_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"opencsg.com/csghub-server/builder/deploy/common"
"opencsg.com/csghub-server/builder/deploy/imagebuilder"
"opencsg.com/csghub-server/builder/deploy/imagerunner"
"opencsg.com/csghub-server/builder/deploy/scheduler"
"opencsg.com/csghub-server/builder/git/gitserver"
"opencsg.com/csghub-server/builder/store/database"
"opencsg.com/csghub-server/common/errorx"
Expand All @@ -32,12 +31,12 @@ const (
)

const (
DeployStatusPending = 0
DeployStatusDeploying = 1
DeployStatusFailed = 2
DeployStatusStartUp = 3
DeployStatusRunning = 4
DeployStatusRunTimeError = 5
DeployStatusPending = common.TaskStatusDeployPending
DeployStatusDeploying = common.TaskStatusDeploying
DeployStatusFailed = common.TaskStatusDeployFailed
DeployStatusStartUp = common.TaskStatusDeployStartUp
DeployStatusRunning = common.TaskStatusDeployRunning
DeployStatusRunTimeError = common.TaskStatusDeployRunTimeError
)

type DeployActivity struct {
Expand Down Expand Up @@ -145,7 +144,7 @@ func (a *DeployActivity) Build(ctx context.Context, taskId int64) error {
if err != nil {
return fmt.Errorf("failed to get deploy task: %w", err)
}
if task.Status == scheduler.BuildSkip {
if task.Status == common.TaskStatusBuildSkip {
return nil
}
repoInfo, err := a.getRepositoryInfo(ctx, task)
Expand Down Expand Up @@ -173,7 +172,7 @@ func (a *DeployActivity) getLogger(ctx context.Context) log.Logger {
}

// pollBuildStatus
func (a *DeployActivity) pollBuildStatus(ctx context.Context, task *database.DeployTask, repoInfo scheduler.RepoInfo, buildRequest *types.ImageBuilderRequest) error {
func (a *DeployActivity) pollBuildStatus(ctx context.Context, task *database.DeployTask, repoInfo common.RepoInfo, buildRequest *types.ImageBuilderRequest) error {
continueLoop, err := a.checkBuildStatus(ctx, task, buildRequest)
if err != nil {
return err
Expand Down Expand Up @@ -219,23 +218,28 @@ func (a *DeployActivity) checkBuildStatus(ctx context.Context, task *database.De
}

switch {
case updatedTask.Status == scheduler.BuildPending:
case updatedTask.Status == common.TaskStatusBuildPending:
if err := a.ib.Build(ctx, buildRequest); err != nil {
if herr := a.handleBuildError(task, err); herr != nil {
a.getLogger(ctx).Error("Build failed", "task_id", task.ID, "error", err)
return false, herr
}

a.getLogger(ctx).Error("Build failed", "task_id", task.ID, "error", err)
a.reportLog(types.BuildFailed.String()+": \n"+err.Error(), types.StepBuildFailed, task)
return false, fmt.Errorf("build failed: %w", err)
}
if err := a.handleBuildTaskToBuildInQueue(task); err != nil {
a.getLogger(ctx).Error("Failed to handle build task to build in queue", "task_id", task.ID, "error", err)
return false, err
}
a.reportLog(types.BuildInProgress.String(), types.StepBuildInProgress, task)
return true, nil
case updatedTask.Status == scheduler.BuildFailed:
case updatedTask.Status == common.TaskStatusBuildFailed:
a.getLogger(ctx).Info("Build task failed", "task_id", task.ID, "status", updatedTask.Status)
return false, fmt.Errorf("build task failed: %s", updatedTask.Message)
case updatedTask.Status == scheduler.BuildSucceed:
case updatedTask.Status == common.TaskStatusBuildSucceed:
a.getLogger(ctx).Info("Build task succeed", "task_id", task.ID, "status", updatedTask.Status)
return false, nil
case a.isTaskTimedOut(updatedTask):
a.reportLog("build task timeout", types.StepBuildFailed, task)
Expand Down Expand Up @@ -264,8 +268,8 @@ func (a *DeployActivity) isTaskTimedOut(task *database.DeployTask) bool {
}

// getRepositoryInfo
func (a *DeployActivity) getRepositoryInfo(ctx context.Context, task *database.DeployTask) (scheduler.RepoInfo, error) {
var repoInfo scheduler.RepoInfo
func (a *DeployActivity) getRepositoryInfo(ctx context.Context, task *database.DeployTask) (common.RepoInfo, error) {
var repoInfo common.RepoInfo

if task.Deploy.SpaceID > 0 {
space, err := a.ss.ByID(ctx, task.Deploy.SpaceID)
Expand All @@ -288,10 +292,10 @@ func (a *DeployActivity) getRepositoryInfo(ctx context.Context, task *database.D
}

// createSpaceRepoInfo
func (a *DeployActivity) createSpaceRepoInfo(space *database.Space, deployID int64) scheduler.RepoInfo {
func (a *DeployActivity) createSpaceRepoInfo(space *database.Space, deployID int64) common.RepoInfo {
cloneInfo := utilcommon.BuildCloneInfoByDomain(a.cfg.PublicDomain, a.cfg.SSHDomain, space.Repository)

return scheduler.RepoInfo{
return common.RepoInfo{
Path: space.Repository.Path,
Name: space.Repository.Name,
Sdk: space.Sdk,
Expand All @@ -308,8 +312,8 @@ func (a *DeployActivity) createSpaceRepoInfo(space *database.Space, deployID int
}

// createModelRepoInfo
func (a *DeployActivity) createModelRepoInfo(model *database.Model, deployID int64) scheduler.RepoInfo {
return scheduler.RepoInfo{
func (a *DeployActivity) createModelRepoInfo(model *database.Model, deployID int64) common.RepoInfo {
return common.RepoInfo{
Path: model.Repository.Path,
Name: model.Repository.Name,
ModelID: model.ID,
Expand Down Expand Up @@ -358,7 +362,7 @@ func (a *DeployActivity) updateTaskStatus(task *database.DeployTask) error {

// handleRepositoryNotFound
func (a *DeployActivity) handleRepositoryNotFound(task *database.DeployTask) error {
task.Status = scheduler.BuildFailed
task.Status = common.TaskStatusBuildFailed
task.Message = "repository not found, please check the repository path"
task.Deploy.Status = common.BuildFailed
if err := a.updateTaskStatus(task); err != nil {
Expand All @@ -370,7 +374,7 @@ func (a *DeployActivity) handleRepositoryNotFound(task *database.DeployTask) err
func (a *DeployActivity) handleBuildCancelled(task *database.DeployTask) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(5))
defer cancel()
task.Status = scheduler.Cancelled
task.Status = common.TaskStatusCancelled
task.Message = "Cancelled"
if err := a.ds.UpdateDeployTask(ctx, task); err != nil {
return fmt.Errorf("handleBuildCancelled failed to update deploy task status: %w", err)
Expand All @@ -380,7 +384,7 @@ func (a *DeployActivity) handleBuildCancelled(task *database.DeployTask) error {
}

func (a *DeployActivity) handleBuildTaskTimeout(task *database.DeployTask) error {
task.Status = scheduler.BuildFailed
task.Status = common.TaskStatusBuildFailed
task.Message = "build task timeout"
task.Deploy.Status = common.BuildFailed

Expand All @@ -393,7 +397,7 @@ func (a *DeployActivity) handleBuildTaskTimeout(task *database.DeployTask) error

// handleBuildError
func (a *DeployActivity) handleBuildError(task *database.DeployTask, err error) error {
task.Status = scheduler.BuildFailed
task.Status = common.TaskStatusBuildFailed
task.Message = fmt.Sprintf("build task failed: %s", err.Error())
task.Deploy.Status = common.BuildFailed

Expand All @@ -405,7 +409,7 @@ func (a *DeployActivity) handleBuildError(task *database.DeployTask, err error)

// handleBuildTaskToBuildInQueue marks the build task and its deploy as queued for building.
func (a *DeployActivity) handleBuildTaskToBuildInQueue(task *database.DeployTask) error {
task.Status = scheduler.BuildInQueue
task.Status = common.TaskStatusBuildInQueue
task.Message = "build in queue"
task.Deploy.Status = common.BuildInQueue

Expand Down Expand Up @@ -455,7 +459,7 @@ func (a *DeployActivity) reportLog(message string, step types.Step, task *databa
}

// createBuildRequest
func (a *DeployActivity) createBuildRequest(ctx context.Context, task *database.DeployTask, repoInfo scheduler.RepoInfo) (*types.ImageBuilderRequest, error) {
func (a *DeployActivity) createBuildRequest(ctx context.Context, task *database.DeployTask, repoInfo common.RepoInfo) (*types.ImageBuilderRequest, error) {
accessToken, err := a.ts.FindByUID(ctx, task.Deploy.UserID)
if err != nil {
return nil, fmt.Errorf("failed to get git access token: %w", err)
Expand Down Expand Up @@ -501,7 +505,7 @@ func (a *DeployActivity) createBuildRequest(ctx context.Context, task *database.
}

// createDeployRequest
func (a *DeployActivity) createDeployRequest(ctx context.Context, task *database.DeployTask, repoInfo scheduler.RepoInfo) (*types.RunRequest, error) {
func (a *DeployActivity) createDeployRequest(ctx context.Context, task *database.DeployTask, repoInfo common.RepoInfo) (*types.RunRequest, error) {
logger := a.getLogger(ctx)

accessToken, err := a.ts.FindByUID(ctx, task.Deploy.UserID)
Expand Down Expand Up @@ -584,7 +588,7 @@ func (a *DeployActivity) createDeployRequest(ctx context.Context, task *database
}, nil
}

func (a *DeployActivity) determineSDKVersion(repoInfo scheduler.RepoInfo) string {
func (a *DeployActivity) determineSDKVersion(repoInfo common.RepoInfo) string {
if repoInfo.SdkVersion != "" {
return repoInfo.SdkVersion
}
Expand All @@ -606,7 +610,7 @@ func (a *DeployActivity) parseHardware(input string) string {
return "cpu"
}

func (a *DeployActivity) stopBuild(buildTask *database.DeployTask, repoInfo scheduler.RepoInfo) {
func (a *DeployActivity) stopBuild(buildTask *database.DeployTask, repoInfo common.RepoInfo) {
stopCtx, stopCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer stopCancel()
paths := strings.Split(repoInfo.Path, "/")
Expand All @@ -624,7 +628,7 @@ func (a *DeployActivity) stopBuild(buildTask *database.DeployTask, repoInfo sche
}

// makeDeployEnv
func (a *DeployActivity) makeDeployEnv(ctx context.Context, hardware types.HardWare, accessToken *database.AccessToken, deployInfo *database.Deploy, engineArgsTemplates []types.EngineArg, toolCallParsers map[string]string, repoInfo scheduler.RepoInfo) (map[string]string, error) {
func (a *DeployActivity) makeDeployEnv(ctx context.Context, hardware types.HardWare, accessToken *database.AccessToken, deployInfo *database.Deploy, engineArgsTemplates []types.EngineArg, toolCallParsers map[string]string, repoInfo common.RepoInfo) (map[string]string, error) {
logger := a.getLogger(ctx)

envMap, err := utilcommon.JsonStrToMap(deployInfo.Env)
Expand Down
12 changes: 6 additions & 6 deletions api/workflow/activity/deploy_activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"opencsg.com/csghub-server/builder/deploy/common"
"opencsg.com/csghub-server/builder/deploy/scheduler"
"opencsg.com/csghub-server/builder/store/database"
"opencsg.com/csghub-server/common/config"
"opencsg.com/csghub-server/common/types"
Expand Down Expand Up @@ -43,6 +42,7 @@ type testEnv struct {

func setupTest(t *testing.T) *testEnv {
ctx := context.Background()
ctx = context.WithValue(ctx, "test", "test")

// Create mock dependencies
mockDeployTaskStore := mockdb.NewMockDeployTaskStore(t)
Expand Down Expand Up @@ -98,36 +98,36 @@ func TestActivities_determineSDKVersion(t *testing.T) {
// Test cases
testCases := []struct {
name string
repoInfo scheduler.RepoInfo
repoInfo common.RepoInfo
expectedSDK string
}{
{
name: "With explicit SDK version",
repoInfo: scheduler.RepoInfo{
repoInfo: common.RepoInfo{
SdkVersion: "custom-version",
Sdk: "some-other-sdk",
},
expectedSDK: "custom-version",
},
{
name: "With GRADIO SDK",
repoInfo: scheduler.RepoInfo{
repoInfo: common.RepoInfo{
SdkVersion: "",
Sdk: types.GRADIO.Name,
},
expectedSDK: types.GRADIO.Version,
},
{
name: "With STREAMLIT SDK",
repoInfo: scheduler.RepoInfo{
repoInfo: common.RepoInfo{
SdkVersion: "",
Sdk: types.STREAMLIT.Name,
},
expectedSDK: types.STREAMLIT.Version,
},
{
name: "With unknown SDK",
repoInfo: scheduler.RepoInfo{
repoInfo: common.RepoInfo{
SdkVersion: "",
Sdk: "unknown-sdk",
},
Expand Down
5 changes: 2 additions & 3 deletions api/workflow/deployer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.temporal.io/sdk/testsuite"
"opencsg.com/csghub-server/api/workflow/activity"
"opencsg.com/csghub-server/builder/deploy/common"
"opencsg.com/csghub-server/builder/deploy/scheduler"
"opencsg.com/csghub-server/builder/store/database"
"opencsg.com/csghub-server/common/config"
"opencsg.com/csghub-server/common/types"
Expand Down Expand Up @@ -120,7 +119,7 @@ func TestDeployWorkflowSuccess(t *testing.T) {
ID: 1,
DeployID: deploy.ID,
Deploy: deploy,
Status: scheduler.BuildSkip,
Status: common.TaskStatusBuildSkip,
}

runTask := &database.DeployTask{
Expand All @@ -137,7 +136,7 @@ func TestDeployWorkflowSuccess(t *testing.T) {
}, nil)

mockDeployTaskStore.EXPECT().GetDeployTask(mock.Anything, buildTask.ID).Return(buildTask, nil)
buildTask.Status = scheduler.BuildSucceed
buildTask.Status = common.TaskStatusBuildSucceed

// deploy
mockLogReporter.EXPECT().Report(mock.Anything).Return().Maybe()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package scheduler
package common

type Config struct {
ImageBuilderURL string `json:"image_builder_url"`
Expand Down
20 changes: 20 additions & 0 deletions builder/deploy/common/status.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,27 @@
package common

// Status values for the build sub-task of a deploy (stored in DeployTask.Status).
// These are untyped integer constants; the numeric values 0..5 are also used by
// the deploy sub-task statuses, so a value is only meaningful together with
// the kind of task it belongs to.
const (
// TaskStatusCancelled marks a build task that was cancelled.
TaskStatusCancelled = -1
// TaskStatusBuildPending marks a build task that has not been submitted to the image builder yet.
TaskStatusBuildPending = 0
// TaskStatusBuildInProgress marks a build task whose image build is running.
TaskStatusBuildInProgress = 1
// TaskStatusBuildFailed marks a build task that ended in failure (build error, timeout, missing repository).
TaskStatusBuildFailed = 2
// TaskStatusBuildSucceed marks a build task whose image build completed successfully.
TaskStatusBuildSucceed = 3
// TaskStatusBuildSkip marks a build task that needs no image build; exported so
// packages outside common can create and recognize skipped build tasks.
TaskStatusBuildSkip = 4
// TaskStatusBuildInQueue marks a build task that has been handed to the image builder and is queued.
TaskStatusBuildInQueue = 5
)

// Status values for the deploy (run) sub-task of a deploy.
// The numeric range 0..5 is shared with the build sub-task statuses, so these
// must only be compared against the status of a deploy sub-task.
const (
// TaskStatusDeployPending marks a deploy task that has not started yet.
TaskStatusDeployPending = 0
// TaskStatusDeploying marks a deploy task that is currently being deployed.
TaskStatusDeploying = 1
// TaskStatusDeployFailed marks a deploy task that failed.
TaskStatusDeployFailed = 2
// TaskStatusDeployStartUp marks a deploy task whose instance is starting up.
TaskStatusDeployStartUp = 3
// TaskStatusDeployRunning marks a deploy task whose instance is running.
TaskStatusDeployRunning = 4
// TaskStatusDeployRunTimeError marks a deploy task that hit a runtime error.
TaskStatusDeployRunTimeError = 5
)

// deploy status
const (
Pending = 0
// step one: build
Expand Down
3 changes: 0 additions & 3 deletions builder/deploy/deploy_ce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
mockbuilder "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/imagebuilder"
mockrunner "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/imagerunner"
mockScheduler "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/scheduler"
mockdb "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/store/database"
"opencsg.com/csghub-server/builder/deploy/common"
"opencsg.com/csghub-server/builder/store/database"
Expand All @@ -36,8 +35,6 @@ func newTestDeployer(t *testing.T) *testDepolyerWithMocks {
},
}
s.mocks.stores = mockStores
s.mocks.scheduler = mockScheduler.NewMockScheduler(t)
s.scheduler = s.mocks.scheduler
s.mocks.builder = mockbuilder.NewMockBuilder(t)
s.imageBuilder = s.mocks.builder
s.mocks.runner = mockrunner.NewMockRunner(t)
Expand Down
3 changes: 1 addition & 2 deletions builder/deploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/google/uuid"
"opencsg.com/csghub-server/builder/deploy/common"
"opencsg.com/csghub-server/builder/deploy/scheduler"
"opencsg.com/csghub-server/builder/loki"
"opencsg.com/csghub-server/builder/redis"
"opencsg.com/csghub-server/builder/store/database"
Expand Down Expand Up @@ -214,7 +213,7 @@ func (d *deployer) Deploy(ctx context.Context, dr types.DeployRepo) (int64, erro
imgStrLen := len(strings.Trim(deploy.ImageID, " "))
slog.Debug("do deployer.Deploy check image", slog.Any("deploy.ImageID", deploy.ImageID), slog.Any("imgStrLen", imgStrLen))
if imgStrLen > 0 {
bldTaskStatus = scheduler.BuildSkip
bldTaskStatus = common.TaskStatusBuildSkip
bldTaskMsg = "Skip"
}
slog.Debug("create build task", slog.Any("bldTaskStatus", bldTaskStatus), slog.Any("bldTaskMsg", bldTaskMsg))
Expand Down
5 changes: 1 addition & 4 deletions builder/deploy/deployer_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"opencsg.com/csghub-server/builder/deploy/common"
"opencsg.com/csghub-server/builder/deploy/imagebuilder"
"opencsg.com/csghub-server/builder/deploy/imagerunner"
"opencsg.com/csghub-server/builder/deploy/scheduler"
"opencsg.com/csghub-server/builder/event"
"opencsg.com/csghub-server/builder/store/database"
"opencsg.com/csghub-server/common/config"
Expand All @@ -25,7 +24,6 @@ import (
)

type deployer struct {
scheduler scheduler.Scheduler
imageBuilder imagebuilder.Builder
imageRunner imagerunner.Runner

Expand All @@ -45,7 +43,7 @@ type deployer struct {
logReporter reporter.LogCollector
}

func newDeployer(s scheduler.Scheduler, ib imagebuilder.Builder, ir imagerunner.Runner, c common.DeployConfig, logReporter reporter.LogCollector, cfg *config.Config, startJobs bool) (*deployer, error) {
func newDeployer(ib imagebuilder.Builder, ir imagerunner.Runner, c common.DeployConfig, logReporter reporter.LogCollector, cfg *config.Config, startJobs bool) (*deployer, error) {

store := database.NewDeployTaskStore()
node, err := snowflake.NewNode(1)
Expand All @@ -55,7 +53,6 @@ func newDeployer(s scheduler.Scheduler, ib imagebuilder.Builder, ir imagerunner.
}

d := &deployer{
scheduler: s,
imageBuilder: ib,
imageRunner: ir,
deployTaskStore: store,
Expand Down
Loading
Loading