diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 8ddc66bcd..2fb779cee 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -52,6 +52,10 @@ jobs: name: native-table-over-query-service path: ./native/table/over/query/service label: native/table/over/query/service + - id: native_bulk_upsert + name: native-bulk-upsert + path: ./native/bulk-upsert + label: native/bulk-upsert - id: gorm_table name: gorm-table path: ./gorm/table @@ -103,6 +107,11 @@ jobs: - name: Run SLO Tests run: | + EXTRA_ARGS="" + if [ "${{ matrix.sdk.id }}" = "native_bulk_upsert" ]; then + EXTRA_ARGS="--batch-size=10" + fi + ./tests/slo/.bin/${{matrix.sdk.id}}_linux_amd64 run grpc://localhost:2135 /Root/testdb \ -prom-pgw localhost:9091 \ -report-period 250 \ @@ -110,7 +119,8 @@ jobs: -read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \ -write-rps ${{inputs.slo_workload_write_max_rps || 100}} \ -read-timeout 1000 \ - -write-timeout 1000 || true + -write-timeout 1000 \ + $EXTRA_ARGS - if: always() name: Store ydb chaos testing logs diff --git a/tests/slo/gorm/query/main.go b/tests/slo/gorm/query/main.go index 02e1a8319..17fa101b8 100644 --- a/tests/slo/gorm/query/main.go +++ b/tests/slo/gorm/query/main.go @@ -85,10 +85,7 @@ func main() { }) } - err = g.Wait() - if err != nil { - panic(err) - } + _ = g.Wait() log.Println("entries write ok") case config.CleanupMode: diff --git a/tests/slo/gorm/table/main.go b/tests/slo/gorm/table/main.go index 25cf6bdde..d91a9312c 100644 --- a/tests/slo/gorm/table/main.go +++ b/tests/slo/gorm/table/main.go @@ -85,10 +85,7 @@ func main() { }) } - err = g.Wait() - if err != nil { - panic(err) - } + _ = g.Wait() log.Println("entries write ok") case config.CleanupMode: diff --git a/tests/slo/internal/config/config.go b/tests/slo/internal/config/config.go index 14ef8fc7b..d4276bb3b 100644 --- a/tests/slo/internal/config/config.go +++ b/tests/slo/internal/config/config.go @@ -33,6 +33,8 @@ type Config struct { Time int 
ShutdownTime int + + BatchSize int } func New() (*Config, error) { @@ -100,6 +102,8 @@ func New() (*Config, error) { fs.IntVar(&cfg.Time, "time", 600, "run time in seconds") fs.IntVar(&cfg.ShutdownTime, "shutdown-time", 30, "time to wait before force kill workers") + + fs.IntVar(&cfg.BatchSize, "batch-size", 1, "batch size (used for bulk_upsert/read_rows operations)") default: fmt.Print(mainHelp) diff --git a/tests/slo/internal/generator/generator.go b/tests/slo/internal/generator/generator.go index 60e8ca91f..85efc23fc 100755 --- a/tests/slo/internal/generator/generator.go +++ b/tests/slo/internal/generator/generator.go @@ -33,6 +33,7 @@ func (g *Generator) Generate() (Row, error) { ID: id, PayloadDouble: func(a float64) *float64 { return &a }(rand.Float64()), //nolint:gosec // speed more important PayloadTimestamp: func(a time.Time) *time.Time { return &a }(time.Now()), + PayloadHash: func(a uint64) *uint64 { return &a }(rand.Uint64()), //nolint:gosec } var err error diff --git a/tests/slo/internal/generator/row.go b/tests/slo/internal/generator/row.go index 2eb2c1fc7..a43865790 100644 --- a/tests/slo/internal/generator/row.go +++ b/tests/slo/internal/generator/row.go @@ -11,5 +11,5 @@ type Row struct { PayloadStr *string `sql:"payload_str" gorm:"column:payload_str" xorm:"'payload_str'"` PayloadDouble *float64 `sql:"payload_double" gorm:"column:payload_double" xorm:"'payload_double'"` PayloadTimestamp *time.Time `sql:"payload_timestamp" gorm:"column:payload_timestamp" xorm:"'payload_timestamp'"` - PayloadHash uint64 `sql:"payload_hash" gorm:"column:payload_hash" xorm:"'payload_hash'"` + PayloadHash *uint64 `sql:"payload_hash" gorm:"column:payload_hash" xorm:"'payload_hash'"` } diff --git a/tests/slo/internal/metrics/metrics.go b/tests/slo/internal/metrics/metrics.go index c47bd3191..e8b07c724 100644 --- a/tests/slo/internal/metrics/metrics.go +++ b/tests/slo/internal/metrics/metrics.go @@ -267,14 +267,18 @@ func (m *Metrics) TimeoutsTotal() float64 { } func (m 
*Metrics) FailOnError() { - if m.ErrorsTotal() > 0 { + if m.ErrorsTotal()*100 > m.OperationsTotal() { log.Panicf( "unretriable (or not successfully retried) errors: %.0f errors out of %.0f operations", m.ErrorsTotal(), m.OperationsTotal(), ) } - if m.TimeoutsTotal() > 0 { - log.Panicf("there are user timeouts: %.0f timeouts", m.TimeoutsTotal()) + if m.TimeoutsTotal()*100 > m.OperationsTotal() { + log.Panicf( + "user timeouts: %.0f timeouts out of %.0f operations", + m.TimeoutsTotal(), + m.OperationsTotal(), + ) } } diff --git a/tests/slo/internal/workers/read.go b/tests/slo/internal/workers/read.go index d989bcd93..99a4264a6 100644 --- a/tests/slo/internal/workers/read.go +++ b/tests/slo/internal/workers/read.go @@ -34,11 +34,21 @@ func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter } func (w *Workers) read(ctx context.Context) error { - id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important - - m := w.m.Start(metrics.OperationTypeRead) - - _, attempts, err := w.s.Read(ctx, id) + var m metrics.Span + var attempts int + var err error + if w.s != nil { + id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important + m = w.m.Start(metrics.OperationTypeRead) + _, attempts, err = w.s.Read(ctx, id) + } else { + ids := make([]uint64, 0, w.cfg.BatchSize) + for range w.cfg.BatchSize { + ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint:gosec + } + m = w.m.Start(metrics.OperationTypeRead) + _, attempts, err = w.sb.ReadBatch(ctx, ids) + } m.Finish(err, attempts) diff --git a/tests/slo/internal/workers/workers.go b/tests/slo/internal/workers/workers.go index 0849eb585..61fb491b1 100644 --- a/tests/slo/internal/workers/workers.go +++ b/tests/slo/internal/workers/workers.go @@ -14,9 +14,15 @@ type ReadWriter interface { Write(ctx context.Context, row generator.Row) (attempts int, err error) } +type BatchReadWriter interface { + ReadBatch(ctx context.Context, 
rowIDs []generator.RowID) (_ []generator.Row, attempts int, err error) + WriteBatch(ctx context.Context, rows []generator.Row) (attempts int, err error) +} + type Workers struct { cfg *config.Config s ReadWriter + sb BatchReadWriter m *metrics.Metrics } @@ -35,6 +41,21 @@ func New(cfg *config.Config, s ReadWriter, ref, label, jobName string) (*Workers }, nil } +func NewWithBatch(cfg *config.Config, s BatchReadWriter, ref, label, jobName string) (*Workers, error) { + m, err := metrics.New(cfg.PushGateway, ref, label, jobName) + if err != nil { + log.Printf("create metrics failed: %v", err) + + return nil, err + } + + return &Workers{ + cfg: cfg, + sb: s, + m: m, + }, nil +} + func (w *Workers) FailOnError() { w.m.FailOnError() } diff --git a/tests/slo/internal/workers/write.go b/tests/slo/internal/workers/write.go index 5fea95994..814904024 100644 --- a/tests/slo/internal/workers/write.go +++ b/tests/slo/internal/workers/write.go @@ -33,19 +33,33 @@ func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limite } } -func (w *Workers) write(ctx context.Context, gen *generator.Generator) error { - row, err := gen.Generate() - if err != nil { - log.Printf("generate error: %v", err) +func (w *Workers) write(ctx context.Context, gen *generator.Generator) (finalErr error) { + m := w.m.Start(metrics.OperationTypeWrite) + var attempts int + if w.s != nil { + row, err := gen.Generate() + if err != nil { + log.Printf("generate error: %v", err) - return err - } + return err + } - m := w.m.Start(metrics.OperationTypeWrite) + attempts, finalErr = w.s.Write(ctx, row) + } else { + rows := make([]generator.Row, 0, w.cfg.BatchSize) + for range w.cfg.BatchSize { + row, err := gen.Generate() + if err != nil { + log.Printf("generate error: %v", err) - attempts, err := w.s.Write(ctx, row) + return err + } + rows = append(rows, row) + } - m.Finish(err, attempts) + attempts, finalErr = w.sb.WriteBatch(ctx, rows) + } + m.Finish(finalErr, attempts) - return err + return 
finalErr } diff --git a/tests/slo/native/bulk-upsert/main.go b/tests/slo/native/bulk-upsert/main.go new file mode 100644 index 000000000..fb3273011 --- /dev/null +++ b/tests/slo/native/bulk-upsert/main.go @@ -0,0 +1,150 @@ +package main + +import ( + "context" + "fmt" + "os/signal" + "strconv" + "sync" + "syscall" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" + + "slo/internal/config" + "slo/internal/generator" + "slo/internal/log" + "slo/internal/workers" +) + +var ( + ref string + label string + jobName = "slo_native_bulk_upsert" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + defer cancel() + + cfg, err := config.New() + if err != nil { + panic(fmt.Errorf("create config failed: %w", err)) + } + + log.Println("program started") + defer log.Println("program finished") + + ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) + defer cancel() + + go func() { + <-ctx.Done() + log.Println("exiting...") + }() + + // pool size similar to query variant + s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, "no_session") + if err != nil { + panic(fmt.Errorf("create storage failed: %w", err)) + } + defer func() { + var ( + shutdownCtx context.Context + shutdownCancel context.CancelFunc + ) + if cfg.ShutdownTime > 0 { + shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), + time.Duration(cfg.ShutdownTime)*time.Second) + } else { + shutdownCtx, shutdownCancel = context.WithCancel(context.Background()) + } + defer shutdownCancel() + + _ = s.Close(shutdownCtx) + }() + + log.Println("db init ok") + + switch cfg.Mode { + case config.CreateMode: + err = s.CreateTable(ctx) + if err != nil { + panic(fmt.Errorf("create table failed: %w", err)) + } + log.Println("create table ok") + + gen := generator.New(0) + + g := errgroup.Group{} + + for i := uint64(0); i < cfg.InitialDataCount; i++ { + g.Go(func() (err error) { + e, err := 
gen.Generate() + if err != nil { + return err + } + + _, err = s.WriteBatch(ctx, []generator.Row{e}) + if err != nil { + return err + } + + return nil + }) + } + + err = g.Wait() + if err != nil { + panic(err) + } + + log.Println("entries write ok") + case config.CleanupMode: + err = s.DropTable(ctx) + if err != nil { + panic(fmt.Errorf("drop table failed: %w", err)) + } + + log.Println("cleanup table ok") + case config.RunMode: + gen := generator.New(cfg.InitialDataCount) + + w, err := workers.NewWithBatch(cfg, s, ref, label, jobName) + if err != nil { + panic(fmt.Errorf("create workers failed: %w", err)) + } + defer func() { + err := w.Close() + if err != nil { + panic(fmt.Errorf("workers close failed: %w", err)) + } + log.Println("workers close ok") + }() + + wg := sync.WaitGroup{} + + readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1) + wg.Add(cfg.ReadRPS) + for i := 0; i < cfg.ReadRPS; i++ { + go w.Read(ctx, &wg, readRL) + } + log.Println("started " + strconv.Itoa(cfg.ReadRPS) + " read workers") + + writeRL := rate.NewLimiter(rate.Limit(cfg.WriteRPS), 1) + wg.Add(cfg.WriteRPS) + for i := 0; i < cfg.WriteRPS; i++ { + go w.Write(ctx, &wg, writeRL, gen) + } + log.Println("started " + strconv.Itoa(cfg.WriteRPS) + " write workers") + + metricsRL := rate.NewLimiter(rate.Every(time.Duration(cfg.ReportPeriod)*time.Millisecond), 1) + wg.Add(1) + go w.Metrics(ctx, &wg, metricsRL) + + wg.Wait() + default: + panic(fmt.Errorf("unknown mode: %v", cfg.Mode)) + } +} diff --git a/tests/slo/native/bulk-upsert/storage.go b/tests/slo/native/bulk-upsert/storage.go new file mode 100644 index 000000000..36c4dd0e7 --- /dev/null +++ b/tests/slo/native/bulk-upsert/storage.go @@ -0,0 +1,258 @@ +package main + +import ( + "context" + "fmt" + "path" + "time" + + ydb "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" + 
"github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" + + "slo/internal/config" + "slo/internal/generator" +) + +const createTableQuery = ` +CREATE TABLE IF NOT EXISTS` + " `%s` " + `( + id Uint64?, + payload_str Text?, + payload_double Double?, + payload_timestamp Timestamp?, + payload_hash Uint64?, + PRIMARY KEY (id) +) WITH ( + UNIFORM_PARTITIONS = %d, + AUTO_PARTITIONING_BY_SIZE = ENABLED, + AUTO_PARTITIONING_PARTITION_SIZE_MB = %d, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d +) +` + +const dropTableQuery = "DROP TABLE IF EXISTS `%s`;" + +type Storage struct { + db *ydb.Driver + cfg *config.Config + tablePath string + retryBudget interface { + budget.Budget + Stop() + } +} + +func NewStorage(ctx context.Context, cfg *config.Config, poolSize int, label string) (*Storage, error) { + ctx, cancel := context.WithTimeout(ctx, time.Minute*5) //nolint:mnd + defer cancel() + + retryBudget := budget.Limited(int(float64(poolSize) * 0.1)) //nolint:mnd + + db, err := ydb.Open(ctx, + cfg.Endpoint+cfg.DB, + ydb.WithSessionPoolSizeLimit(poolSize), + ydb.WithRetryBudget(retryBudget), + ydb.WithInsecure(), + ydb.WithAnonymousCredentials(), + ydb.WithTLSSInsecureSkipVerify(), + ) + if err != nil { + return nil, err + } + + prefix := path.Join(db.Name(), label) + + s := &Storage{ + db: db, + cfg: cfg, + tablePath: path.Join(prefix, cfg.Table), + retryBudget: retryBudget, + } + + return s, nil +} + +func (s *Storage) WriteBatch(ctx context.Context, e []generator.Row) (attempts int, finalErr error) { + if err := ctx.Err(); err != nil { + return attempts, err + } + + ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) + defer cancel() + + rows := make([]types.Value, 0, len(e)) + + for _, 
row := range e { + rows = append(rows, types.StructValue( + types.StructFieldValue("id", types.Uint64Value(row.ID)), + types.StructFieldValue("payload_str", types.OptionalValue(types.TextValue(*row.PayloadStr))), + types.StructFieldValue("payload_double", types.OptionalValue(types.DoubleValue(*row.PayloadDouble))), + types.StructFieldValue( + "payload_timestamp", + types.OptionalValue(types.TimestampValue(uint64(row.PayloadTimestamp.UnixMicro()))), + ), + types.StructFieldValue("payload_hash", types.OptionalValue(types.Uint64Value(*row.PayloadHash))), + )) + } + + t := &trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + } + + err := s.db.Table().BulkUpsert( + ctx, + s.tablePath, + table.BulkUpsertDataRows(types.ListValue(rows...)), + table.WithRetryOptions([]retry.Option{ //nolint:staticcheck + retry.WithTrace(t), + }), + table.WithIdempotent(), + table.WithLabel("WRITE"), + ) + + return attempts, err +} + +func (s *Storage) ReadBatch(ctx context.Context, rowIDs []generator.RowID) ( + _ []generator.Row, + attempts int, + finalErr error, +) { + if err := ctx.Err(); err != nil { + return []generator.Row{}, attempts, err + } + + ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.ReadTimeout)*time.Millisecond) + defer cancel() + + t := &trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + } + + keys := make([]types.Value, 0, len(rowIDs)) + for _, rowID := range rowIDs { + key := types.StructValue( + types.StructFieldValue("id", types.Uint64Value(rowID)), + ) + keys = append(keys, key) + } + + res, err := s.db.Table().ReadRows(ctx, s.tablePath, types.ListValue(keys...), []options.ReadRowsOption{}, + table.WithRetryOptions([]retry.Option{ //nolint:staticcheck + retry.WithTrace(t), + }), + table.WithIdempotent(), + 
table.WithLabel("READ"), + ) + if err != nil { + return nil, attempts, err + } + defer func() { + _ = res.Close() + }() + + readRows := make([]generator.Row, 0, len(rowIDs)) + + for res.NextResultSet(ctx) { + if err = res.Err(); err != nil { + return nil, attempts, err + } + + if res.CurrentResultSet().Truncated() { + return nil, attempts, fmt.Errorf("read rows result set truncated") + } + + for res.NextRow() { + readRow := generator.Row{} + scans := []named.Value{ + named.Required("id", &readRow.ID), + named.Optional("payload_str", &readRow.PayloadStr), + named.Optional("payload_double", &readRow.PayloadDouble), + named.Optional("payload_timestamp", &readRow.PayloadTimestamp), + named.Optional("payload_hash", &readRow.PayloadHash), + } + + err = res.ScanNamed(scans...) + if err != nil { + return nil, attempts, err + } + + readRows = append(readRows, readRow) + } + } + + return readRows, attempts, nil +} + +func (s *Storage) CreateTable(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) + defer cancel() + + return s.db.Query().Do(ctx, + func(ctx context.Context, session query.Session) error { + fmt.Println(fmt.Sprintf(createTableQuery, s.tablePath, s.cfg.MinPartitionsCount, s.cfg.PartitionSize, + s.cfg.MinPartitionsCount, s.cfg.MaxPartitionsCount, + )) + + return session.Exec(ctx, + fmt.Sprintf(createTableQuery, s.tablePath, s.cfg.MinPartitionsCount, s.cfg.PartitionSize, + s.cfg.MinPartitionsCount, s.cfg.MaxPartitionsCount, + ), + query.WithTxControl(query.EmptyTxControl()), + ) + }, query.WithIdempotent(), + query.WithLabel("CREATE TABLE"), + ) +} + +func (s *Storage) DropTable(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + + ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) + defer cancel() + + return s.db.Query().Do(ctx, + func(ctx context.Context, session query.Session) error { + return session.Exec(ctx, + 
fmt.Sprintf(dropTableQuery, s.tablePath), + query.WithTxControl(query.EmptyTxControl()), + ) + }, + query.WithIdempotent(), + query.WithLabel("DROP TABLE"), + ) +} + +func (s *Storage) Close(ctx context.Context) error { + s.retryBudget.Stop() + + var ( + shutdownCtx context.Context + shutdownCancel context.CancelFunc + ) + if s.cfg.ShutdownTime > 0 { + shutdownCtx, shutdownCancel = context.WithTimeout(ctx, time.Duration(s.cfg.ShutdownTime)*time.Second) + } else { + shutdownCtx, shutdownCancel = context.WithCancel(ctx) + } + defer shutdownCancel() + + return s.db.Close(shutdownCtx) +} diff --git a/tests/slo/xorm/query/main.go b/tests/slo/xorm/query/main.go index 5ee32cf58..1985ee3c8 100644 --- a/tests/slo/xorm/query/main.go +++ b/tests/slo/xorm/query/main.go @@ -85,10 +85,7 @@ func main() { }) } - err = g.Wait() - if err != nil { - panic(err) - } + _ = g.Wait() log.Println("entries write ok") case config.CleanupMode: diff --git a/tests/slo/xorm/table/main.go b/tests/slo/xorm/table/main.go index ae7f64feb..23da2ad15 100644 --- a/tests/slo/xorm/table/main.go +++ b/tests/slo/xorm/table/main.go @@ -85,10 +85,7 @@ func main() { }) } - err = g.Wait() - if err != nil { - panic(err) - } + _ = g.Wait() log.Println("entries write ok") case config.CleanupMode: