-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
batch output insert #3167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
batch output insert #3167
Changes from 4 commits
23ca740
a1e1644
d1c8bec
579340e
6d2de8f
5589d3e
0110449
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -22,9 +22,10 @@ import ( | |||||
) | ||||||
|
||||||
type logRecord struct { | ||||||
task *TaskRunner | ||||||
output string | ||||||
time time.Time | ||||||
task *TaskRunner | ||||||
output string | ||||||
time time.Time | ||||||
currentStage *db.TaskStage | ||||||
} | ||||||
|
||||||
type EventType uint | ||||||
|
@@ -36,6 +37,11 @@ const ( | |||||
EventTypeEmpty EventType = 3 | ||||||
) | ||||||
|
||||||
const ( | ||||||
TaskOutputBatchSize = 500 | ||||||
TaskOutputInsertIntervalMs = 500 | ||||||
) | ||||||
|
||||||
type PoolEvent struct { | ||||||
eventType EventType | ||||||
task *TaskRunner | ||||||
|
@@ -201,23 +207,47 @@ func (p *TaskPool) handleQueue() { | |||||
} | ||||||
|
||||||
func (p *TaskPool) handleLogs() { | ||||||
logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond) | ||||||
logs := make([]logRecord, 0) | ||||||
|
||||||
for record := range p.logger { | ||||||
db.StoreSession(p.store, "logger", func() { | ||||||
for { | ||||||
|
||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This infinite loop has no exit condition and will prevent the goroutine from terminating gracefully. Consider adding a context or done channel to allow proper shutdown. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
newOutput, err := p.store.CreateTaskOutput(db.TaskOutput{ | ||||||
TaskID: record.task.Task.ID, | ||||||
Output: record.output, | ||||||
Time: record.time, | ||||||
}) | ||||||
select { | ||||||
case record := <-p.logger: | ||||||
logs = append(logs, record) | ||||||
|
||||||
if err != nil { | ||||||
log.Error(err) | ||||||
return | ||||||
if len(logs) >= TaskOutputBatchSize { | ||||||
p.flushLogs(&logs) | ||||||
} | ||||||
case <-logTicker.C: | ||||||
p.flushLogs(&logs) | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
func (p *TaskPool) flushLogs(logs *[]logRecord) { | ||||||
if len(*logs) > 0 { | ||||||
p.writeLogs(*logs) | ||||||
*logs = make([]logRecord, 0) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Creating a new slice instead of clearing the existing one may cause unnecessary allocations. Consider using *logs = (*logs)[:0] to reuse the underlying array.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
} | ||||||
} | ||||||
|
||||||
func (p *TaskPool) writeLogs(logs []logRecord) { | ||||||
|
||||||
taskOutput := make([]db.TaskOutput, 0) | ||||||
|
||||||
for _, record := range logs { | ||||||
newOutput := db.TaskOutput{ | ||||||
TaskID: record.task.Task.ID, | ||||||
Output: record.output, | ||||||
Time: record.time, | ||||||
} | ||||||
taskOutput = append(taskOutput, newOutput) | ||||||
|
||||||
currentOutput := record.task.currentOutput | ||||||
record.task.currentOutput = &newOutput | ||||||
|
||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The newOutput variable is being modified after assignment to record.task.currentOutput. The StageID is set on line 275, but this modification won't be reflected in the task's currentOutput since it's a copy, not a reference to the same object. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
currentOutput := record.task.currentOutput | ||||||
record.task.currentOutput = &newOutput | ||||||
db.StoreSession(p.store, "logger", func() { | ||||||
|
||||||
newStage, newState, err := stage_parsers.MoveToNextStage( | ||||||
p.store, | ||||||
|
@@ -240,8 +270,20 @@ func (p *TaskPool) handleLogs() { | |||||
if newStage != nil { | ||||||
record.task.currentStage = newStage | ||||||
} | ||||||
|
||||||
if record.task.currentStage != nil { | ||||||
newOutput.StageID = record.task.currentStage.ID | ||||||
} | ||||||
}) | ||||||
} | ||||||
|
||||||
db.StoreSession(p.store, "logger", func() { | ||||||
err := p.store.InsertTaskOutputBatch(taskOutput) | ||||||
if err != nil { | ||||||
log.Error(err) | ||||||
return | ||||||
} | ||||||
}) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BugThe new batching logic for task output introduces several critical issues:
Locations (3) |
||||||
} | ||||||
|
||||||
func runTask(task *TaskRunner, p *TaskPool) { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ticker is never stopped, which can lead to a goroutine leak. Consider adding defer logTicker.Stop() or stopping it when the loop exits.
Copilot uses AI. Check for mistakes.