Skip to content

Commit e4ad88f

Browse files
committed
Try implementation with dummyQueryResult to symbolize end of resultChan
1 parent 316a759 commit e4ad88f

File tree

1 file changed

+19
-21
lines changed

1 file changed

+19
-21
lines changed

stage/stage.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ func (s *Stage) Run(ctx context.Context) int {
139139
// This initial size is just a good start, might not be enough.
140140
results := make([]*QueryResult, 0, len(s.Queries)+len(s.QueryFiles))
141141
s.States.resultChan = make(chan *QueryResult, 16)
142+
// This dummyQueryResult is used to push into resultChan once we know all the query results are done recording.
143+
var dummyQueryResult = &QueryResult{
144+
StageId: "__EOF__SENTINEL__",
145+
}
142146
timeToExit := make(chan os.Signal, 1)
143147
signal.Notify(timeToExit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
144148
// Each goroutine we spawn will increment this wait group (count-down latch). We may start a goroutine for running
@@ -150,14 +154,10 @@ func (s *Stage) Run(ctx context.Context) int {
150154

151155
go func() {
152156
s.States.wgExitMainStage.Wait()
153-
close(s.States.resultChan)
154-
// wgExitMainStage goes down to 0 after all the goroutines finish. Then we exit the driver by
155-
// closing the timeToExit channel, which will trigger the graceful shutdown process -
156-
// (flushing the log file, writing the final time log summary, etc.).
157157

158-
// When SIGKILL and SIGINT are captured, we trigger this process by canceling the context, which will cause
159-
// "context cancelled" errors in goroutines to let them exit.
160-
close(timeToExit)
158+
// wgExitMainStage goes down to 0 after all the goroutines finish. Then we will send dummyQueryResult
159+
// to resultChan to symbolize all query results are done recording.
160+
s.States.resultChan <- dummyQueryResult;
161161
}()
162162

163163
ctx, s.States.AbortAll = context.WithCancelCause(ctx)
@@ -175,31 +175,29 @@ func (s *Stage) Run(ctx context.Context) int {
175175

176176
for {
177177
select {
178-
case result, ok := <-s.States.resultChan:
179-
if !ok {
180-
// resultChan closed: all results received, finalize and exit
178+
case result := <-s.States.resultChan:
179+
// result == dummyQueryResult: all results received, finalize and exit
180+
if result == dummyQueryResult {
181181
s.States.RunFinishTime = time.Now()
182182
for _, recorder := range s.States.runRecorders {
183183
recorder.RecordRun(utils.GetCtxWithTimeout(time.Second*5), s, results)
184184
}
185+
// Explicitly close(timeToExit) here to trigger the the graceful shutdown process -
186+
// (flushing the log file, writing the final time log summary, etc.).
187+
close(timeToExit)
185188
return int(s.States.exitCode.Load())
186189
}
187190
results = append(results, result)
188191
for _, recorder := range s.States.runRecorders {
189192
recorder.RecordQuery(utils.GetCtxWithTimeout(time.Second*5), s, result)
190193
}
191194

192-
case sig, ok := <-timeToExit:
193-
if !ok {
194-
// timeToExit channel closed, no more signals — continue to receive results
195-
continue
196-
}
197-
if sig != nil {
198-
// Received shutdown signal; cancel ongoing queries
199-
log.Info().Msgf("Shutdown signal received: %v. Aborting queries...", sig)
200-
s.States.AbortAll(fmt.Errorf("%s", sig.String()))
201-
// Keep receiving results until resultChan is closed
202-
}
195+
// When SIGKILL and SIGINT are captured, we trigger this process by canceling the context, which will cause
196+
// "context cancelled" errors in goroutines to let them exit.
197+
case sig := <-timeToExit:
198+
// Received shutdown signal; cancel ongoing queries
199+
log.Info().Msgf("Shutdown signal received: %v. Aborting queries...", sig)
200+
s.States.AbortAll(fmt.Errorf("%s", sig.String()))
203201
}
204202
}
205203
}

0 commit comments

Comments
 (0)