From 1bd35006a806c64b80606a2b9b07c66b8adc75a9 Mon Sep 17 00:00:00 2001 From: mahjonp Date: Thu, 21 Aug 2025 15:53:08 +0800 Subject: [PATCH] fixes: Queries continuing after timeout in CH benchmark --- ch/workload.go | 19 ++++++++++++++++--- cmd/go-tpc/misc.go | 35 +++++++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/ch/workload.go b/ch/workload.go index 81190627..97f187f9 100644 --- a/ch/workload.go +++ b/ch/workload.go @@ -207,7 +207,14 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error { defer w.updateState(ctx) if err := s.Conn.PingContext(ctx); err != nil { - time.Sleep(w.cfg.RefreshConnWait) // I feel it silly to sleep, but don't come up with better idea + select { + case <-time.After(w.cfg.RefreshConnWait): + // Sleep completed normally + case <-ctx.Done(): + // Context was cancelled or timed out during sleep + return ctx.Err() + } + if err := s.RefreshConn(ctx); err != nil { return err } @@ -226,8 +233,14 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error { } start := time.Now() rows, err := s.Conn.QueryContext(ctx, query) - defer w.measurement.Measure(queryName, time.Now().Sub(start), err) + defer func() { + w.measurement.Measure(queryName, time.Since(start), err) + }() if err != nil { + // Check if error is due to context cancellation/timeout + if ctx.Err() != nil { + return fmt.Errorf("query %s cancelled due to timeout: %v", queryName, ctx.Err()) + } return fmt.Errorf("execute query %s failed %v", queryName, err) } defer rows.Close() @@ -237,7 +250,7 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error { if err != nil { return err } - util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Now().Sub(start), table) + util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Since(start), table) return nil } if err := w.drainQueryResult(queryName, rows); err != nil { diff --git a/cmd/go-tpc/misc.go b/cmd/go-tpc/misc.go index 3cf04a92..f4be9275 100644 --- a/cmd/go-tpc/misc.go +++ b/cmd/go-tpc/misc.go @@ -40,7 +40,14 @@ func checkPrepare(ctx context.Context, w workload.Workloader) { func execute(timeoutCtx context.Context, w workload.Workloader, action string, threads, index int) error { count := totalCount / threads - ctx := w.InitThread(context.Background(), index) + // For prepare, cleanup and check operations, use background context to avoid timeout constraints + // Only run phases should be limited by timeout + var ctx context.Context + if action == "prepare" || action == "cleanup" || action == "check" { + ctx = w.InitThread(context.Background(), index) + } else { + ctx = w.InitThread(timeoutCtx, index) + } defer w.CleanupThread(ctx, index) switch action { @@ -58,9 +65,30 @@ func execute(timeoutCtx context.Context, w workload.Workloader, action string, t return w.Check(ctx, index) } + // This loop is only reached for "run" action since other actions return earlier for i := 0; i < count || count <= 0; i++ { + // Check if timeout has occurred before starting next query + select { + case <-ctx.Done(): + if !silence { + fmt.Printf("[%s] %s worker %d stopped due to timeout after %d iterations\n", + time.Now().Format("2006-01-02 15:04:05"), action, index, i) + } + return nil + default: + } + err := w.Run(ctx, index) if err != nil { + // Check if the error is due to timeout/cancellation + if ctx.Err() != nil { + if !silence { + fmt.Printf("[%s] %s worker %d stopped due to timeout: %v\n", + time.Now().Format("2006-01-02 15:04:05"), action, index, err) + } + return nil // Don't treat timeout as an error + } + if !silence { fmt.Printf("[%s] execute %s failed, err %v\n", time.Now().Format("2006-01-02 15:04:05"), action, err) } @@ -68,11 +96,6 @@ func execute(timeoutCtx context.Context, w workload.Workloader, action string, t return err } } - select { - case <-timeoutCtx.Done(): - return nil - default: - } } return nil