Skip to content

Commit e7727a3

Browse files
committed
Try implementation with dummyQueryResult to symbolize end of resultChan
1 parent 316a759 commit e7727a3

File tree

1 file changed

+15
-15
lines changed

1 file changed

+15
-15
lines changed

stage/stage.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ func (s *Stage) Run(ctx context.Context) int {
139139
// This initial size is just a good start, might not be enough.
140140
results := make([]*QueryResult, 0, len(s.Queries)+len(s.QueryFiles))
141141
s.States.resultChan = make(chan *QueryResult, 16)
142+
// This dummyQueryResult is used to push into resultChan once we know all the query results are done recording.
143+
var dummyQueryResult = &QueryResult{
144+
StageId: "__EOF__SENTINEL__",
145+
}
142146
timeToExit := make(chan os.Signal, 1)
143147
signal.Notify(timeToExit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
144148
// Each goroutine we spawn will increment this wait group (count-down latch). We may start a goroutine for running
@@ -150,7 +154,10 @@ func (s *Stage) Run(ctx context.Context) int {
150154

151155
go func() {
152156
s.States.wgExitMainStage.Wait()
153-
close(s.States.resultChan)
157+
158+
// Send the dummyQueryResult to resultChan to symbolize all query results are done recording.
159+
s.States.resultChan <- dummyQueryResult;
160+
154161
// wgExitMainStage goes down to 0 after all the goroutines finish. Then we exit the driver by
155162
// closing the timeToExit channel, which will trigger the graceful shutdown process -
156163
// (flushing the log file, writing the final time log summary, etc.).
@@ -175,9 +182,9 @@ func (s *Stage) Run(ctx context.Context) int {
175182

176183
for {
177184
select {
178-
case result, ok := <-s.States.resultChan:
179-
if !ok {
180-
// resultChan closed: all results received, finalize and exit
185+
case result := <-s.States.resultChan:
186+
// result == dummyQueryResult: all results received, finalize and exit
187+
if result == dummyQueryResult {
181188
s.States.RunFinishTime = time.Now()
182189
for _, recorder := range s.States.runRecorders {
183190
recorder.RecordRun(utils.GetCtxWithTimeout(time.Second*5), s, results)
@@ -189,17 +196,10 @@ func (s *Stage) Run(ctx context.Context) int {
189196
recorder.RecordQuery(utils.GetCtxWithTimeout(time.Second*5), s, result)
190197
}
191198

192-
case sig, ok := <-timeToExit:
193-
if !ok {
194-
// timeToExit channel closed, no more signals — continue to receive results
195-
continue
196-
}
197-
if sig != nil {
198-
// Received shutdown signal; cancel ongoing queries
199-
log.Info().Msgf("Shutdown signal received: %v. Aborting queries...", sig)
200-
s.States.AbortAll(fmt.Errorf("%s", sig.String()))
201-
// Keep receiving results until resultChan is closed
202-
}
199+
case sig := <-timeToExit:
200+
// Received shutdown signal; cancel ongoing queries
201+
log.Info().Msgf("Shutdown signal received: %v. Aborting queries...", sig)
202+
s.States.AbortAll(fmt.Errorf("%s", sig.String()))
203203
}
204204
}
205205
}

0 commit comments

Comments
 (0)