Skip to content

Commit 1abb869

Browse files
committed
bulk_upsert/read_rows SLO workload
1 parent fe6beb2 commit 1abb869

File tree

10 files changed

+486
-18
lines changed

10 files changed

+486
-18
lines changed

.github/workflows/slo.yml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ jobs:
5252
name: native-table-over-query-service
5353
path: ./native/table/over/query/service
5454
label: native/table/over/query/service
55+
- id: native_bulk_upsert
56+
name: native-bulk-upsert
57+
path: ./native/bulk-upsert
58+
label: native/bulk-upsert
5559
- id: gorm_table
5660
name: gorm-table
5761
path: ./gorm/table
@@ -103,14 +107,20 @@ jobs:
103107
104108
- name: Run SLO Tests
105109
run: |
110+
EXTRA_ARGS=""
111+
if [ "${{ matrix.sdk.id }}" = "native_bulk_upsert" ]; then
112+
EXTRA_ARGS="--batch-size=10"
113+
fi
114+
106115
./tests/slo/.bin/${{matrix.sdk.id}}_linux_amd64 run grpc://localhost:2135 /Root/testdb \
107116
-prom-pgw localhost:9091 \
108117
-report-period 250 \
109118
-time ${{inputs.slo_workload_duration_seconds || 600}} \
110119
-read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \
111120
-write-rps ${{inputs.slo_workload_write_max_rps || 100}} \
112121
-read-timeout 1000 \
113-
-write-timeout 1000 || true
122+
-write-timeout 1000 \
123+
$EXTRA_ARGS
114124
115125
- if: always()
116126
name: Store ydb chaos testing logs

tests/slo/internal/config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type Config struct {
3333

3434
Time int
3535
ShutdownTime int
36+
37+
BatchSize int
3638
}
3739

3840
func New() (*Config, error) {
@@ -100,6 +102,8 @@ func New() (*Config, error) {
100102

101103
fs.IntVar(&cfg.Time, "time", 600, "run time in seconds")
102104
fs.IntVar(&cfg.ShutdownTime, "shutdown-time", 30, "time to wait before force kill workers")
105+
106+
fs.IntVar(&cfg.BatchSize, "batch-size", 1, "batch size (used for bulk_upsert/read_rows operations)")
103107
default:
104108
fmt.Print(mainHelp)
105109

tests/slo/internal/generator/generator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func (g *Generator) Generate() (Row, error) {
3333
ID: id,
3434
PayloadDouble: func(a float64) *float64 { return &a }(rand.Float64()), //nolint:gosec // speed more important
3535
PayloadTimestamp: func(a time.Time) *time.Time { return &a }(time.Now()),
36+
PayloadHash: func(a uint64) *uint64 { return &a }(rand.Uint64()), //nolint:gosec
3637
}
3738

3839
var err error

tests/slo/internal/generator/row.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ type Row struct {
1111
PayloadStr *string `sql:"payload_str" gorm:"column:payload_str" xorm:"'payload_str'"`
1212
PayloadDouble *float64 `sql:"payload_double" gorm:"column:payload_double" xorm:"'payload_double'"`
1313
PayloadTimestamp *time.Time `sql:"payload_timestamp" gorm:"column:payload_timestamp" xorm:"'payload_timestamp'"`
14-
PayloadHash uint64 `sql:"payload_hash" gorm:"column:payload_hash" xorm:"'payload_hash'"`
14+
PayloadHash *uint64 `sql:"payload_hash" gorm:"column:payload_hash" xorm:"'payload_hash'"`
1515
}

tests/slo/internal/metrics/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func (m *Metrics) TimeoutsTotal() float64 {
267267
}
268268

269269
func (m *Metrics) FailOnError() {
270-
if m.ErrorsTotal() > 0 {
270+
if m.ErrorsTotal()*100 > m.OperationsTotal() {
271271
log.Panicf(
272272
"unretriable (or not successfully retried) errors: %.0f errors out of %.0f operations",
273273
m.ErrorsTotal(),

tests/slo/internal/workers/read.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,21 @@ func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter
3434
}
3535

3636
func (w *Workers) read(ctx context.Context) error {
37-
id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important
38-
39-
m := w.m.Start(metrics.OperationTypeRead)
40-
41-
_, attempts, err := w.s.Read(ctx, id)
37+
var m metrics.Span
38+
var attempts int
39+
var err error
40+
if w.s != nil {
41+
id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important
42+
m = w.m.Start(metrics.OperationTypeRead)
43+
_, attempts, err = w.s.Read(ctx, id)
44+
} else {
45+
ids := make([]uint64, 0, w.cfg.BatchSize)
46+
for range w.cfg.BatchSize {
47+
ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint:gosec
48+
}
49+
m = w.m.Start(metrics.OperationTypeRead)
50+
_, attempts, err = w.sb.ReadBatch(ctx, ids)
51+
}
4252

4353
m.Finish(err, attempts)
4454

tests/slo/internal/workers/workers.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,15 @@ type ReadWriter interface {
1414
Write(ctx context.Context, row generator.Row) (attempts int, err error)
1515
}
1616

17+
type BatchReadWriter interface {
18+
ReadBatch(ctx context.Context, rowIDs []generator.RowID) (_ []generator.Row, attempts int, err error)
19+
WriteBatch(ctx context.Context, rows []generator.Row) (attempts int, err error)
20+
}
21+
1722
type Workers struct {
1823
cfg *config.Config
1924
s ReadWriter
25+
sb BatchReadWriter
2026
m *metrics.Metrics
2127
}
2228

@@ -35,6 +41,21 @@ func New(cfg *config.Config, s ReadWriter, ref, label, jobName string) (*Workers
3541
}, nil
3642
}
3743

44+
func NewWithBatch(cfg *config.Config, s BatchReadWriter, ref, label, jobName string) (*Workers, error) {
45+
m, err := metrics.New(cfg.PushGateway, ref, label, jobName)
46+
if err != nil {
47+
log.Printf("create metrics failed: %v", err)
48+
49+
return nil, err
50+
}
51+
52+
return &Workers{
53+
cfg: cfg,
54+
sb: s,
55+
m: m,
56+
}, nil
57+
}
58+
3859
func (w *Workers) FailOnError() {
3960
w.m.FailOnError()
4061
}

tests/slo/internal/workers/write.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,33 @@ func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limite
3333
}
3434
}
3535

36-
func (w *Workers) write(ctx context.Context, gen *generator.Generator) error {
37-
row, err := gen.Generate()
38-
if err != nil {
39-
log.Printf("generate error: %v", err)
36+
func (w *Workers) write(ctx context.Context, gen *generator.Generator) (finalErr error) {
37+
m := w.m.Start(metrics.OperationTypeWrite)
38+
var attempts int
39+
if w.s != nil {
40+
row, err := gen.Generate()
41+
if err != nil {
42+
log.Printf("generate error: %v", err)
4043

41-
return err
42-
}
44+
return err
45+
}
4346

44-
m := w.m.Start(metrics.OperationTypeWrite)
47+
attempts, finalErr = w.s.Write(ctx, row)
48+
} else {
49+
rows := make([]generator.Row, 0, w.cfg.BatchSize)
50+
for range w.cfg.BatchSize {
51+
row, err := gen.Generate()
52+
if err != nil {
53+
log.Printf("generate error: %v", err)
4554

46-
attempts, err := w.s.Write(ctx, row)
55+
return err
56+
}
57+
rows = append(rows, row)
58+
}
4759

48-
m.Finish(err, attempts)
60+
attempts, finalErr = w.sb.WriteBatch(ctx, rows)
61+
}
62+
m.Finish(finalErr, attempts)
4963

50-
return err
64+
return finalErr
5165
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os/signal"
7+
"strconv"
8+
"sync"
9+
"syscall"
10+
"time"
11+
12+
"golang.org/x/sync/errgroup"
13+
"golang.org/x/time/rate"
14+
15+
"slo/internal/config"
16+
"slo/internal/generator"
17+
"slo/internal/log"
18+
"slo/internal/workers"
19+
)
20+
21+
var (
22+
ref string
23+
label string
24+
jobName = "slo_native_bulk_upsert"
25+
)
26+
27+
func main() {
28+
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
29+
defer cancel()
30+
31+
cfg, err := config.New()
32+
if err != nil {
33+
panic(fmt.Errorf("create config failed: %w", err))
34+
}
35+
36+
log.Println("program started")
37+
defer log.Println("program finished")
38+
39+
ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second)
40+
defer cancel()
41+
42+
go func() {
43+
<-ctx.Done()
44+
log.Println("exiting...")
45+
}()
46+
47+
// pool size similar to query variant
48+
s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, "no_session")
49+
if err != nil {
50+
panic(fmt.Errorf("create storage failed: %w", err))
51+
}
52+
defer func() {
53+
var (
54+
shutdownCtx context.Context
55+
shutdownCancel context.CancelFunc
56+
)
57+
if cfg.ShutdownTime > 0 {
58+
shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(),
59+
time.Duration(cfg.ShutdownTime)*time.Second)
60+
} else {
61+
shutdownCtx, shutdownCancel = context.WithCancel(context.Background())
62+
}
63+
defer shutdownCancel()
64+
65+
_ = s.Close(shutdownCtx)
66+
}()
67+
68+
log.Println("db init ok")
69+
70+
switch cfg.Mode {
71+
case config.CreateMode:
72+
err = s.CreateTable(ctx)
73+
if err != nil {
74+
panic(fmt.Errorf("create table failed: %w", err))
75+
}
76+
log.Println("create table ok")
77+
78+
gen := generator.New(0)
79+
80+
g := errgroup.Group{}
81+
82+
for i := uint64(0); i < cfg.InitialDataCount; i++ {
83+
g.Go(func() (err error) {
84+
e, err := gen.Generate()
85+
if err != nil {
86+
return err
87+
}
88+
89+
_, err = s.WriteBatch(ctx, []generator.Row{e})
90+
if err != nil {
91+
return err
92+
}
93+
94+
return nil
95+
})
96+
}
97+
98+
err = g.Wait()
99+
if err != nil {
100+
panic(err)
101+
}
102+
103+
log.Println("entries write ok")
104+
case config.CleanupMode:
105+
err = s.DropTable(ctx)
106+
if err != nil {
107+
panic(fmt.Errorf("create table failed: %w", err))
108+
}
109+
110+
log.Println("cleanup table ok")
111+
case config.RunMode:
112+
gen := generator.New(cfg.InitialDataCount)
113+
114+
w, err := workers.NewWithBatch(cfg, s, ref, label, jobName)
115+
if err != nil {
116+
panic(fmt.Errorf("create workers failed: %w", err))
117+
}
118+
defer func() {
119+
err := w.Close()
120+
if err != nil {
121+
panic(fmt.Errorf("workers close failed: %w", err))
122+
}
123+
log.Println("workers close ok")
124+
}()
125+
126+
wg := sync.WaitGroup{}
127+
128+
readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1)
129+
wg.Add(cfg.ReadRPS)
130+
for i := 0; i < cfg.ReadRPS; i++ {
131+
go w.Read(ctx, &wg, readRL)
132+
}
133+
log.Println("started " + strconv.Itoa(cfg.ReadRPS) + " read workers")
134+
135+
writeRL := rate.NewLimiter(rate.Limit(cfg.WriteRPS), 1)
136+
wg.Add(cfg.WriteRPS)
137+
for i := 0; i < cfg.WriteRPS; i++ {
138+
go w.Write(ctx, &wg, writeRL, gen)
139+
}
140+
log.Println("started " + strconv.Itoa(cfg.WriteRPS) + " write workers")
141+
142+
metricsRL := rate.NewLimiter(rate.Every(time.Duration(cfg.ReportPeriod)*time.Millisecond), 1)
143+
wg.Add(1)
144+
go w.Metrics(ctx, &wg, metricsRL)
145+
146+
wg.Wait()
147+
default:
148+
panic(fmt.Errorf("unknown mode: %v", cfg.Mode))
149+
}
150+
}

0 commit comments

Comments
 (0)