Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions ch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,14 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
defer w.updateState(ctx)

if err := s.Conn.PingContext(ctx); err != nil {
time.Sleep(w.cfg.RefreshConnWait) // I feel it silly to sleep, but don't come up with better idea
select {
case <-time.After(w.cfg.RefreshConnWait):
// Sleep completed normally
case <-ctx.Done():
// Context was cancelled or timed out during sleep
return ctx.Err()
}

if err := s.RefreshConn(ctx); err != nil {
return err
}
Expand All @@ -226,8 +233,14 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
}
start := time.Now()
rows, err := s.Conn.QueryContext(ctx, query)
defer w.measurement.Measure(queryName, time.Now().Sub(start), err)
defer func() {
w.measurement.Measure(queryName, time.Since(start), err)
}()
if err != nil {
// Check if error is due to context cancellation/timeout
if ctx.Err() != nil {
return fmt.Errorf("query %s cancelled due to timeout: %v", queryName, ctx.Err())
}
return fmt.Errorf("execute query %s failed %v", queryName, err)
}
defer rows.Close()
Expand All @@ -237,7 +250,7 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
if err != nil {
return err
}
util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Now().Sub(start), table)
util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Since(start), table)
return nil
}
if err := w.drainQueryResult(queryName, rows); err != nil {
Expand Down
35 changes: 29 additions & 6 deletions cmd/go-tpc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,14 @@ func checkPrepare(ctx context.Context, w workload.Workloader) {
func execute(timeoutCtx context.Context, w workload.Workloader, action string, threads, index int) error {
count := totalCount / threads

ctx := w.InitThread(context.Background(), index)
// For prepare, cleanup and check operations, use background context to avoid timeout constraints
// Only run phases should be limited by timeout
var ctx context.Context
if action == "prepare" || action == "cleanup" || action == "check" {
ctx = w.InitThread(context.Background(), index)
} else {
ctx = w.InitThread(timeoutCtx, index)
}
defer w.CleanupThread(ctx, index)

switch action {
Expand All @@ -58,21 +65,37 @@ func execute(timeoutCtx context.Context, w workload.Workloader, action string, t
return w.Check(ctx, index)
}

// This loop is only reached for "run" action since other actions return earlier
for i := 0; i < count || count <= 0; i++ {
// Check if timeout has occurred before starting next query
select {
case <-ctx.Done():
if !silence {
fmt.Printf("[%s] %s worker %d stopped due to timeout after %d iterations\n",
time.Now().Format("2006-01-02 15:04:05"), action, index, i)
}
return nil
default:
}

err := w.Run(ctx, index)
if err != nil {
// Check if the error is due to timeout/cancellation
if ctx.Err() != nil {
if !silence {
fmt.Printf("[%s] %s worker %d stopped due to timeout: %v\n",
time.Now().Format("2006-01-02 15:04:05"), action, index, err)
}
return nil // Don't treat timeout as an error
}

if !silence {
fmt.Printf("[%s] execute %s failed, err %v\n", time.Now().Format("2006-01-02 15:04:05"), action, err)
}
if !ignoreError {
return err
}
}
select {
case <-timeoutCtx.Done():
return nil
default:
}
}

return nil
Expand Down