Skip to content

Commit 9f84593

Browse files
committed
workload: allow bank workload to run for multiple tables
Previously, running the bank workload would spin up a single bank table and make changes to it with transfers between its rows/accounts. Now, with the num-tables argument specified, this workload creates that many tables (named bank_0 through bank_{n-1}) and each transfer picks a random table on which to move money between two random accounts. This is a useful workload for testing multi-table and database-level changefeeds. Release note: None
1 parent 899c462 commit 9f84593

File tree

1 file changed

+75
-46
lines changed

1 file changed

+75
-46
lines changed

pkg/workload/bank/bank.go

Lines changed: 75 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const (
3434
defaultBatchSize = 1000
3535
defaultPayloadBytes = 100
3636
defaultRanges = 10
37+
defaultNumTables = 1
3738
maxTransfer = 999
3839
)
3940

@@ -45,6 +46,7 @@ type bank struct {
4546

4647
rows, batchSize int
4748
payloadBytes, ranges int
49+
numTables int
4850
}
4951

5052
func init() {
@@ -66,6 +68,7 @@ var bankMeta = workload.Meta{
6668
g.flags.IntVar(&g.batchSize, `batch-size`, defaultBatchSize, `Number of rows in each batch of initial data.`)
6769
g.flags.IntVar(&g.payloadBytes, `payload-bytes`, defaultPayloadBytes, `Size of the payload field in each initial row.`)
6870
g.flags.IntVar(&g.ranges, `ranges`, defaultRanges, `Initial number of ranges in bank table.`)
71+
g.flags.IntVar(&g.numTables, `num-tables`, defaultNumTables, `Number of bank tables to create.`)
6972
RandomSeed.AddFlag(&g.flags)
7073
g.connFlags = workload.NewConnFlags(&g.flags)
7174
return g
@@ -117,11 +120,22 @@ func (b *bank) Hooks() workload.Hooks {
117120
if b.batchSize <= 0 {
118121
return errors.Errorf(`Value of batch-size must be greater than zero; was %d`, b.batchSize)
119122
}
123+
if b.numTables <= 0 {
124+
return errors.Errorf(`Value of num-tables must be greater than zero; was %d`, b.numTables)
125+
}
120126
return nil
121127
},
122128
}
123129
}
124130

131+
// tableName returns the table name with optional schema prefix and table number.
132+
func (b *bank) tableName(baseName string, tableIdx int) string {
133+
if b.numTables > 1 {
134+
return fmt.Sprintf("%s_%d", baseName, tableIdx)
135+
}
136+
return baseName
137+
}
138+
125139
var bankTypes = []*types.T{
126140
types.Int,
127141
types.Int,
@@ -131,46 +145,51 @@ var bankTypes = []*types.T{
131145
// Tables implements the Generator interface.
132146
func (b *bank) Tables() []workload.Table {
133147
numBatches := (b.rows + b.batchSize - 1) / b.batchSize // ceil(b.rows/b.batchSize)
134-
table := workload.Table{
135-
Name: `bank`,
136-
Schema: bankSchema,
137-
InitialRows: workload.BatchedTuples{
138-
NumBatches: numBatches,
139-
FillBatch: func(batchIdx int, cb coldata.Batch, a *bufalloc.ByteAllocator) {
140-
rng := rand.NewPCG(RandomSeed.Seed(), uint64(batchIdx))
141-
142-
rowBegin, rowEnd := batchIdx*b.batchSize, (batchIdx+1)*b.batchSize
143-
if rowEnd > b.rows {
144-
rowEnd = b.rows
145-
}
146-
cb.Reset(bankTypes, rowEnd-rowBegin, coldata.StandardColumnFactory)
147-
idCol := cb.ColVec(0).Int64()
148-
balanceCol := cb.ColVec(1).Int64()
149-
payloadCol := cb.ColVec(2).Bytes()
150-
// coldata.Bytes only allows appends so we have to reset it
151-
payloadCol.Reset()
152-
for rowIdx := rowBegin; rowIdx < rowEnd; rowIdx++ {
153-
var payload []byte
154-
*a, payload = a.Alloc(b.payloadBytes)
155-
randStringLetters(rng, payload)
156-
157-
rowOffset := rowIdx - rowBegin
158-
idCol[rowOffset] = int64(rowIdx)
159-
balanceCol[rowOffset] = 0
160-
payloadCol.Set(rowOffset, payload)
161-
}
162-
},
163-
},
164-
Splits: workload.Tuples(
165-
b.ranges-1,
166-
func(splitIdx int) []interface{} {
167-
return []interface{}{
168-
(splitIdx + 1) * (b.rows / b.ranges),
169-
}
148+
149+
tables := make([]workload.Table, b.numTables)
150+
for tableIdx := range b.numTables {
151+
table := workload.Table{
152+
Name: b.tableName(`bank`, tableIdx),
153+
Schema: bankSchema,
154+
InitialRows: workload.BatchedTuples{
155+
NumBatches: numBatches,
156+
FillBatch: func(batchIdx int, cb coldata.Batch, a *bufalloc.ByteAllocator) {
157+
rng := rand.NewPCG(RandomSeed.Seed(), uint64(batchIdx))
158+
159+
rowBegin, rowEnd := batchIdx*b.batchSize, (batchIdx+1)*b.batchSize
160+
if rowEnd > b.rows {
161+
rowEnd = b.rows
162+
}
163+
cb.Reset(bankTypes, rowEnd-rowBegin, coldata.StandardColumnFactory)
164+
idCol := cb.ColVec(0).Int64()
165+
balanceCol := cb.ColVec(1).Int64()
166+
payloadCol := cb.ColVec(2).Bytes()
167+
// coldata.Bytes only allows appends so we have to reset it
168+
payloadCol.Reset()
169+
for rowIdx := rowBegin; rowIdx < rowEnd; rowIdx++ {
170+
var payload []byte
171+
*a, payload = a.Alloc(b.payloadBytes)
172+
randStringLetters(rng, payload)
173+
174+
rowOffset := rowIdx - rowBegin
175+
idCol[rowOffset] = int64(rowIdx)
176+
balanceCol[rowOffset] = 0
177+
payloadCol.Set(rowOffset, payload)
178+
}
179+
},
170180
},
171-
),
181+
Splits: workload.Tuples(
182+
b.ranges-1,
183+
func(splitIdx int) []interface{} {
184+
return []interface{}{
185+
(splitIdx + 1) * (b.rows / b.ranges),
186+
}
187+
},
188+
),
189+
}
190+
tables[tableIdx] = table
172191
}
173-
return []workload.Table{table}
192+
return tables
174193
}
175194

176195
// Ops implements the Opser interface.
@@ -186,13 +205,17 @@ func (b *bank) Ops(
186205
db.SetMaxIdleConns(b.connFlags.Concurrency + 1)
187206

188207
// TODO(dan): Move the various queries in the backup/restore tests here.
189-
updateStmt, err := db.Prepare(`
190-
UPDATE bank
191-
SET balance = CASE id WHEN $1 THEN balance-$3 WHEN $2 THEN balance+$3 END
192-
WHERE id IN ($1, $2)
193-
`)
194-
if err != nil {
195-
return workload.QueryLoad{}, errors.CombineErrors(err, db.Close())
208+
updateStmts := make([]*gosql.Stmt, b.numTables)
209+
for tableIdx := range b.numTables {
210+
updateStmt, err := db.Prepare(fmt.Sprintf(`
211+
UPDATE %s
212+
SET balance = CASE id WHEN $1 THEN balance-$3 WHEN $2 THEN balance+$3 END
213+
WHERE id IN ($1, $2)
214+
`, b.tableName("bank", tableIdx)))
215+
if err != nil {
216+
return workload.QueryLoad{}, errors.CombineErrors(err, db.Close())
217+
}
218+
updateStmts[tableIdx] = updateStmt
196219
}
197220

198221
ql := workload.QueryLoad{
@@ -201,9 +224,15 @@ func (b *bank) Ops(
201224
},
202225
}
203226
for i := 0; i < b.connFlags.Concurrency; i++ {
204-
rng := rand.New(rand.NewPCG(RandomSeed.Seed(), 0))
227+
// The PCG is seeded with the worker index to ensure that each worker
228+
// gets a unique sequence of random operations.
229+
rng := rand.New(rand.NewPCG(RandomSeed.Seed(), uint64(i)))
205230
hists := reg.GetHandle()
231+
206232
workerFn := func(ctx context.Context) error {
233+
tableIdx := rng.IntN(b.numTables)
234+
updateStmt := updateStmts[tableIdx]
235+
207236
from := rng.IntN(b.rows)
208237
to := rng.IntN(b.rows - 1)
209238
for from == to && b.rows != 1 {

0 commit comments

Comments (0)