90 changes: 55 additions & 35 deletions tools/workload/schema/largerow/large_row.go
@@ -17,6 +17,8 @@ import (
 	"fmt"
 	"math/rand"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/pingcap/log"
@@ -53,7 +55,7 @@ func newRowValues(r *rand.Rand, columnSize int, columnCount int, batchSize int)
 		if sb.Len() != 0 {
 			sb.Write([]byte(","))
 		}
-		index := rand.Int() % numColumnValues
+		index := r.Intn(numColumnValues)
 		columnValue := columns[index]
 
 		sb.WriteByte('\'')
@@ -65,16 +67,6 @@ func newRowValues(r *rand.Rand, columnSize int, columnCount int, batchSize int)
 	return result
 }
 
-func (l *LargeRowWorkload) getSmallRow() string {
-	index := l.r.Int() % len(l.smallRows)
-	return l.smallRows[index]
-}
-
-func (l *LargeRowWorkload) getLargeRow() string {
-	index := l.r.Int() % len(l.largeRows)
-	return l.largeRows[index]
-}
-
 type LargeRowWorkload struct {
 	smallRows []string
 	largeRows []string
@@ -83,26 +75,44 @@ type LargeRowWorkload struct {
 
 	columnCount int
 
-	r *rand.Rand
+	seed     atomic.Int64
+	randPool sync.Pool
 }
 
+func (l *LargeRowWorkload) getRand() *rand.Rand {
+	return l.randPool.Get().(*rand.Rand)
+}
+
+func (l *LargeRowWorkload) putRand(r *rand.Rand) {
+	l.randPool.Put(r)
+}
+
+func (l *LargeRowWorkload) getSmallRow(r *rand.Rand) string {
+	return l.smallRows[r.Intn(len(l.smallRows))]
+}
+
+func (l *LargeRowWorkload) getLargeRow(r *rand.Rand) string {
+	return l.largeRows[r.Intn(len(l.largeRows))]
+}
+
 func NewLargeRowWorkload(
 	normalRowSize, largeRowSize int, largeRatio float64,
 ) schema.Workload {
-	columnCount := int(float64(largeRowSize) / varcharColumnMaxLen)
-
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	columnCount := int(float64(largeRowSize) / varcharColumnMaxLen)
 	smallColumnSize := int(float64(normalRowSize) / float64(columnCount))
 
-	return &LargeRowWorkload{
-		r:          r,
-		largeRatio: largeRatio,
-
+	l := &LargeRowWorkload{
+		largeRatio:  largeRatio,
 		columnCount: columnCount,
-
-		smallRows: newRowValues(r, smallColumnSize, columnCount, 512),
-		largeRows: newRowValues(r, varcharColumnMaxLen, columnCount, 128),
+		smallRows:   newRowValues(r, smallColumnSize, columnCount, 512),
+		largeRows:   newRowValues(r, varcharColumnMaxLen, columnCount, 128),
 	}
+	l.seed.Store(time.Now().UnixNano())
+	l.randPool.New = func() any {
+		return rand.New(rand.NewSource(l.seed.Add(1)))
+	}
+	return l
 }
 
 func getTableName(n int) string {
@@ -124,37 +134,44 @@ func (l *LargeRowWorkload) BuildCreateTableStatement(n int) string {
 
 func (l *LargeRowWorkload) BuildInsertSql(tableN int, batchSize int) string {
 	tableName := getTableName(tableN)
-	insertSQL := fmt.Sprintf("INSERT INTO %s VALUES (%d,%s)", tableName, rand.Int63()%maxValue, l.getSmallRow())
+	r := l.getRand()
+	defer l.putRand(r)
+
+	var sb strings.Builder
+	sb.WriteString(fmt.Sprintf("INSERT INTO %s VALUES (%d,%s)", tableName, r.Int63()%maxValue, l.getSmallRow(r)))
 
 	var largeRowCount int
 	for i := 1; i < batchSize; i++ {
-		if l.r.Float64() < l.largeRatio {
-			insertSQL = fmt.Sprintf("%s,(%d,%s)", insertSQL, rand.Int63()%maxValue, l.getLargeRow())
+		if r.Float64() < l.largeRatio {
+			sb.WriteString(fmt.Sprintf(",(%d,%s)", r.Int63()%maxValue, l.getLargeRow(r)))
 			largeRowCount++
 		} else {
-			insertSQL = fmt.Sprintf("%s,(%d,%s)", insertSQL, rand.Int63()%maxValue, l.getSmallRow())
+			sb.WriteString(fmt.Sprintf(",(%d,%s)", r.Int63()%maxValue, l.getSmallRow(r)))
 		}
 	}
 
 	log.Debug("large row workload, insert the table",
 		zap.Int("table", tableN), zap.Int("batchSize", batchSize),
-		zap.Int("largeRowCount", largeRowCount), zap.Int("length", len(insertSQL)))
+		zap.Int("largeRowCount", largeRowCount), zap.Int("length", sb.Len()))
 
-	return insertSQL
+	return sb.String()
 }
 
 func (l *LargeRowWorkload) BuildUpdateSql(opts schema.UpdateOption) string {
 	tableName := getTableName(opts.TableIndex)
+	r := l.getRand()
+	defer l.putRand(r)
+
 	upsertSQL := strings.Builder{}
-	upsertSQL.WriteString(fmt.Sprintf("INSERT INTO %s VALUES (%d,%s)", tableName, rand.Int63()%maxValue, l.getSmallRow()))
+	upsertSQL.WriteString(fmt.Sprintf("INSERT INTO %s VALUES (%d,%s)", tableName, r.Int63()%maxValue, l.getSmallRow(r)))
 
 	var largeRowCount int
 	for i := 1; i < opts.Batch; i++ {
-		if l.r.Float64() < l.largeRatio {
-			upsertSQL.WriteString(fmt.Sprintf(",(%d,%s)", rand.Int63()%maxValue, l.getLargeRow()))
+		if r.Float64() < l.largeRatio {
+			upsertSQL.WriteString(fmt.Sprintf(",(%d,%s)", r.Int63()%maxValue, l.getLargeRow(r)))
 			largeRowCount++
 		} else {
-			upsertSQL.WriteString(fmt.Sprintf(",(%d,%s)", rand.Int63()%maxValue, l.getSmallRow()))
+			upsertSQL.WriteString(fmt.Sprintf(",(%d,%s)", r.Int63()%maxValue, l.getSmallRow(r)))
 		}
 	}
 	upsertSQL.WriteString(" ON DUPLICATE KEY UPDATE col_0=VALUES(col_0)")
@@ -166,15 +183,18 @@ func (l *LargeRowWorkload) BuildUpdateSql(opts schema.UpdateOption) string {
 }
 
 func (l *LargeRowWorkload) BuildDeleteSql(opts schema.DeleteOption) string {
-	deleteType := rand.Intn(3)
+	r := l.getRand()
+	defer l.putRand(r)
+
+	deleteType := r.Intn(3)
 	tableName := getTableName(opts.TableIndex)
 
 	switch deleteType {
 	case 0:
 		// Strategy 1: Random single/multiple row delete by ID
 		var buf strings.Builder
 		for i := 0; i < opts.Batch; i++ {
-			id := rand.Int63() % maxValue
+			id := r.Int63() % maxValue
 			if i > 0 {
 				buf.WriteString(";")
 			}
@@ -184,7 +204,7 @@ func (l *LargeRowWorkload) BuildDeleteSql(opts schema.DeleteOption) string {
 
 	case 1:
 		// Strategy 2: Range delete by ID
-		startID := rand.Int63() % maxValue
+		startID := r.Int63() % maxValue
 		endID := startID + int64(opts.Batch*100)
 		if endID > maxValue {
 			endID = maxValue
@@ -194,7 +214,7 @@ func (l *LargeRowWorkload) BuildDeleteSql(opts schema.DeleteOption) string {
 
 	case 2:
 		// Strategy 3: Conditional delete by random ID modulo
-		modValue := rand.Intn(1000)
+		modValue := r.Intn(1000)
 		return fmt.Sprintf("DELETE FROM %s WHERE id %% 1000 = %d LIMIT %d",
 			tableName, modValue, opts.Batch)
 
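The recurring shape of this diff: each Build*Sql method now borrows a *rand.Rand from a sync.Pool via getRand, uses it for the whole statement, and returns it with a deferred putRand. A single shared *rand.Rand is not safe for concurrent use, and the global rand.* helpers serialize on an internal lock; the pool gives each in-flight call its own generator, seeded from an atomically incremented counter so seeds never collide. A minimal standalone sketch of the same pattern, assuming nothing from this repo (pooledRand and newPooledRand are illustrative names, not part of the PR):

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// pooledRand mirrors the PR's approach: a pool of *rand.Rand values, each
// seeded from an atomically bumped counter so no two generators share a
// seed and no generator is used by two goroutines at once.
type pooledRand struct {
	seed atomic.Int64
	pool sync.Pool
}

func newPooledRand() *pooledRand {
	p := &pooledRand{}
	p.seed.Store(time.Now().UnixNano())
	p.pool.New = func() any {
		return rand.New(rand.NewSource(p.seed.Add(1)))
	}
	return p
}

func main() {
	p := newPooledRand()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r := p.pool.Get().(*rand.Rand) // borrow, like getRand
			defer p.pool.Put(r)            // return, like putRand
			fmt.Println(r.Intn(1000))
		}()
	}
	wg.Wait()
}

Note that sync.Pool may allocate a fresh generator under contention and drop idle ones at GC; that is fine here, since the pool exists to avoid data races on rand.Rand's internal state, not to guarantee reuse.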