Skip to content

Commit 316a759

Browse files
committed
Fixed recording of query.
When running with concurrent streams, not all queries were recorded successfully to the database like MySQL due to the timeToExit signal channel being received first which would prematurely end the loop that records the query to the database even though that resultChan is not done receiving all the queries report yet. Fix was to close the resultChan and modify the loop to allow for full processing of the resultChan even if the timeToExit signal channel was received.
1 parent 7a3bce5 commit 316a759

File tree

1 file changed

+19
-9
lines changed

1 file changed

+19
-9
lines changed

stage/stage.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func (s *Stage) Run(ctx context.Context) int {
150150

151151
go func() {
152152
s.States.wgExitMainStage.Wait()
153+
close(s.States.resultChan)
153154
// wgExitMainStage goes down to 0 after all the goroutines finish. Then we exit the driver by
154155
// closing the timeToExit channel, which will trigger the graceful shutdown process -
155156
// (flushing the log file, writing the final time log summary, etc.).
@@ -174,22 +175,31 @@ func (s *Stage) Run(ctx context.Context) int {
174175

175176
for {
176177
select {
177-
case result := <-s.States.resultChan:
178+
case result, ok := <-s.States.resultChan:
179+
if !ok {
180+
// resultChan closed: all results received, finalize and exit
181+
s.States.RunFinishTime = time.Now()
182+
for _, recorder := range s.States.runRecorders {
183+
recorder.RecordRun(utils.GetCtxWithTimeout(time.Second*5), s, results)
184+
}
185+
return int(s.States.exitCode.Load())
186+
}
178187
results = append(results, result)
179188
for _, recorder := range s.States.runRecorders {
180189
recorder.RecordQuery(utils.GetCtxWithTimeout(time.Second*5), s, result)
181190
}
182-
case sig := <-timeToExit:
183-
if sig != nil {
184-
// Cancel the context and wait for the goroutines to exit.
185-
s.States.AbortAll(fmt.Errorf(sig.String()))
191+
192+
case sig, ok := <-timeToExit:
193+
if !ok {
194+
// timeToExit channel closed, no more signals — continue to receive results
186195
continue
187196
}
188-
s.States.RunFinishTime = time.Now()
189-
for _, recorder := range s.States.runRecorders {
190-
recorder.RecordRun(utils.GetCtxWithTimeout(time.Second*5), s, results)
197+
if sig != nil {
198+
// Received shutdown signal; cancel ongoing queries
199+
log.Info().Msgf("Shutdown signal received: %v. Aborting queries...", sig)
200+
s.States.AbortAll(fmt.Errorf("%s", sig.String()))
201+
// Keep receiving results until resultChan is closed
191202
}
192-
return int(s.States.exitCode.Load())
193203
}
194204
}
195205
}

0 commit comments

Comments
 (0)