Skip to content

Commit 95a7800

Browse files
authored
workload: add update action for sysbench schema (pingcap#1149)
ref pingcap#1158
1 parent b7d1b2a commit 95a7800

File tree

14 files changed

+1329
-590
lines changed

14 files changed

+1329
-590
lines changed

tools/workload/app.go

Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package main
15+
16+
import (
17+
"context"
18+
"database/sql"
19+
"math/rand"
20+
"strings"
21+
"sync"
22+
"sync/atomic"
23+
"time"
24+
25+
"github.com/pingcap/errors"
26+
plog "github.com/pingcap/log"
27+
"go.uber.org/zap"
28+
"workload/schema"
29+
pbank "workload/schema/bank"
30+
pbank2 "workload/schema/bank2"
31+
"workload/schema/bankupdate"
32+
pcrawler "workload/schema/crawler"
33+
"workload/schema/largerow"
34+
"workload/schema/shop"
35+
psysbench "workload/schema/sysbench"
36+
puuu "workload/schema/uuu"
37+
)
38+
39+
// WorkloadExecutor executes the workload and collects statistics
type WorkloadExecutor struct {
	Config    *WorkloadConfig
	DBManager *DBManager
	Workload  schema.Workload

	// statistics
	FlushedRowCount atomic.Uint64 // rows successfully written
	QueryCount      atomic.Uint64 // SQL statements issued
	ErrorCount      atomic.Uint64 // failed operations
}
50+
51+
// WorkloadStats saves the statistics of the workload.
// All fields are atomic so worker goroutines can update them concurrently.
type WorkloadStats struct {
	FlushedRowCount atomic.Uint64 // rows successfully flushed to the database
	QueryCount      atomic.Uint64 // SQL statements issued (inserts and updates)
	ErrorCount      atomic.Uint64 // failed operations
	CreatedTableNum atomic.Int32  // tables created so far during prepare
}
58+
59+
// WorkloadApp is the main structure of the application.
// Config is set at construction; DBManager and Workload are populated by
// Initialize before Execute runs.
type WorkloadApp struct {
	Config    *WorkloadConfig
	DBManager *DBManager
	Workload  schema.Workload
	Stats     *WorkloadStats
}
66+
67+
// Names of the supported workload types, matched against Config.WorkloadType.
const (
	bank     = "bank"
	sysbench = "sysbench"
	largeRow = "large_row"
	shopItem = "shop_item"
	uuu      = "uuu"
	crawler  = "crawler"
	// For the gf case at most table count = 2 is supported; there are only 2
	// tables in these cases. Each insert SQL contains a batch of 200, while
	// each update SQL contains a batch of 1.
	bank2      = "bank2"
	bankUpdate = "bank_update"
)
79+
80+
// stmtCacheKey is used as the key for the statement cache: one prepared
// statement per (connection, SQL text) pair.
type stmtCacheKey struct {
	conn *sql.Conn // owning connection; a statement is not valid on other connections
	sql  string    // exact SQL text that was prepared
}
85+
86+
// NewWorkloadApp creates a new workload application
87+
func NewWorkloadApp(config *WorkloadConfig) *WorkloadApp {
88+
return &WorkloadApp{
89+
Config: config,
90+
Stats: &WorkloadStats{},
91+
}
92+
}
93+
94+
// Initialize initializes the workload application
95+
func (app *WorkloadApp) Initialize() error {
96+
// set database connection
97+
dbManager, err := NewDBManager(app.Config)
98+
if err != nil {
99+
return err
100+
}
101+
app.DBManager = dbManager
102+
103+
// create workload
104+
app.Workload = app.createWorkload()
105+
106+
return nil
107+
}
108+
109+
// createWorkload creates a workload based on configuration
110+
func (app *WorkloadApp) createWorkload() schema.Workload {
111+
plog.Info("start to create workload")
112+
defer func() {
113+
plog.Info("create workload finished")
114+
}()
115+
116+
var workload schema.Workload
117+
switch app.Config.WorkloadType {
118+
case bank:
119+
workload = pbank.NewBankWorkload()
120+
case sysbench:
121+
workload = psysbench.NewSysbenchWorkload()
122+
case largeRow:
123+
workload = largerow.NewLargeRowWorkload(app.Config.RowSize, app.Config.LargeRowSize, app.Config.LargeRowRatio)
124+
case shopItem:
125+
workload = shop.NewShopItemWorkload(app.Config.TotalRowCount, app.Config.RowSize)
126+
case uuu:
127+
workload = puuu.NewUUUWorkload()
128+
case crawler:
129+
workload = pcrawler.NewCrawlerWorkload()
130+
case bank2:
131+
workload = pbank2.NewBank2Workload()
132+
case bankUpdate:
133+
workload = bankupdate.NewBankUpdateWorkload(app.Config.TotalRowCount, app.Config.UpdateLargeColumnSize)
134+
default:
135+
plog.Panic("unsupported workload type", zap.String("workload", app.Config.WorkloadType))
136+
}
137+
return workload
138+
}
139+
140+
// Execute executes the workload
141+
func (app *WorkloadApp) Execute() error {
142+
wg := &sync.WaitGroup{}
143+
return app.executeWorkload(wg)
144+
}
145+
146+
// executeWorkload executes the workload
147+
func (app *WorkloadApp) executeWorkload(wg *sync.WaitGroup) error {
148+
updateConcurrency := int(float64(app.Config.Thread) * app.Config.PercentageForUpdate)
149+
insertConcurrency := app.Config.Thread - updateConcurrency
150+
151+
plog.Info("database info",
152+
zap.Int("dbCount", len(app.DBManager.GetDBs())),
153+
zap.Int("tableCount", app.Config.TableCount))
154+
155+
if !app.Config.SkipCreateTable && app.Config.Action == "prepare" {
156+
app.handlePrepareAction(insertConcurrency, wg)
157+
return nil
158+
}
159+
160+
if app.Config.OnlyDDL {
161+
return nil
162+
}
163+
164+
app.handleWorkloadExecution(insertConcurrency, updateConcurrency, wg)
165+
return nil
166+
}
167+
168+
// handlePrepareAction handles the prepare action
169+
func (app *WorkloadApp) handlePrepareAction(insertConcurrency int, mainWg *sync.WaitGroup) {
170+
plog.Info("start to create tables", zap.Int("tableCount", app.Config.TableCount))
171+
wg := &sync.WaitGroup{}
172+
for _, db := range app.DBManager.GetDBs() {
173+
wg.Add(1)
174+
go app.initTables(wg, db.DB)
175+
}
176+
wg.Wait()
177+
plog.Info("All dbs create tables finished")
178+
if app.Config.TotalRowCount != 0 {
179+
app.executeInsertWorkers(insertConcurrency, wg)
180+
}
181+
}
182+
183+
// handleWorkloadExecution handles the workload execution
184+
func (app *WorkloadApp) handleWorkloadExecution(insertConcurrency, updateConcurrency int, wg *sync.WaitGroup) {
185+
plog.Info("start running workload",
186+
zap.String("workloadType", app.Config.WorkloadType),
187+
zap.Float64("largeRatio", app.Config.LargeRowRatio),
188+
zap.Int("totalThread", app.Config.Thread),
189+
zap.Int("batchSize", app.Config.BatchSize),
190+
zap.String("action", app.Config.Action),
191+
)
192+
193+
if app.Config.Action == "write" || app.Config.Action == "insert" {
194+
app.executeInsertWorkers(insertConcurrency, wg)
195+
}
196+
197+
if app.Config.Action == "write" || app.Config.Action == "update" {
198+
app.executeUpdateWorkers(updateConcurrency, wg)
199+
}
200+
}
201+
202+
// initTables initializes tables
203+
func (app *WorkloadApp) initTables(wg *sync.WaitGroup, db *sql.DB) {
204+
defer wg.Done()
205+
for tableIndex := 0; tableIndex < app.Config.TableCount; tableIndex++ {
206+
sql := app.Workload.BuildCreateTableStatement(tableIndex + app.Config.TableStartIndex)
207+
if _, err := db.Exec(sql); err != nil {
208+
err := errors.Annotate(err, "create table failed")
209+
plog.Error("create table failed", zap.Error(err))
210+
}
211+
app.Stats.CreatedTableNum.Add(1)
212+
}
213+
plog.Info("create tables finished")
214+
}
215+
216+
// executeInsertWorkers executes insert workers
217+
func (app *WorkloadApp) executeInsertWorkers(insertConcurrency int, wg *sync.WaitGroup) {
218+
wg.Add(insertConcurrency)
219+
var retryCount atomic.Uint64
220+
for i := range insertConcurrency {
221+
db := app.DBManager.GetDB()
222+
go func(workerID int) {
223+
defer func() {
224+
plog.Info("insert worker exited", zap.Int("worker", workerID))
225+
wg.Done()
226+
}()
227+
for {
228+
conn, err := db.DB.Conn(context.Background())
229+
if err != nil {
230+
plog.Info("get connection failed, wait 5 seconds and retry", zap.Error(err))
231+
time.Sleep(time.Second * 5)
232+
}
233+
plog.Info("start insert worker to write data to db", zap.Int("worker", workerID), zap.String("db", db.Name))
234+
err = app.doInsert(conn)
235+
if err != nil {
236+
plog.Info("do insert error, get another connection and retry", zap.Error(err))
237+
app.Stats.ErrorCount.Add(1)
238+
retryCount.Add(1)
239+
conn.Close()
240+
time.Sleep(time.Second * 2)
241+
plog.Info("retry insert", zap.Int("worker", workerID), zap.String("db", db.Name), zap.Uint64("retryCount", retryCount.Load()))
242+
continue
243+
}
244+
}
245+
}(i)
246+
}
247+
}
248+
249+
// doInsert performs insert operations
250+
func (app *WorkloadApp) doInsert(conn *sql.Conn) error {
251+
for {
252+
j := rand.Intn(app.Config.TableCount) + app.Config.TableStartIndex
253+
var err error
254+
255+
switch app.Config.WorkloadType {
256+
case uuu:
257+
insertSql, values := app.Workload.(*puuu.UUUWorkload).BuildInsertSqlWithValues(j, app.Config.BatchSize)
258+
_, err = app.executeWithValues(conn, insertSql, j, values)
259+
case bank2:
260+
insertSql, values := app.Workload.(*pbank2.Bank2Workload).BuildInsertSqlWithValues(j, app.Config.BatchSize)
261+
_, err = app.executeWithValues(conn, insertSql, j, values)
262+
default:
263+
insertSql := app.Workload.BuildInsertSql(j, app.Config.BatchSize)
264+
_, err = app.execute(conn, insertSql, j)
265+
}
266+
if err != nil {
267+
if strings.Contains(err.Error(), "connection is already closed") {
268+
plog.Info("connection is already closed", zap.Error(err))
269+
app.Stats.ErrorCount.Add(1)
270+
return err
271+
}
272+
273+
plog.Info("do insert error", zap.Error(err))
274+
app.Stats.ErrorCount.Add(1)
275+
continue
276+
}
277+
app.Stats.FlushedRowCount.Add(uint64(app.Config.BatchSize))
278+
}
279+
}
280+
281+
// execute executes a SQL statement
282+
func (app *WorkloadApp) execute(conn *sql.Conn, sql string, tableIndex int) (sql.Result, error) {
283+
app.Stats.QueryCount.Add(1)
284+
res, err := conn.ExecContext(context.Background(), sql)
285+
if err != nil {
286+
if !strings.Contains(err.Error(), "Error 1146") {
287+
plog.Info("insert error", zap.Error(err))
288+
return res, err
289+
}
290+
// if table not exists, we create it
291+
_, err := conn.ExecContext(context.Background(), app.Workload.BuildCreateTableStatement(tableIndex))
292+
if err != nil {
293+
plog.Info("create table error: ", zap.Error(err))
294+
return res, err
295+
}
296+
_, err = conn.ExecContext(context.Background(), sql)
297+
return res, err
298+
}
299+
return res, nil
300+
}
301+
302+
// executeWithValues executes a SQL statement with values
303+
func (app *WorkloadApp) executeWithValues(conn *sql.Conn, sqlStr string, n int, values []interface{}) (sql.Result, error) {
304+
app.Stats.QueryCount.Add(1)
305+
306+
// Try to get prepared statement from cache
307+
key := stmtCacheKey{conn: conn, sql: sqlStr}
308+
if stmt, ok := app.DBManager.StmtCache.Load(key); ok {
309+
return stmt.(*sql.Stmt).Exec(values...)
310+
}
311+
312+
// Prepare the statement
313+
stmt, err := conn.PrepareContext(context.Background(), sqlStr)
314+
if err != nil {
315+
if !strings.Contains(err.Error(), "Error 1146") {
316+
plog.Info("prepare error", zap.Error(err))
317+
return nil, err
318+
}
319+
// Create table if not exists
320+
_, err := conn.ExecContext(context.Background(), app.Workload.BuildCreateTableStatement(n))
321+
if err != nil {
322+
plog.Info("create table error: ", zap.Error(err))
323+
return nil, err
324+
}
325+
// Try prepare again
326+
stmt, err = conn.PrepareContext(context.Background(), sqlStr)
327+
if err != nil {
328+
return nil, err
329+
}
330+
}
331+
332+
// Cache the prepared statement
333+
app.DBManager.StmtCache.Store(key, stmt)
334+
335+
// Execute the prepared statement
336+
return stmt.Exec(values...)
337+
}
338+
339+
// StartMetricsReporting starts reporting metrics in a background goroutine.
// reportMetrics is defined elsewhere in this package; presumably it runs for
// the lifetime of the process — there is no stop mechanism here.
func (app *WorkloadApp) StartMetricsReporting() {
	go app.reportMetrics()
}
343+
344+
// getSQLPreview returns sql truncated to at most 140 bytes (plus a "..."
// marker) for logging. The cut point backs off to a UTF-8 rune boundary so
// the preview never ends in the middle of a multi-byte character, which
// would put invalid UTF-8 into the logs.
func getSQLPreview(sql string) string {
	const previewLen = 140
	if len(sql) <= previewLen {
		return sql
	}
	cut := previewLen
	// 0b10xxxxxx bytes are UTF-8 continuation bytes; step back past them
	// so the slice ends exactly on a rune boundary.
	for cut > 0 && sql[cut]&0xC0 == 0x80 {
		cut--
	}
	return sql[:cut] + "..."
}

0 commit comments

Comments
 (0)