12 changes: 11 additions & 1 deletion .github/workflows/slo.yml
@@ -52,6 +52,10 @@ jobs:
name: native-table-over-query-service
path: ./native/table/over/query/service
label: native/table/over/query/service
- id: native_bulk_upsert
name: native-bulk-upsert
path: ./native/bulk-upsert
label: native/bulk-upsert
- id: gorm_table
name: gorm-table
path: ./gorm/table
@@ -103,14 +107,20 @@ jobs:

- name: Run SLO Tests
run: |
EXTRA_ARGS=""
if [ "${{ matrix.sdk.id }}" = "native_bulk_upsert" ]; then
EXTRA_ARGS="--batch-size=10"
fi

./tests/slo/.bin/${{matrix.sdk.id}}_linux_amd64 run grpc://localhost:2135 /Root/testdb \
-prom-pgw localhost:9091 \
-report-period 250 \
-time ${{inputs.slo_workload_duration_seconds || 600}} \
-read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \
-write-rps ${{inputs.slo_workload_write_max_rps || 100}} \
-read-timeout 1000 \
-write-timeout 1000 || true
-write-timeout 1000 \
$EXTRA_ARGS

- if: always()
name: Store ydb chaos testing logs
5 changes: 1 addition & 4 deletions tests/slo/gorm/query/main.go
@@ -85,10 +85,7 @@ func main() {
})
}

err = g.Wait()
if err != nil {
panic(err)
}
_ = g.Wait()

log.Println("entries write ok")
case config.CleanupMode:
5 changes: 1 addition & 4 deletions tests/slo/gorm/table/main.go
@@ -85,10 +85,7 @@ func main() {
})
}

err = g.Wait()
if err != nil {
panic(err)
}
_ = g.Wait()

log.Println("entries write ok")
case config.CleanupMode:
4 changes: 4 additions & 0 deletions tests/slo/internal/config/config.go
@@ -33,6 +33,8 @@ type Config struct {

Time int
ShutdownTime int

BatchSize int
}

func New() (*Config, error) {
@@ -100,6 +102,8 @@ func New() (*Config, error) {

fs.IntVar(&cfg.Time, "time", 600, "run time in seconds")
fs.IntVar(&cfg.ShutdownTime, "shutdown-time", 30, "time to wait before force kill workers")

fs.IntVar(&cfg.BatchSize, "batch-size", 1, "batch size (used for bulk_upsert/read_rows operations)")
default:
fmt.Print(mainHelp)

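For illustration, a self-contained sketch of how the new -batch-size flag behaves in isolation, using only the standard flag package; the Config struct here is trimmed to the added field and the flag-set name is made up, while the flag name, default, and usage string match the diff above:

```go
package main

import (
	"flag"
	"fmt"
)

// Config mirrors only the field added in this diff; the real struct has many more fields.
type Config struct {
	BatchSize int
}

func main() {
	cfg := &Config{}
	fs := flag.NewFlagSet("run", flag.ExitOnError) // flag-set name is illustrative
	fs.IntVar(&cfg.BatchSize, "batch-size", 1, "batch size (used for bulk_upsert/read_rows operations)")

	// Without -batch-size the workers fall back to single-row batches (default 1);
	// the workflow above passes --batch-size=10 for the native_bulk_upsert SDK.
	_ = fs.Parse([]string{"-batch-size", "10"})
	fmt.Println(cfg.BatchSize) // 10
}
```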
1 change: 1 addition & 0 deletions tests/slo/internal/generator/generator.go
@@ -33,6 +33,7 @@ func (g *Generator) Generate() (Row, error) {
ID: id,
PayloadDouble: func(a float64) *float64 { return &a }(rand.Float64()), //nolint:gosec // speed more important
PayloadTimestamp: func(a time.Time) *time.Time { return &a }(time.Now()),
PayloadHash: func(a uint64) *uint64 { return &a }(rand.Uint64()), //nolint:gosec
}

var err error
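The new PayloadHash field follows the same pointer-initialization idiom as the existing payload fields: Go cannot take the address of a function call result directly, so the value is passed through an immediately invoked helper. A minimal standalone sketch of that idiom (the variable name is illustrative):

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// &rand.Uint64() would not compile: the operand of & must be addressable.
	// Wrapping the value in a one-shot function yields an addressable copy,
	// which is the idiom the generator uses for PayloadHash and the other payload fields.
	hash := func(a uint64) *uint64 { return &a }(rand.Uint64())
	fmt.Println(*hash)
}
```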
2 changes: 1 addition & 1 deletion tests/slo/internal/generator/row.go
@@ -11,5 +11,5 @@ type Row struct {
PayloadStr *string `sql:"payload_str" gorm:"column:payload_str" xorm:"'payload_str'"`
PayloadDouble *float64 `sql:"payload_double" gorm:"column:payload_double" xorm:"'payload_double'"`
PayloadTimestamp *time.Time `sql:"payload_timestamp" gorm:"column:payload_timestamp" xorm:"'payload_timestamp'"`
PayloadHash uint64 `sql:"payload_hash" gorm:"column:payload_hash" xorm:"'payload_hash'"`
PayloadHash *uint64 `sql:"payload_hash" gorm:"column:payload_hash" xorm:"'payload_hash'"`
}
10 changes: 7 additions & 3 deletions tests/slo/internal/metrics/metrics.go
@@ -267,14 +267,18 @@ func (m *Metrics) TimeoutsTotal() float64 {
}

func (m *Metrics) FailOnError() {
if m.ErrorsTotal() > 0 {
if m.ErrorsTotal()*100 > m.OperationsTotal() {
log.Panicf(
"unretriable (or not successfully retried) errors: %.0f errors out of %.0f operations",
m.ErrorsTotal(),
m.OperationsTotal(),
)
}
if m.TimeoutsTotal() > 0 {
log.Panicf("there are user timeouts: %.0f timeouts", m.TimeoutsTotal())
if m.TimeoutsTotal()*100 > m.OperationsTotal() {
log.Panicf(
"user timeouts: %.0f timeouts out of %.0f operations",
m.TimeoutsTotal(),
m.OperationsTotal(),
)
}
}
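FailOnError now tolerates a small error budget instead of panicking on the first failure: ErrorsTotal()*100 > OperationsTotal() is the same test as an error rate above 1%, written without division, and the timeout check mirrors it. A minimal sketch of the equivalence, with made-up counts:

```go
package main

import "fmt"

// exceedsOnePercent mirrors the comparison used in FailOnError:
// failures*100 > operations  <=>  failures/operations > 0.01 (for operations > 0).
func exceedsOnePercent(failures, operations float64) bool {
	return failures*100 > operations
}

func main() {
	fmt.Println(exceedsOnePercent(5, 1000))  // false: 0.5% stays within the budget
	fmt.Println(exceedsOnePercent(15, 1000)) // true: 1.5% trips the panic
}
```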
20 changes: 15 additions & 5 deletions tests/slo/internal/workers/read.go
@@ -34,11 +34,21 @@ func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter
}

func (w *Workers) read(ctx context.Context) error {
id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important

m := w.m.Start(metrics.OperationTypeRead)

_, attempts, err := w.s.Read(ctx, id)
var m metrics.Span
var attempts int
var err error
if w.s != nil {
id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important
m = w.m.Start(metrics.OperationTypeRead)
_, attempts, err = w.s.Read(ctx, id)
} else {
ids := make([]uint64, 0, w.cfg.BatchSize)
for range w.cfg.BatchSize {
ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint:gosec
}
m = w.m.Start(metrics.OperationTypeRead)
_, attempts, err = w.sb.ReadBatch(ctx, ids)
}

m.Finish(err, attempts)

21 changes: 21 additions & 0 deletions tests/slo/internal/workers/workers.go
@@ -14,9 +14,15 @@ type ReadWriter interface {
Write(ctx context.Context, row generator.Row) (attempts int, err error)
}

type BatchReadWriter interface {
ReadBatch(ctx context.Context, rowIDs []generator.RowID) (_ []generator.Row, attempts int, err error)
WriteBatch(ctx context.Context, rows []generator.Row) (attempts int, err error)
}

type Workers struct {
cfg *config.Config
s ReadWriter
sb BatchReadWriter
m *metrics.Metrics
}

@@ -35,6 +41,21 @@ func New(cfg *config.Config, s ReadWriter, ref, label, jobName string) (*Workers
}, nil
}

func NewWithBatch(cfg *config.Config, s BatchReadWriter, ref, label, jobName string) (*Workers, error) {
m, err := metrics.New(cfg.PushGateway, ref, label, jobName)
if err != nil {
log.Printf("create metrics failed: %v", err)

return nil, err
}

return &Workers{
cfg: cfg,
sb: s,
m: m,
}, nil
}

func (w *Workers) FailOnError() {
w.m.FailOnError()
}
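Workers can now be built either over a single-row ReadWriter (New) or over a BatchReadWriter (NewWithBatch), and the read/write paths dispatch on whether w.s is nil. Below is a self-contained sketch of a type satisfying the batch contract, backed by an in-memory map rather than YDB; Row and RowID are simplified stand-ins for the generator types, and the whole storage is hypothetical:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Simplified stand-ins for generator.RowID and generator.Row.
type RowID = uint64

type Row struct {
	ID          RowID
	PayloadHash *uint64
}

// memStorage is a hypothetical in-memory BatchReadWriter used only to show the contract:
// each batch call reports the number of attempts and a single error for the whole batch.
type memStorage struct {
	mu   sync.RWMutex
	rows map[RowID]Row
}

func (s *memStorage) WriteBatch(ctx context.Context, rows []Row) (attempts int, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, r := range rows {
		s.rows[r.ID] = r
	}
	return 1, nil
}

func (s *memStorage) ReadBatch(ctx context.Context, rowIDs []RowID) (_ []Row, attempts int, err error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]Row, 0, len(rowIDs))
	for _, id := range rowIDs {
		if r, ok := s.rows[id]; ok {
			out = append(out, r)
		}
	}
	return out, 1, nil
}

func main() {
	s := &memStorage{rows: map[RowID]Row{}}
	_, _ = s.WriteBatch(context.Background(), []Row{{ID: 1}, {ID: 2}})
	rows, attempts, err := s.ReadBatch(context.Background(), []RowID{1, 2, 3})
	fmt.Println(len(rows), attempts, err) // 2 1 <nil>
}
```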
34 changes: 24 additions & 10 deletions tests/slo/internal/workers/write.go
@@ -33,19 +33,33 @@ func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limite
}
}

func (w *Workers) write(ctx context.Context, gen *generator.Generator) error {
row, err := gen.Generate()
if err != nil {
log.Printf("generate error: %v", err)
func (w *Workers) write(ctx context.Context, gen *generator.Generator) (finalErr error) {
m := w.m.Start(metrics.OperationTypeWrite)
var attempts int
if w.s != nil {
row, err := gen.Generate()
if err != nil {
log.Printf("generate error: %v", err)

return err
}
return err
}

m := w.m.Start(metrics.OperationTypeWrite)
attempts, finalErr = w.s.Write(ctx, row)
} else {
rows := make([]generator.Row, 0, w.cfg.BatchSize)
for range w.cfg.BatchSize {
row, err := gen.Generate()
if err != nil {
log.Printf("generate error: %v", err)

attempts, err := w.s.Write(ctx, row)
return err
}
rows = append(rows, row)
}

m.Finish(err, attempts)
attempts, finalErr = w.sb.WriteBatch(ctx, rows)
}
m.Finish(finalErr, attempts)

return err
return finalErr
}
150 changes: 150 additions & 0 deletions tests/slo/native/bulk-upsert/main.go
@@ -0,0 +1,150 @@
package main

import (
"context"
"fmt"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"

"slo/internal/config"
"slo/internal/generator"
"slo/internal/log"
"slo/internal/workers"
)

var (
ref string
label string
jobName = "slo_native_bulk_upsert"
)

func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
defer cancel()

cfg, err := config.New()
if err != nil {
panic(fmt.Errorf("create config failed: %w", err))
}

log.Println("program started")
defer log.Println("program finished")

ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second)
defer cancel()

go func() {
<-ctx.Done()
log.Println("exiting...")
}()

// pool size similar to query variant
s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, "no_session")
if err != nil {
panic(fmt.Errorf("create storage failed: %w", err))
}
defer func() {
var (
shutdownCtx context.Context
shutdownCancel context.CancelFunc
)
if cfg.ShutdownTime > 0 {
shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(),
time.Duration(cfg.ShutdownTime)*time.Second)
} else {
shutdownCtx, shutdownCancel = context.WithCancel(context.Background())
}
defer shutdownCancel()

_ = s.Close(shutdownCtx)
}()

log.Println("db init ok")

switch cfg.Mode {
case config.CreateMode:
err = s.CreateTable(ctx)
if err != nil {
panic(fmt.Errorf("create table failed: %w", err))
}
log.Println("create table ok")

gen := generator.New(0)

g := errgroup.Group{}

for i := uint64(0); i < cfg.InitialDataCount; i++ {
g.Go(func() (err error) {
e, err := gen.Generate()
if err != nil {
return err
}

_, err = s.WriteBatch(ctx, []generator.Row{e})
if err != nil {
return err
}

return nil
})
}

err = g.Wait()
if err != nil {
panic(err)
}

log.Println("entries write ok")
case config.CleanupMode:
err = s.DropTable(ctx)
if err != nil {
panic(fmt.Errorf("create table failed: %w", err))
}

log.Println("cleanup table ok")
case config.RunMode:
gen := generator.New(cfg.InitialDataCount)

w, err := workers.NewWithBatch(cfg, s, ref, label, jobName)
if err != nil {
panic(fmt.Errorf("create workers failed: %w", err))
}
defer func() {
err := w.Close()
if err != nil {
panic(fmt.Errorf("workers close failed: %w", err))
}
log.Println("workers close ok")
}()

wg := sync.WaitGroup{}

readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1)
wg.Add(cfg.ReadRPS)
for i := 0; i < cfg.ReadRPS; i++ {
go w.Read(ctx, &wg, readRL)
}
log.Println("started " + strconv.Itoa(cfg.ReadRPS) + " read workers")

writeRL := rate.NewLimiter(rate.Limit(cfg.WriteRPS), 1)
wg.Add(cfg.WriteRPS)
for i := 0; i < cfg.WriteRPS; i++ {
go w.Write(ctx, &wg, writeRL, gen)
}
log.Println("started " + strconv.Itoa(cfg.WriteRPS) + " write workers")

metricsRL := rate.NewLimiter(rate.Every(time.Duration(cfg.ReportPeriod)*time.Millisecond), 1)
wg.Add(1)
go w.Metrics(ctx, &wg, metricsRL)

wg.Wait()
default:
panic(fmt.Errorf("unknown mode: %v", cfg.Mode))
}
}