diff --git a/db/Store.go b/db/Store.go index 088ce972b..a929ef0d0 100644 --- a/db/Store.go +++ b/db/Store.go @@ -385,6 +385,7 @@ type TaskManager interface { DeleteTaskWithOutputs(projectID int, taskID int) error GetTaskOutputs(projectID int, taskID int, params RetrieveQueryParams) ([]TaskOutput, error) CreateTaskOutput(output TaskOutput) (TaskOutput, error) + InsertTaskOutputBatch(output []TaskOutput) error CreateTaskStage(stage TaskStage) (TaskStage, error) EndTaskStage(taskID int, stageID int, end time.Time, endOutputID int) error CreateTaskStageResult(taskID int, stageID int, result map[string]any) error diff --git a/db/Task.go b/db/Task.go index a24a72653..e6e52d4c3 100644 --- a/db/Task.go +++ b/db/Task.go @@ -200,10 +200,11 @@ type TaskWithTpl struct { // TaskOutput is the ansible log output from the task type TaskOutput struct { - ID int `db:"id" json:"id"` - TaskID int `db:"task_id" json:"task_id"` - Time time.Time `db:"time" json:"time"` - Output string `db:"output" json:"output"` + ID int `db:"id" json:"id"` + TaskID int `db:"task_id" json:"task_id"` + Time time.Time `db:"time" json:"time"` + Output string `db:"output" json:"output"` + StageID *int `db:"stage_id" json:"stage_id"` } type TaskStageType string diff --git a/db/bolt/task.go b/db/bolt/task.go index 4e8ff5865..df7caca8e 100644 --- a/db/bolt/task.go +++ b/db/bolt/task.go @@ -139,6 +139,22 @@ func (d *BoltDb) CreateTaskOutput(output db.TaskOutput) (db.TaskOutput, error) { return newOutput.(db.TaskOutput), nil } +func (d *BoltDb) InsertTaskOutputBatch(output []db.TaskOutput) error { + if len(output) == 0 { + return nil + } + + return d.db.Update(func(tx *bbolt.Tx) error { + for _, out := range output { + _, err := d.createObjectTx(tx, out.TaskID, db.TaskOutputProps, out) + if err != nil { + return err + } + } + return nil + }) +} + func (d *BoltDb) getTasks(projectID int, templateID *int, params db.RetrieveQueryParams) (tasksWithTpl []db.TaskWithTpl, err error) { var tasks []db.Task diff --git a/db/sql/migrations/v2.16.2.sql b/db/sql/migrations/v2.16.2.sql index 5c4170577..8b0c2ab9e 100644 --- a/db/sql/migrations/v2.16.2.sql +++ b/db/sql/migrations/v2.16.2.sql @@ -17,4 +17,6 @@ create table project__task_params alter table project__integration drop task_params; alter table project__schedule add task_params_id int references `project__task_params`(`id`); -alter table project__integration add task_params_id int references `project__task_params`(`id`); \ No newline at end of file +alter table project__integration add task_params_id int references `project__task_params`(`id`); + +alter table `task__output` add `stage_id` int null references `task__stage`(`id`); diff --git a/db/sql/task.go b/db/sql/task.go index ce205791a..b6a1277a7 100644 --- a/db/sql/task.go +++ b/db/sql/task.go @@ -230,6 +230,28 @@ func (d *SqlDb) CreateTaskOutput(output db.TaskOutput) (db.TaskOutput, error) { return output, err } +func (d *SqlDb) InsertTaskOutputBatch(output []db.TaskOutput) error { + + if len(output) == 0 { + return nil + } + + q := squirrel.Insert("task__output"). + Columns("task_id", "output", "time", "stage_id") + + for _, item := range output { + q = q.Values(item.TaskID, item.Output, item.Time.UTC(), item.StageID) + } + + query, args, err := q.ToSql() + if err != nil { + return err + } + + _, err = d.exec(query, args...) + return err +} + func (d *SqlDb) getTasks(projectID int, templateID *int, taskIDs []int, params db.RetrieveQueryParams, tasks *[]db.TaskWithTpl) (err error) { fields := "task.*" fields += ", tpl.playbook as tpl_playbook" + diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index 76da305ef..e1483d316 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -22,9 +22,10 @@ import ( ) type logRecord struct { - task *TaskRunner - output string - time time.Time + task *TaskRunner + output string + time time.Time + currentStage *db.TaskStage } type EventType uint @@ -36,6 +37,11 @@ const ( EventTypeEmpty EventType = 3 ) +const ( + TaskOutputBatchSize = 500 + TaskOutputInsertIntervalMs = 500 +) + type PoolEvent struct { eventType EventType task *TaskRunner @@ -153,9 +159,7 @@ func (p *TaskPool) GetTaskByAlias(alias string) (task *TaskRunner) { func (p *TaskPool) Run() { ticker := time.NewTicker(5 * time.Second) - defer func() { - ticker.Stop() - }() + defer ticker.Stop() go p.handleQueue() go p.handleLogs() @@ -226,23 +230,49 @@ func (p *TaskPool) handleQueue() { } func (p *TaskPool) handleLogs() { + logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond) - for record := range p.logger { - db.StoreSession(p.store, "logger", func() { + defer logTicker.Stop() - newOutput, err := p.store.CreateTaskOutput(db.TaskOutput{ - TaskID: record.task.Task.ID, - Output: record.output, - Time: record.time, - }) + logs := make([]logRecord, 0) - if err != nil { - log.Error(err) - return + for { + + select { + case record := <-p.logger: + logs = append(logs, record) + + if len(logs) >= TaskOutputBatchSize { + p.flushLogs(&logs) } + case <-logTicker.C: + p.flushLogs(&logs) + } + } +} - currentOutput := record.task.currentOutput - record.task.currentOutput = &newOutput +func (p *TaskPool) flushLogs(logs *[]logRecord) { + if len(*logs) > 0 { + p.writeLogs(*logs) + *logs = (*logs)[:0] + } +} + +func (p *TaskPool) writeLogs(logs []logRecord) { + + taskOutput := make([]db.TaskOutput, 0) + + for _, record := range logs { + newOutput := db.TaskOutput{ + TaskID: record.task.Task.ID, + Output: record.output, + Time: record.time, + } + + currentOutput := record.task.currentOutput + record.task.currentOutput = &newOutput + + db.StoreSession(p.store, "logger", func() { newStage, newState, err := stage_parsers.MoveToNextStage( p.store, @@ -265,8 +295,21 @@ func (p *TaskPool) handleLogs() { if newStage != nil { record.task.currentStage = newStage } + + if record.task.currentStage != nil { + newOutput.StageID = &record.task.currentStage.ID + } }) + taskOutput = append(taskOutput, newOutput) } + + db.StoreSession(p.store, "logger", func() { + err := p.store.InsertTaskOutputBatch(taskOutput) + if err != nil { + log.Error(err) + return + } + }) } func runTask(task *TaskRunner, p *TaskPool) {