Skip to content

Commit 692074f

Browse files
committed
bulk_upsert/read_rows SLO workload
1 parent fe6beb2 commit 692074f

File tree

14 files changed

+490
-34
lines changed

14 files changed

+490
-34
lines changed

.github/workflows/slo.yml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ jobs:
5252
name: native-table-over-query-service
5353
path: ./native/table/over/query/service
5454
label: native/table/over/query/service
55+
- id: native_bulk_upsert
56+
name: native-bulk-upsert
57+
path: ./native/bulk-upsert
58+
label: native/bulk-upsert
5559
- id: gorm_table
5660
name: gorm-table
5761
path: ./gorm/table
@@ -103,14 +107,20 @@ jobs:
103107
104108
- name: Run SLO Tests
105109
run: |
110+
EXTRA_ARGS=""
111+
if [ "${{ matrix.sdk.id }}" = "native_bulk_upsert" ]; then
112+
EXTRA_ARGS="--batch-size=10"
113+
fi
114+
106115
./tests/slo/.bin/${{matrix.sdk.id}}_linux_amd64 run grpc://localhost:2135 /Root/testdb \
107116
-prom-pgw localhost:9091 \
108117
-report-period 250 \
109118
-time ${{inputs.slo_workload_duration_seconds || 600}} \
110119
-read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \
111120
-write-rps ${{inputs.slo_workload_write_max_rps || 100}} \
112121
-read-timeout 1000 \
113-
-write-timeout 1000 || true
122+
-write-timeout 1000 \
123+
$EXTRA_ARGS
114124
115125
- if: always()
116126
name: Store ydb chaos testing logs

tests/slo/gorm/query/main.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,7 @@ func main() {
8585
})
8686
}
8787

88-
err = g.Wait()
89-
if err != nil {
90-
panic(err)
91-
}
88+
_ = g.Wait()
9289

9390
log.Println("entries write ok")
9491
case config.CleanupMode:

tests/slo/gorm/table/main.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,7 @@ func main() {
8585
})
8686
}
8787

88-
err = g.Wait()
89-
if err != nil {
90-
panic(err)
91-
}
88+
_ = g.Wait()
9289

9390
log.Println("entries write ok")
9491
case config.CleanupMode:

tests/slo/internal/config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type Config struct {
3333

3434
Time int
3535
ShutdownTime int
36+
37+
BatchSize int
3638
}
3739

3840
func New() (*Config, error) {
@@ -100,6 +102,8 @@ func New() (*Config, error) {
100102

101103
fs.IntVar(&cfg.Time, "time", 600, "run time in seconds")
102104
fs.IntVar(&cfg.ShutdownTime, "shutdown-time", 30, "time to wait before force kill workers")
105+
106+
fs.IntVar(&cfg.BatchSize, "batch-size", 1, "batch size (used for bulk_upsert/read_rows operations)")
103107
default:
104108
fmt.Print(mainHelp)
105109

tests/slo/internal/generator/generator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func (g *Generator) Generate() (Row, error) {
3333
ID: id,
3434
PayloadDouble: func(a float64) *float64 { return &a }(rand.Float64()), //nolint:gosec // speed more important
3535
PayloadTimestamp: func(a time.Time) *time.Time { return &a }(time.Now()),
36+
PayloadHash: func(a uint64) *uint64 { return &a }(rand.Uint64()), //nolint:gosec
3637
}
3738

3839
var err error

tests/slo/internal/generator/row.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ type Row struct {
1111
PayloadStr *string `sql:"payload_str" gorm:"column:payload_str" xorm:"'payload_str'"`
1212
PayloadDouble *float64 `sql:"payload_double" gorm:"column:payload_double" xorm:"'payload_double'"`
1313
PayloadTimestamp *time.Time `sql:"payload_timestamp" gorm:"column:payload_timestamp" xorm:"'payload_timestamp'"`
14-
PayloadHash uint64 `sql:"payload_hash" gorm:"column:payload_hash" xorm:"'payload_hash'"`
14+
PayloadHash *uint64 `sql:"payload_hash" gorm:"column:payload_hash" xorm:"'payload_hash'"`
1515
}

tests/slo/internal/metrics/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func (m *Metrics) TimeoutsTotal() float64 {
267267
}
268268

269269
func (m *Metrics) FailOnError() {
270-
if m.ErrorsTotal() > 0 {
270+
if m.ErrorsTotal()*100 > m.OperationsTotal() {
271271
log.Panicf(
272272
"unretriable (or not successfully retried) errors: %.0f errors out of %.0f operations",
273273
m.ErrorsTotal(),

tests/slo/internal/workers/read.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,21 @@ func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter
3434
}
3535

3636
func (w *Workers) read(ctx context.Context) error {
37-
id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important
38-
39-
m := w.m.Start(metrics.OperationTypeRead)
40-
41-
_, attempts, err := w.s.Read(ctx, id)
37+
var m metrics.Span
38+
var attempts int
39+
var err error
40+
if w.s != nil {
41+
id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important
42+
m = w.m.Start(metrics.OperationTypeRead)
43+
_, attempts, err = w.s.Read(ctx, id)
44+
} else {
45+
ids := make([]uint64, 0, w.cfg.BatchSize)
46+
for range w.cfg.BatchSize {
47+
ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint:gosec
48+
}
49+
m = w.m.Start(metrics.OperationTypeRead)
50+
_, attempts, err = w.sb.ReadBatch(ctx, ids)
51+
}
4252

4353
m.Finish(err, attempts)
4454

tests/slo/internal/workers/workers.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,15 @@ type ReadWriter interface {
1414
Write(ctx context.Context, row generator.Row) (attempts int, err error)
1515
}
1616

17+
type BatchReadWriter interface {
18+
ReadBatch(ctx context.Context, rowIDs []generator.RowID) (_ []generator.Row, attempts int, err error)
19+
WriteBatch(ctx context.Context, rows []generator.Row) (attempts int, err error)
20+
}
21+
1722
type Workers struct {
1823
cfg *config.Config
1924
s ReadWriter
25+
sb BatchReadWriter
2026
m *metrics.Metrics
2127
}
2228

@@ -35,6 +41,21 @@ func New(cfg *config.Config, s ReadWriter, ref, label, jobName string) (*Workers
3541
}, nil
3642
}
3743

44+
func NewWithBatch(cfg *config.Config, s BatchReadWriter, ref, label, jobName string) (*Workers, error) {
45+
m, err := metrics.New(cfg.PushGateway, ref, label, jobName)
46+
if err != nil {
47+
log.Printf("create metrics failed: %v", err)
48+
49+
return nil, err
50+
}
51+
52+
return &Workers{
53+
cfg: cfg,
54+
sb: s,
55+
m: m,
56+
}, nil
57+
}
58+
3859
func (w *Workers) FailOnError() {
3960
w.m.FailOnError()
4061
}

tests/slo/internal/workers/write.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,33 @@ func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limite
3333
}
3434
}
3535

36-
func (w *Workers) write(ctx context.Context, gen *generator.Generator) error {
37-
row, err := gen.Generate()
38-
if err != nil {
39-
log.Printf("generate error: %v", err)
36+
func (w *Workers) write(ctx context.Context, gen *generator.Generator) (finalErr error) {
37+
m := w.m.Start(metrics.OperationTypeWrite)
38+
var attempts int
39+
if w.s != nil {
40+
row, err := gen.Generate()
41+
if err != nil {
42+
log.Printf("generate error: %v", err)
4043

41-
return err
42-
}
44+
return err
45+
}
4346

44-
m := w.m.Start(metrics.OperationTypeWrite)
47+
attempts, finalErr = w.s.Write(ctx, row)
48+
} else {
49+
rows := make([]generator.Row, 0, w.cfg.BatchSize)
50+
for range w.cfg.BatchSize {
51+
row, err := gen.Generate()
52+
if err != nil {
53+
log.Printf("generate error: %v", err)
4554

46-
attempts, err := w.s.Write(ctx, row)
55+
return err
56+
}
57+
rows = append(rows, row)
58+
}
4759

48-
m.Finish(err, attempts)
60+
attempts, finalErr = w.sb.WriteBatch(ctx, rows)
61+
}
62+
m.Finish(finalErr, attempts)
4963

50-
return err
64+
return finalErr
5165
}

0 commit comments

Comments
 (0)