From 23ca740819a57465382338b0114bd866b7f17e57 Mon Sep 17 00:00:00 2001 From: mgukov Date: Thu, 17 Jul 2025 21:44:22 +0500 Subject: [PATCH 1/6] feat(be): batch task output insert --- db/Store.go | 1 + db/Task.go | 9 +++-- db/bolt/BoltDb.go | 5 +++ db/sql/migrations/v2.16.2.sql | 4 +- db/sql/task.go | 22 +++++++++++ services/tasks/TaskPool.go | 72 +++++++++++++++++++++++++++-------- 6 files changed, 93 insertions(+), 20 deletions(-) diff --git a/db/Store.go b/db/Store.go index ab0b6182f..7a96f816d 100644 --- a/db/Store.go +++ b/db/Store.go @@ -373,6 +373,7 @@ type TaskManager interface { DeleteTaskWithOutputs(projectID int, taskID int) error GetTaskOutputs(projectID int, taskID int, params RetrieveQueryParams) ([]TaskOutput, error) CreateTaskOutput(output TaskOutput) (TaskOutput, error) + InsertTaskOutputBatch(output []TaskOutput) error CreateTaskStage(stage TaskStage) (TaskStage, error) EndTaskStage(taskID int, stageID int, end time.Time, endOutputID int) error CreateTaskStageResult(taskID int, stageID int, result map[string]any) error diff --git a/db/Task.go b/db/Task.go index f936e56aa..d3a75c857 100644 --- a/db/Task.go +++ b/db/Task.go @@ -200,10 +200,11 @@ type TaskWithTpl struct { // TaskOutput is the ansible log output from the task type TaskOutput struct { - ID int `db:"id" json:"id"` - TaskID int `db:"task_id" json:"task_id"` - Time time.Time `db:"time" json:"time"` - Output string `db:"output" json:"output"` + ID int `db:"id" json:"id"` + TaskID int `db:"task_id" json:"task_id"` + Time time.Time `db:"time" json:"time"` + Output string `db:"output" json:"output"` + StageID int `db:"stage_id" json:"stage_id"` } type TaskStageType string diff --git a/db/bolt/BoltDb.go b/db/bolt/BoltDb.go index 7091f1bf2..51d209fce 100644 --- a/db/bolt/BoltDb.go +++ b/db/bolt/BoltDb.go @@ -44,6 +44,11 @@ type BoltDb struct { terraformAlias publicAlias } +func (d *BoltDb) InsertTaskOutputBatch(output []db.TaskOutput) error { + //TODO implement me + panic("implement me") +} + func (d *BoltDb) GetDialect() string { return util.DbDriverBolt } diff --git a/db/sql/migrations/v2.16.2.sql b/db/sql/migrations/v2.16.2.sql index 5c4170577..8b0c2ab9e 100644 --- a/db/sql/migrations/v2.16.2.sql +++ b/db/sql/migrations/v2.16.2.sql @@ -17,4 +17,6 @@ create table project__task_params alter table project__integration drop task_params; alter table project__schedule add task_params_id int references `project__task_params`(`id`); -alter table project__integration add task_params_id int references `project__task_params`(`id`); \ No newline at end of file +alter table project__integration add task_params_id int references `project__task_params`(`id`); + +alter table `task__output` add `stage_id` int null references `task__stage`(`id`); diff --git a/db/sql/task.go b/db/sql/task.go index bde66f734..b18f25408 100644 --- a/db/sql/task.go +++ b/db/sql/task.go @@ -229,6 +229,28 @@ func (d *SqlDb) CreateTaskOutput(output db.TaskOutput) (db.TaskOutput, error) { return output, err } +func (d *SqlDb) InsertTaskOutputBatch(output []db.TaskOutput) error { + + if len(output) == 0 { + return nil + } + + q := squirrel.Insert("task__output"). + Columns("task_id", "output", "time", "stage_id") + + for _, item := range output { + q = q.Values(item.TaskID, item.Output, item.Time.UTC(), item.StageID) + } + + query, args, err := q.ToSql() + if err != nil { + return err + } + + _, err = d.exec(query, args...) + return err +} + func (d *SqlDb) getTasks(projectID int, templateID *int, taskIDs []int, params db.RetrieveQueryParams, tasks *[]db.TaskWithTpl) (err error) { fields := "task.*" fields += ", tpl.playbook as tpl_playbook" + diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index d95d22563..01ba75fed 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -22,9 +22,10 @@ import ( ) type logRecord struct { - task *TaskRunner - output string - time time.Time + task *TaskRunner + output string + time time.Time + currentStage *db.TaskStage } type EventType uint @@ -36,6 +37,11 @@ const ( EventTypeEmpty EventType = 3 ) +const ( + TaskOutputBatchSize = 1000 + TaskOutputInsertIntervalMs = 500 +) + type PoolEvent struct { eventType EventType task *TaskRunner @@ -201,23 +207,47 @@ func (p *TaskPool) handleQueue() { } func (p *TaskPool) handleLogs() { + logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond) + logs := make([]logRecord, 0) - for record := range p.logger { - db.StoreSession(p.store, "logger", func() { + for { - newOutput, err := p.store.CreateTaskOutput(db.TaskOutput{ - TaskID: record.task.Task.ID, - Output: record.output, - Time: record.time, - }) + select { + case record := <-p.logger: + logs = append(logs, record) - if err != nil { - log.Error(err) - return + if len(logs) >= TaskOutputBatchSize { + p.flushLogs(&logs) } + case <-logTicker.C: + p.flushLogs(&logs) + } + } +} + +func (p *TaskPool) flushLogs(logs *[]logRecord) { + if len(*logs) > 0 { + p.writeLogs(*logs) + *logs = make([]logRecord, 0) + } +} + +func (p *TaskPool) writeLogs(logs []logRecord) { + + taskOutput := make([]db.TaskOutput, 0) + + for _, record := range logs { + newOutput := db.TaskOutput{ + TaskID: record.task.Task.ID, + Output: record.output, + Time: record.time, + } + taskOutput = append(taskOutput, newOutput) + + currentOutput := record.task.currentOutput + record.task.currentOutput = &newOutput - currentOutput := record.task.currentOutput - record.task.currentOutput = &newOutput + db.StoreSession(p.store, "logger", func() { newStage, newState, err := stage_parsers.MoveToNextStage( p.store, @@ -240,8 +270,20 @@ func (p *TaskPool) handleLogs() { if newStage != nil { record.task.currentStage = newStage } + + if record.task.currentStage != nil { + newOutput.StageID = record.task.currentStage.ID + } }) } + + db.StoreSession(p.store, "logger", func() { + err := p.store.InsertTaskOutputBatch(taskOutput) + if err != nil { + log.Error(err) + return + } + }) } func runTask(task *TaskRunner, p *TaskPool) { From a1e1644aa9ebc8968c16fc555733850e75d0666e Mon Sep 17 00:00:00 2001 From: mgukov Date: Sat, 26 Jul 2025 22:20:35 +0500 Subject: [PATCH 2/6] feat(be): batch task output insert, boltDB --- db/bolt/BoltDb.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/db/bolt/BoltDb.go b/db/bolt/BoltDb.go index 51d209fce..94cad659b 100644 --- a/db/bolt/BoltDb.go +++ b/db/bolt/BoltDb.go @@ -45,8 +45,13 @@ type BoltDb struct { } func (d *BoltDb) InsertTaskOutputBatch(output []db.TaskOutput) error { - //TODO implement me - panic("implement me") + for _, out := range output { + _, err := d.CreateTaskOutput(out) + if err != nil { + return err + } + } + return nil } func (d *BoltDb) GetDialect() string { From d1c8bece2b85508176ef987559d56b4a7332cf5e Mon Sep 17 00:00:00 2001 From: mgukov Date: Sat, 26 Jul 2025 23:46:45 +0500 Subject: [PATCH 3/6] feat(be): batch task output insert, boltDB --- db/bolt/BoltDb.go | 18 ++++++++++++------ services/tasks/TaskPool.go | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/db/bolt/BoltDb.go b/db/bolt/BoltDb.go index 94cad659b..5a8980d38 100644 --- a/db/bolt/BoltDb.go +++ b/db/bolt/BoltDb.go @@ -45,13 +45,19 @@ type BoltDb struct { } func (d *BoltDb) InsertTaskOutputBatch(output []db.TaskOutput) error { - for _, out := range output { - _, err := d.CreateTaskOutput(out) - if err != nil { - return err - } + if len(output) == 0 { + return nil } - return nil + + return d.db.Update(func(tx *bbolt.Tx) error { + for _, out := range output { + _, err := d.createObjectTx(tx, out.TaskID, db.TaskOutputProps, out) + if err != nil { + return err + } + } + return nil + }) } func (d *BoltDb) GetDialect() string { diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index 01ba75fed..082c8f5d9 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -38,7 +38,7 @@ const ( ) const ( - TaskOutputBatchSize = 1000 + TaskOutputBatchSize = 500 TaskOutputInsertIntervalMs = 500 ) From 579340e2e3b6eb1294155bbd1b3359a6088b59a5 Mon Sep 17 00:00:00 2001 From: mgukov Date: Sat, 26 Jul 2025 23:48:43 +0500 Subject: [PATCH 4/6] feat(be): batch task output insert, boltDB --- db/bolt/BoltDb.go | 16 ---------------- db/bolt/task.go | 16 ++++++++++++++++ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/db/bolt/BoltDb.go b/db/bolt/BoltDb.go index 5a8980d38..7091f1bf2 100644 --- a/db/bolt/BoltDb.go +++ b/db/bolt/BoltDb.go @@ -44,22 +44,6 @@ type BoltDb struct { terraformAlias publicAlias } -func (d *BoltDb) InsertTaskOutputBatch(output []db.TaskOutput) error { - if len(output) == 0 { - return nil - } - - return d.db.Update(func(tx *bbolt.Tx) error { - for _, out := range output { - _, err := d.createObjectTx(tx, out.TaskID, db.TaskOutputProps, out) - if err != nil { - return err - } - } - return nil - }) -} - func (d *BoltDb) GetDialect() string { return util.DbDriverBolt } diff --git a/db/bolt/task.go b/db/bolt/task.go index 0ad3034f7..8aa61829f 100644 --- a/db/bolt/task.go +++ b/db/bolt/task.go @@ -121,6 +121,22 @@ func (d *BoltDb) CreateTaskOutput(output db.TaskOutput) (db.TaskOutput, error) { return newOutput.(db.TaskOutput), nil } +func (d *BoltDb) InsertTaskOutputBatch(output []db.TaskOutput) error { + if len(output) == 0 { + return nil + } + + return d.db.Update(func(tx *bbolt.Tx) error { + for _, out := range output { + _, err := d.createObjectTx(tx, out.TaskID, db.TaskOutputProps, out) + if err != nil { + return err + } + } + return nil + }) +} + func (d *BoltDb) getTasks(projectID int, templateID *int, params db.RetrieveQueryParams) (tasksWithTpl []db.TaskWithTpl, err error) { var tasks []db.Task From 6d2de8f98f3f8f74644f0a8c43b027d538724fea Mon Sep 17 00:00:00 2001 From: mgukov Date: Sun, 27 Jul 2025 00:43:08 +0500 Subject: [PATCH 5/6] feat(be): batch task output insert, fix for TaskOutput.StageID --- db/Task.go | 2 +- services/tasks/TaskPool.go | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/db/Task.go b/db/Task.go index d3a75c857..8df4e63de 100644 --- a/db/Task.go +++ b/db/Task.go @@ -204,7 +204,7 @@ type TaskOutput struct { TaskID int `db:"task_id" json:"task_id"` Time time.Time `db:"time" json:"time"` Output string `db:"output" json:"output"` - StageID int `db:"stage_id" json:"stage_id"` + StageID *int `db:"stage_id" json:"stage_id"` } type TaskStageType string diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index 082c8f5d9..4d0cafb39 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -208,6 +208,11 @@ func (p *TaskPool) handleQueue() { func (p *TaskPool) handleLogs() { logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond) + + defer func() { + logTicker.Stop() + }() + logs := make([]logRecord, 0) for { @@ -228,7 +233,7 @@ func (p *TaskPool) handleLogs() { func (p *TaskPool) flushLogs(logs *[]logRecord) { if len(*logs) > 0 { p.writeLogs(*logs) - *logs = make([]logRecord, 0) + *logs = (*logs)[:0] } } @@ -242,7 +247,6 @@ func (p *TaskPool) writeLogs(logs []logRecord) { Output: record.output, Time: record.time, } - taskOutput = append(taskOutput, newOutput) currentOutput := record.task.currentOutput record.task.currentOutput = &newOutput @@ -272,9 +276,10 @@ func (p *TaskPool) writeLogs(logs []logRecord) { } if record.task.currentStage != nil { - newOutput.StageID = record.task.currentStage.ID + newOutput.StageID = &record.task.currentStage.ID } }) + taskOutput = append(taskOutput, newOutput) } db.StoreSession(p.store, "logger", func() { From 5589d3eec4a847a7b6fdf0eacae2a659e07663c0 Mon Sep 17 00:00:00 2001 From: mgukov Date: Sun, 27 Jul 2025 00:50:05 +0500 Subject: [PATCH 6/6] feat(be): batch task output insert, simplify code --- services/tasks/TaskPool.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index 4d0cafb39..8a73f0a53 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -144,9 +144,7 @@ func (p *TaskPool) GetTaskByAlias(alias string) (task *TaskRunner) { func (p *TaskPool) Run() { ticker := time.NewTicker(5 * time.Second) - defer func() { - ticker.Stop() - }() + defer ticker.Stop() go p.handleQueue() go p.handleLogs() @@ -209,9 +207,7 @@ func (p *TaskPool) handleQueue() { func (p *TaskPool) handleLogs() { logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond) - defer func() { - logTicker.Stop() - }() + defer logTicker.Stop() logs := make([]logRecord, 0)