Skip to content

Commit 1041bf0

Browse files
committed
workload: allow BatchedTuples to contain duplicates
Previously, the BatchedTuples type that is used to load initial data for a workload disallowed duplicate keys that would violate uniqueness constraints. Some of the workload key generators may inevitably generate duplicates when hashing keys because of the relatively small key space of 64-bit integers. This commit updates the BatchedTuples type so that a workload can indicate the possible existence of duplicates, and updates the data loader to use INSERT ... ON CONFLICT DO NOTHING queries to perform the insertion, effectively ignoring the duplicates.
1 parent fd6d253 commit 1041bf0

File tree

4 files changed

+21
-1
lines changed

4 files changed

+21
-1
lines changed

pkg/workload/kv/kv.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,13 @@ func (w *kv) Tables() []workload.Table {
428428
const batchSize = 1000
429429
table.InitialRows = workload.BatchedTuples{
430430
NumBatches: (w.insertCount + batchSize - 1) / batchSize,
431+
// If the key sequence is not sequential, duplicates are possible.
432+
// The zipfian distribution produces duplicates by design, and the
433+
// hash key mapper can also produce duplicates at larger insert
434+
// counts (it's at least inevitable at ~1b rows). Marking that the
435+
// keys may contain duplicates will cause the data loader to use
436+
// INSERT ... ON CONFLICT DO NOTHING statements.
437+
MayContainDuplicates: !w.sequential,
431438
FillBatch: func(batchIdx int, cb coldata.Batch, a *bufalloc.ByteAllocator) {
432439
rowBegin, rowEnd := batchIdx*batchSize, (batchIdx+1)*batchSize
433440
if rowEnd > w.insertCount {

pkg/workload/workload.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type Generator interface {
4646
func SupportsFixtures(gen Generator) bool {
4747
tt := gen.Tables()
4848
for _, t := range tt {
49-
if t.InitialRows.FillBatch == nil {
49+
if t.InitialRows.FillBatch == nil || t.InitialRows.MayContainDuplicates {
5050
return false
5151
}
5252
}
@@ -205,6 +205,10 @@ func (t Table) GetResolvedName() tree.TableName {
205205
type BatchedTuples struct {
206206
// NumBatches is the number of batches of tuples.
207207
NumBatches int
208+
// MayContainDuplicates is a flag indicating whether the tuples may contain
209+
// keys that violate uniqueness constraints. If true, the data loader will
210+
// use INSERT ... ON CONFLICT DO NOTHING statements.
211+
MayContainDuplicates bool
208212
// FillBatch is a function to deterministically compute a columnar-batch of
209213
// tuples given its index.
210214
//

pkg/workload/workloadsql/dataload.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ func (l InsertsDataLoader) InitialDataLoad(
131131
var numRows int
132132
flush := func() error {
133133
if len(params) > 0 {
134+
if table.InitialRows.MayContainDuplicates {
135+
fmt.Fprint(&insertStmtBuf, ` ON CONFLICT DO NOTHING`)
136+
}
134137
insertStmt := insertStmtBuf.String()
135138
if _, err := db.ExecContext(gCtx, insertStmt, params...); err != nil {
136139
return errors.Wrapf(err, "failed insert into %s", tableName.String())

pkg/workload/ycsb/ycsb.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,12 @@ func (g *ycsb) Tables() []workload.Table {
359359
const batchSize = 1000
360360
usertable.InitialRows = workload.BatchedTuples{
361361
NumBatches: (g.insertCount + batchSize - 1) / batchSize,
362+
// If the key sequence is hashed, duplicates are possible. Hash
363+
// collisions are inevitable at large insert counts (they're at
364+
// least inevitable at ~1b rows). Marking that the keys may contain
365+
// duplicates will cause the data loader to use INSERT ... ON
366+
// CONFLICT DO NOTHING statements.
367+
MayContainDuplicates: g.insertHash,
362368
FillBatch: func(batchIdx int, cb coldata.Batch, _ *bufalloc.ByteAllocator) {
363369
rowBegin, rowEnd := batchIdx*batchSize, (batchIdx+1)*batchSize
364370
if rowEnd > g.insertCount {

0 commit comments

Comments
 (0)