Skip to content

Commit 4219606

Browse files
QinYuuuu and Dev Agent authored
clean deploy code (#659)
* clean deploy code
* remove unused

---------

Co-authored-by: Dev Agent <[email protected]>
1 parent 99232a1 commit 4219606

17 files changed

+80
-1230
lines changed

api/workflow/activity/deploy_activity.go

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"opencsg.com/csghub-server/builder/deploy/common"
1818
"opencsg.com/csghub-server/builder/deploy/imagebuilder"
1919
"opencsg.com/csghub-server/builder/deploy/imagerunner"
20-
"opencsg.com/csghub-server/builder/deploy/scheduler"
2120
"opencsg.com/csghub-server/builder/git/gitserver"
2221
"opencsg.com/csghub-server/builder/store/database"
2322
"opencsg.com/csghub-server/common/errorx"
@@ -32,12 +31,12 @@ const (
3231
)
3332

3433
const (
35-
DeployStatusPending = 0
36-
DeployStatusDeploying = 1
37-
DeployStatusFailed = 2
38-
DeployStatusStartUp = 3
39-
DeployStatusRunning = 4
40-
DeployStatusRunTimeError = 5
34+
DeployStatusPending = common.TaskStatusDeployPending
35+
DeployStatusDeploying = common.TaskStatusDeploying
36+
DeployStatusFailed = common.TaskStatusDeployFailed
37+
DeployStatusStartUp = common.TaskStatusDeployStartUp
38+
DeployStatusRunning = common.TaskStatusDeployRunning
39+
DeployStatusRunTimeError = common.TaskStatusDeployRunTimeError
4140
)
4241

4342
type DeployActivity struct {
@@ -145,7 +144,7 @@ func (a *DeployActivity) Build(ctx context.Context, taskId int64) error {
145144
if err != nil {
146145
return fmt.Errorf("failed to get deploy task: %w", err)
147146
}
148-
if task.Status == scheduler.BuildSkip {
147+
if task.Status == common.TaskStatusBuildSkip {
149148
return nil
150149
}
151150
repoInfo, err := a.getRepositoryInfo(ctx, task)
@@ -173,7 +172,7 @@ func (a *DeployActivity) getLogger(ctx context.Context) log.Logger {
173172
}
174173

175174
// pollBuildStatus
176-
func (a *DeployActivity) pollBuildStatus(ctx context.Context, task *database.DeployTask, repoInfo scheduler.RepoInfo, buildRequest *types.ImageBuilderRequest) error {
175+
func (a *DeployActivity) pollBuildStatus(ctx context.Context, task *database.DeployTask, repoInfo common.RepoInfo, buildRequest *types.ImageBuilderRequest) error {
177176
continueLoop, err := a.checkBuildStatus(ctx, task, buildRequest)
178177
if err != nil {
179178
return err
@@ -219,23 +218,28 @@ func (a *DeployActivity) checkBuildStatus(ctx context.Context, task *database.De
219218
}
220219

221220
switch {
222-
case updatedTask.Status == scheduler.BuildPending:
221+
case updatedTask.Status == common.TaskStatusBuildPending:
223222
if err := a.ib.Build(ctx, buildRequest); err != nil {
224223
if herr := a.handleBuildError(task, err); herr != nil {
224+
a.getLogger(ctx).Error("Build failed", "task_id", task.ID, "error", err)
225225
return false, herr
226226
}
227227

228+
a.getLogger(ctx).Error("Build failed", "task_id", task.ID, "error", err)
228229
a.reportLog(types.BuildFailed.String()+": \n"+err.Error(), types.StepBuildFailed, task)
229230
return false, fmt.Errorf("build failed: %w", err)
230231
}
231232
if err := a.handleBuildTaskToBuildInQueue(task); err != nil {
233+
a.getLogger(ctx).Error("Failed to handle build task to build in queue", "task_id", task.ID, "error", err)
232234
return false, err
233235
}
234236
a.reportLog(types.BuildInProgress.String(), types.StepBuildInProgress, task)
235237
return true, nil
236-
case updatedTask.Status == scheduler.BuildFailed:
238+
case updatedTask.Status == common.TaskStatusBuildFailed:
239+
a.getLogger(ctx).Info("Build task failed", "task_id", task.ID, "status", updatedTask.Status)
237240
return false, fmt.Errorf("build task failed: %s", updatedTask.Message)
238-
case updatedTask.Status == scheduler.BuildSucceed:
241+
case updatedTask.Status == common.TaskStatusBuildSucceed:
242+
a.getLogger(ctx).Info("Build task succeed", "task_id", task.ID, "status", updatedTask.Status)
239243
return false, nil
240244
case a.isTaskTimedOut(updatedTask):
241245
a.reportLog("build task timeout", types.StepBuildFailed, task)
@@ -264,8 +268,8 @@ func (a *DeployActivity) isTaskTimedOut(task *database.DeployTask) bool {
264268
}
265269

266270
// getRepositoryInfo
267-
func (a *DeployActivity) getRepositoryInfo(ctx context.Context, task *database.DeployTask) (scheduler.RepoInfo, error) {
268-
var repoInfo scheduler.RepoInfo
271+
func (a *DeployActivity) getRepositoryInfo(ctx context.Context, task *database.DeployTask) (common.RepoInfo, error) {
272+
var repoInfo common.RepoInfo
269273

270274
if task.Deploy.SpaceID > 0 {
271275
space, err := a.ss.ByID(ctx, task.Deploy.SpaceID)
@@ -288,10 +292,10 @@ func (a *DeployActivity) getRepositoryInfo(ctx context.Context, task *database.D
288292
}
289293

290294
// createSpaceRepoInfo
291-
func (a *DeployActivity) createSpaceRepoInfo(space *database.Space, deployID int64) scheduler.RepoInfo {
295+
func (a *DeployActivity) createSpaceRepoInfo(space *database.Space, deployID int64) common.RepoInfo {
292296
cloneInfo := utilcommon.BuildCloneInfoByDomain(a.cfg.PublicDomain, a.cfg.SSHDomain, space.Repository)
293297

294-
return scheduler.RepoInfo{
298+
return common.RepoInfo{
295299
Path: space.Repository.Path,
296300
Name: space.Repository.Name,
297301
Sdk: space.Sdk,
@@ -308,8 +312,8 @@ func (a *DeployActivity) createSpaceRepoInfo(space *database.Space, deployID int
308312
}
309313

310314
// createModelRepoInfo
311-
func (a *DeployActivity) createModelRepoInfo(model *database.Model, deployID int64) scheduler.RepoInfo {
312-
return scheduler.RepoInfo{
315+
func (a *DeployActivity) createModelRepoInfo(model *database.Model, deployID int64) common.RepoInfo {
316+
return common.RepoInfo{
313317
Path: model.Repository.Path,
314318
Name: model.Repository.Name,
315319
ModelID: model.ID,
@@ -358,7 +362,7 @@ func (a *DeployActivity) updateTaskStatus(task *database.DeployTask) error {
358362

359363
// handleRepositoryNotFound
360364
func (a *DeployActivity) handleRepositoryNotFound(task *database.DeployTask) error {
361-
task.Status = scheduler.BuildFailed
365+
task.Status = common.TaskStatusBuildFailed
362366
task.Message = "repository not found, please check the repository path"
363367
task.Deploy.Status = common.BuildFailed
364368
if err := a.updateTaskStatus(task); err != nil {
@@ -370,7 +374,7 @@ func (a *DeployActivity) handleRepositoryNotFound(task *database.DeployTask) err
370374
func (a *DeployActivity) handleBuildCancelled(task *database.DeployTask) error {
371375
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(5))
372376
defer cancel()
373-
task.Status = scheduler.Cancelled
377+
task.Status = common.TaskStatusCancelled
374378
task.Message = "Cancelled"
375379
if err := a.ds.UpdateDeployTask(ctx, task); err != nil {
376380
return fmt.Errorf("handleBuildCancelled failed to update deploy task status: %w", err)
@@ -380,7 +384,7 @@ func (a *DeployActivity) handleBuildCancelled(task *database.DeployTask) error {
380384
}
381385

382386
func (a *DeployActivity) handleBuildTaskTimeout(task *database.DeployTask) error {
383-
task.Status = scheduler.BuildFailed
387+
task.Status = common.TaskStatusBuildFailed
384388
task.Message = "build task timeout"
385389
task.Deploy.Status = common.BuildFailed
386390

@@ -393,7 +397,7 @@ func (a *DeployActivity) handleBuildTaskTimeout(task *database.DeployTask) error
393397

394398
// handleBuildError
395399
func (a *DeployActivity) handleBuildError(task *database.DeployTask, err error) error {
396-
task.Status = scheduler.BuildFailed
400+
task.Status = common.TaskStatusBuildFailed
397401
task.Message = fmt.Sprintf("build task failed: %s", err.Error())
398402
task.Deploy.Status = common.BuildFailed
399403

@@ -405,7 +409,7 @@ func (a *DeployActivity) handleBuildError(task *database.DeployTask, err error)
405409

406410
// handleBuildTaskToBuildInQueue marks the task and its deploy as queued for build.
407411
func (a *DeployActivity) handleBuildTaskToBuildInQueue(task *database.DeployTask) error {
408-
task.Status = scheduler.BuildInQueue
412+
task.Status = common.TaskStatusBuildInQueue
409413
task.Message = "build in queue"
410414
task.Deploy.Status = common.BuildInQueue
411415

@@ -455,7 +459,7 @@ func (a *DeployActivity) reportLog(message string, step types.Step, task *databa
455459
}
456460

457461
// createBuildRequest
458-
func (a *DeployActivity) createBuildRequest(ctx context.Context, task *database.DeployTask, repoInfo scheduler.RepoInfo) (*types.ImageBuilderRequest, error) {
462+
func (a *DeployActivity) createBuildRequest(ctx context.Context, task *database.DeployTask, repoInfo common.RepoInfo) (*types.ImageBuilderRequest, error) {
459463
accessToken, err := a.ts.FindByUID(ctx, task.Deploy.UserID)
460464
if err != nil {
461465
return nil, fmt.Errorf("failed to get git access token: %w", err)
@@ -501,7 +505,7 @@ func (a *DeployActivity) createBuildRequest(ctx context.Context, task *database.
501505
}
502506

503507
// createDeployRequest
504-
func (a *DeployActivity) createDeployRequest(ctx context.Context, task *database.DeployTask, repoInfo scheduler.RepoInfo) (*types.RunRequest, error) {
508+
func (a *DeployActivity) createDeployRequest(ctx context.Context, task *database.DeployTask, repoInfo common.RepoInfo) (*types.RunRequest, error) {
505509
logger := a.getLogger(ctx)
506510

507511
accessToken, err := a.ts.FindByUID(ctx, task.Deploy.UserID)
@@ -584,7 +588,7 @@ func (a *DeployActivity) createDeployRequest(ctx context.Context, task *database
584588
}, nil
585589
}
586590

587-
func (a *DeployActivity) determineSDKVersion(repoInfo scheduler.RepoInfo) string {
591+
func (a *DeployActivity) determineSDKVersion(repoInfo common.RepoInfo) string {
588592
if repoInfo.SdkVersion != "" {
589593
return repoInfo.SdkVersion
590594
}
@@ -606,7 +610,7 @@ func (a *DeployActivity) parseHardware(input string) string {
606610
return "cpu"
607611
}
608612

609-
func (a *DeployActivity) stopBuild(buildTask *database.DeployTask, repoInfo scheduler.RepoInfo) {
613+
func (a *DeployActivity) stopBuild(buildTask *database.DeployTask, repoInfo common.RepoInfo) {
610614
stopCtx, stopCancel := context.WithTimeout(context.Background(), 5*time.Second)
611615
defer stopCancel()
612616
paths := strings.Split(repoInfo.Path, "/")
@@ -624,7 +628,7 @@ func (a *DeployActivity) stopBuild(buildTask *database.DeployTask, repoInfo sche
624628
}
625629

626630
// makeDeployEnv
627-
func (a *DeployActivity) makeDeployEnv(ctx context.Context, hardware types.HardWare, accessToken *database.AccessToken, deployInfo *database.Deploy, engineArgsTemplates []types.EngineArg, toolCallParsers map[string]string, repoInfo scheduler.RepoInfo) (map[string]string, error) {
631+
func (a *DeployActivity) makeDeployEnv(ctx context.Context, hardware types.HardWare, accessToken *database.AccessToken, deployInfo *database.Deploy, engineArgsTemplates []types.EngineArg, toolCallParsers map[string]string, repoInfo common.RepoInfo) (map[string]string, error) {
628632
logger := a.getLogger(ctx)
629633

630634
envMap, err := utilcommon.JsonStrToMap(deployInfo.Env)

api/workflow/activity/deploy_activity_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"time"
88

99
"opencsg.com/csghub-server/builder/deploy/common"
10-
"opencsg.com/csghub-server/builder/deploy/scheduler"
1110
"opencsg.com/csghub-server/builder/store/database"
1211
"opencsg.com/csghub-server/common/config"
1312
"opencsg.com/csghub-server/common/types"
@@ -43,6 +42,7 @@ type testEnv struct {
4342

4443
func setupTest(t *testing.T) *testEnv {
4544
ctx := context.Background()
45+
ctx = context.WithValue(ctx, "test", "test")
4646

4747
// Create mock dependencies
4848
mockDeployTaskStore := mockdb.NewMockDeployTaskStore(t)
@@ -98,36 +98,36 @@ func TestActivities_determineSDKVersion(t *testing.T) {
9898
// Test cases
9999
testCases := []struct {
100100
name string
101-
repoInfo scheduler.RepoInfo
101+
repoInfo common.RepoInfo
102102
expectedSDK string
103103
}{
104104
{
105105
name: "With explicit SDK version",
106-
repoInfo: scheduler.RepoInfo{
106+
repoInfo: common.RepoInfo{
107107
SdkVersion: "custom-version",
108108
Sdk: "some-other-sdk",
109109
},
110110
expectedSDK: "custom-version",
111111
},
112112
{
113113
name: "With GRADIO SDK",
114-
repoInfo: scheduler.RepoInfo{
114+
repoInfo: common.RepoInfo{
115115
SdkVersion: "",
116116
Sdk: types.GRADIO.Name,
117117
},
118118
expectedSDK: types.GRADIO.Version,
119119
},
120120
{
121121
name: "With STREAMLIT SDK",
122-
repoInfo: scheduler.RepoInfo{
122+
repoInfo: common.RepoInfo{
123123
SdkVersion: "",
124124
Sdk: types.STREAMLIT.Name,
125125
},
126126
expectedSDK: types.STREAMLIT.Version,
127127
},
128128
{
129129
name: "With unknown SDK",
130-
repoInfo: scheduler.RepoInfo{
130+
repoInfo: common.RepoInfo{
131131
SdkVersion: "",
132132
Sdk: "unknown-sdk",
133133
},

api/workflow/deployer_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"go.temporal.io/sdk/testsuite"
1616
"opencsg.com/csghub-server/api/workflow/activity"
1717
"opencsg.com/csghub-server/builder/deploy/common"
18-
"opencsg.com/csghub-server/builder/deploy/scheduler"
1918
"opencsg.com/csghub-server/builder/store/database"
2019
"opencsg.com/csghub-server/common/config"
2120
"opencsg.com/csghub-server/common/types"
@@ -120,7 +119,7 @@ func TestDeployWorkflowSuccess(t *testing.T) {
120119
ID: 1,
121120
DeployID: deploy.ID,
122121
Deploy: deploy,
123-
Status: scheduler.BuildSkip,
122+
Status: common.TaskStatusBuildSkip,
124123
}
125124

126125
runTask := &database.DeployTask{
@@ -137,7 +136,7 @@ func TestDeployWorkflowSuccess(t *testing.T) {
137136
}, nil)
138137

139138
mockDeployTaskStore.EXPECT().GetDeployTask(mock.Anything, buildTask.ID).Return(buildTask, nil)
140-
buildTask.Status = scheduler.BuildSucceed
139+
buildTask.Status = common.TaskStatusBuildSucceed
141140

142141
// deploy
143142
mockLogReporter.EXPECT().Report(mock.Anything).Return().Maybe()
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package scheduler
1+
package common
22

33
type Config struct {
44
ImageBuilderURL string `json:"image_builder_url"`

builder/deploy/common/status.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,27 @@
11
package common
22

33
// Status values for the build sub-task of a deploy.
4+
const (
5+
TaskStatusCancelled = -1
6+
TaskStatusBuildPending = 0
7+
TaskStatusBuildInProgress = 1
8+
TaskStatusBuildFailed = 2
9+
TaskStatusBuildSucceed = 3
10+
TaskStatusBuildSkip = 4 // export for other package
11+
TaskStatusBuildInQueue = 5
12+
)
13+
14+
// Status values for the deploy (run) sub-task of a deploy.
15+
const (
16+
TaskStatusDeployPending = 0
17+
TaskStatusDeploying = 1
18+
TaskStatusDeployFailed = 2
19+
TaskStatusDeployStartUp = 3
20+
TaskStatusDeployRunning = 4
21+
TaskStatusDeployRunTimeError = 5
22+
)
423

24+
// deploy status
525
const (
626
Pending = 0
727
// step one: build

builder/deploy/deploy_ce_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/stretchr/testify/require"
1414
mockbuilder "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/imagebuilder"
1515
mockrunner "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/imagerunner"
16-
mockScheduler "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/deploy/scheduler"
1716
mockdb "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/store/database"
1817
"opencsg.com/csghub-server/builder/deploy/common"
1918
"opencsg.com/csghub-server/builder/store/database"
@@ -36,8 +35,6 @@ func newTestDeployer(t *testing.T) *testDepolyerWithMocks {
3635
},
3736
}
3837
s.mocks.stores = mockStores
39-
s.mocks.scheduler = mockScheduler.NewMockScheduler(t)
40-
s.scheduler = s.mocks.scheduler
4138
s.mocks.builder = mockbuilder.NewMockBuilder(t)
4239
s.imageBuilder = s.mocks.builder
4340
s.mocks.runner = mockrunner.NewMockRunner(t)

builder/deploy/deployer.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717

1818
"github.com/google/uuid"
1919
"opencsg.com/csghub-server/builder/deploy/common"
20-
"opencsg.com/csghub-server/builder/deploy/scheduler"
2120
"opencsg.com/csghub-server/builder/loki"
2221
"opencsg.com/csghub-server/builder/redis"
2322
"opencsg.com/csghub-server/builder/store/database"
@@ -214,7 +213,7 @@ func (d *deployer) Deploy(ctx context.Context, dr types.DeployRepo) (int64, erro
214213
imgStrLen := len(strings.Trim(deploy.ImageID, " "))
215214
slog.Debug("do deployer.Deploy check image", slog.Any("deploy.ImageID", deploy.ImageID), slog.Any("imgStrLen", imgStrLen))
216215
if imgStrLen > 0 {
217-
bldTaskStatus = scheduler.BuildSkip
216+
bldTaskStatus = common.TaskStatusBuildSkip
218217
bldTaskMsg = "Skip"
219218
}
220219
slog.Debug("create build task", slog.Any("bldTaskStatus", bldTaskStatus), slog.Any("bldTaskMsg", bldTaskMsg))

builder/deploy/deployer_ce.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"opencsg.com/csghub-server/builder/deploy/common"
1717
"opencsg.com/csghub-server/builder/deploy/imagebuilder"
1818
"opencsg.com/csghub-server/builder/deploy/imagerunner"
19-
"opencsg.com/csghub-server/builder/deploy/scheduler"
2019
"opencsg.com/csghub-server/builder/event"
2120
"opencsg.com/csghub-server/builder/store/database"
2221
"opencsg.com/csghub-server/common/config"
@@ -25,7 +24,6 @@ import (
2524
)
2625

2726
type deployer struct {
28-
scheduler scheduler.Scheduler
2927
imageBuilder imagebuilder.Builder
3028
imageRunner imagerunner.Runner
3129

@@ -45,7 +43,7 @@ type deployer struct {
4543
logReporter reporter.LogCollector
4644
}
4745

48-
func newDeployer(s scheduler.Scheduler, ib imagebuilder.Builder, ir imagerunner.Runner, c common.DeployConfig, logReporter reporter.LogCollector, cfg *config.Config, startJobs bool) (*deployer, error) {
46+
func newDeployer(ib imagebuilder.Builder, ir imagerunner.Runner, c common.DeployConfig, logReporter reporter.LogCollector, cfg *config.Config, startJobs bool) (*deployer, error) {
4947

5048
store := database.NewDeployTaskStore()
5149
node, err := snowflake.NewNode(1)
@@ -55,7 +53,6 @@ func newDeployer(s scheduler.Scheduler, ib imagebuilder.Builder, ir imagerunner.
5553
}
5654

5755
d := &deployer{
58-
scheduler: s,
5956
imageBuilder: ib,
6057
imageRunner: ir,
6158
deployTaskStore: store,

0 commit comments

Comments
 (0)