Skip to content

Commit be6d82d

Browse files
committed
added SLO workloads for legacy and propose database/sql
1 parent 760866a commit be6d82d

File tree

13 files changed

+1168
-13
lines changed

13 files changed

+1168
-13
lines changed

.github/workflows/slo.yml

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,14 @@ jobs:
3232
strategy:
3333
matrix:
3434
sdk:
35-
- id: database_sql
36-
name: database-sql
37-
path: ./database/sql
38-
label: database/sql
35+
- id: database_sql_legacy
36+
name: database-sql-legacy
37+
path: ./database/sql/legacy
38+
label: database/sql/legacy
39+
- id: database_sql_propose
40+
name: database-sql-propose
41+
path: ./database/sql/propose
42+
label: database/sql/propose
3943
- id: native_query
4044
name: native-query
4145
path: ./native/query
@@ -44,14 +48,22 @@ jobs:
4448
name: native-table
4549
path: ./native/table
4650
label: native/table
47-
- id: gorm
48-
name: gorm
49-
path: ./gorm
50-
label: gorm
51-
- id: xorm
52-
name: xorm
53-
path: ./xorm
54-
label: xorm
51+
- id: gorm_legacy
52+
name: gorm-legacy
53+
path: ./gorm/legacy
54+
label: gorm/legacy
55+
- id: gorm_propose
56+
name: gorm-propose
57+
path: ./gorm/propose
58+
label: gorm/propose
59+
- id: xorm_legacy
60+
name: xorm-legacy
61+
path: ./xorm/legacy
62+
label: xorm/legacy
63+
- id: xorm_propose
64+
name: xorm-propose
65+
path: ./xorm/propose
66+
label: xorm/propose
5567

5668
concurrency:
5769
group: slo-${{ github.ref }}-${{ matrix.sdk.name }}
File renamed without changes.
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"path"
8+
"time"
9+
10+
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
15+
16+
"slo/internal/config"
17+
"slo/internal/generator"
18+
)
19+
20+
const (
21+
createTemplate = `
22+
CREATE TABLE ` + "`%s`" + ` (
23+
hash Uint64,
24+
id Uint64,
25+
payload_str Utf8,
26+
payload_double Double,
27+
payload_timestamp Timestamp,
28+
payload_hash Uint64,
29+
PRIMARY KEY (
30+
hash,
31+
id
32+
)
33+
) WITH (
34+
AUTO_PARTITIONING_BY_SIZE = ENABLED,
35+
AUTO_PARTITIONING_BY_LOAD = ENABLED,
36+
AUTO_PARTITIONING_PARTITION_SIZE_MB = %d,
37+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d,
38+
AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d,
39+
UNIFORM_PARTITIONS = %d
40+
);`
41+
dropTemplate = `DROP TABLE ` + "`%s`" + `;`
42+
upsertTemplate = `
43+
UPSERT INTO ` + "`%s`" + ` (
44+
id, hash, payload_str, payload_double, payload_timestamp
45+
) VALUES (
46+
$id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp
47+
);
48+
`
49+
selectTemplate = `
50+
SELECT id, payload_str, payload_double, payload_timestamp, payload_hash
51+
FROM ` + "`%s`" + ` WHERE id = $id AND hash = Digest::NumericHash($id);
52+
`
53+
)
54+
55+
var (
56+
readTx = table.TxControl(
57+
table.BeginTx(
58+
table.WithOnlineReadOnly(),
59+
),
60+
table.CommitTx(),
61+
)
62+
63+
writeTx = table.SerializableReadWriteTxControl(
64+
table.CommitTx(),
65+
)
66+
)
67+
68+
type Storage struct {
69+
cc *ydb.Driver
70+
c ydb.SQLConnector
71+
db *sql.DB
72+
cfg *config.Config
73+
createQuery string
74+
dropQuery string
75+
upsertQuery string
76+
selectQuery string
77+
retryBudget interface {
78+
budget.Budget
79+
80+
Stop()
81+
}
82+
}
83+
84+
func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (s *Storage, err error) {
85+
ctx, cancel := context.WithTimeout(ctx, time.Minute*5) //nolint:gomnd
86+
defer cancel()
87+
88+
retryBudget := budget.Limited(int(float64(poolSize) * 0.1)) //nolint:gomnd
89+
90+
s = &Storage{
91+
cfg: cfg,
92+
createQuery: fmt.Sprintf(createTemplate, cfg.Table,
93+
cfg.PartitionSize, cfg.MinPartitionsCount, cfg.MaxPartitionsCount, cfg.MinPartitionsCount),
94+
dropQuery: fmt.Sprintf(dropTemplate, cfg.Table),
95+
upsertQuery: fmt.Sprintf(upsertTemplate, cfg.Table),
96+
selectQuery: fmt.Sprintf(selectTemplate, cfg.Table),
97+
retryBudget: retryBudget,
98+
}
99+
100+
s.cc, err = ydb.Open(
101+
ctx,
102+
s.cfg.Endpoint+s.cfg.DB,
103+
ydb.WithRetryBudget(retryBudget),
104+
)
105+
if err != nil {
106+
return nil, fmt.Errorf("ydb.Open error: %w", err)
107+
}
108+
109+
s.c, err = ydb.Connector(s.cc,
110+
ydb.WithAutoDeclare(),
111+
ydb.WithTablePathPrefix(path.Join(s.cc.Name(), label)),
112+
ydb.WithQueryService(false),
113+
)
114+
if err != nil {
115+
return nil, fmt.Errorf("ydb.Connector error: %w", err)
116+
}
117+
118+
s.db = sql.OpenDB(s.c)
119+
120+
s.db.SetMaxOpenConns(poolSize)
121+
s.db.SetMaxIdleConns(poolSize)
122+
s.db.SetConnMaxIdleTime(time.Second)
123+
124+
return s, nil
125+
}
126+
127+
func (s *Storage) Read(ctx context.Context, entryID generator.RowID) (res generator.Row, attempts int, err error) {
128+
if err = ctx.Err(); err != nil {
129+
return generator.Row{}, attempts, err
130+
}
131+
132+
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.ReadTimeout)*time.Millisecond)
133+
defer cancel()
134+
135+
err = retry.Do(ydb.WithTxControl(ctx, readTx), s.db,
136+
func(ctx context.Context, cc *sql.Conn) (err error) {
137+
if err = ctx.Err(); err != nil {
138+
return err
139+
}
140+
141+
row := cc.QueryRowContext(ydb.WithQueryMode(ctx, ydb.DataQueryMode), s.selectQuery,
142+
sql.Named("id", &entryID),
143+
)
144+
145+
var hash *uint64
146+
147+
return row.Scan(&res.ID, &res.PayloadStr, &res.PayloadDouble, &res.PayloadTimestamp, &hash)
148+
},
149+
retry.WithIdempotent(true),
150+
retry.WithTrace(
151+
&trace.Retry{
152+
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
153+
return func(info trace.RetryLoopDoneInfo) {
154+
attempts = info.Attempts
155+
}
156+
},
157+
},
158+
),
159+
)
160+
161+
return res, attempts, err
162+
}
163+
164+
func (s *Storage) Write(ctx context.Context, e generator.Row) (attempts int, err error) {
165+
if err = ctx.Err(); err != nil {
166+
return attempts, err
167+
}
168+
169+
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond)
170+
defer cancel()
171+
172+
err = retry.Do(ydb.WithTxControl(ctx, writeTx), s.db,
173+
func(ctx context.Context, cc *sql.Conn) (err error) {
174+
if err = ctx.Err(); err != nil {
175+
return err
176+
}
177+
178+
_, err = cc.ExecContext(ydb.WithQueryMode(ctx, ydb.DataQueryMode), s.upsertQuery,
179+
sql.Named("id", e.ID),
180+
sql.Named("payload_str", *e.PayloadStr),
181+
sql.Named("payload_double", *e.PayloadDouble),
182+
sql.Named("payload_timestamp", *e.PayloadTimestamp),
183+
)
184+
185+
return err
186+
},
187+
retry.WithIdempotent(true),
188+
retry.WithTrace(
189+
&trace.Retry{
190+
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
191+
return func(info trace.RetryLoopDoneInfo) {
192+
attempts = info.Attempts
193+
}
194+
},
195+
},
196+
),
197+
)
198+
199+
return attempts, err
200+
}
201+
202+
func (s *Storage) createTable(ctx context.Context) error {
203+
if err := ctx.Err(); err != nil {
204+
return err
205+
}
206+
207+
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond)
208+
defer cancel()
209+
210+
return retry.Do(ydb.WithTxControl(ctx, writeTx), s.db,
211+
func(ctx context.Context, cc *sql.Conn) error {
212+
_, err := s.db.ExecContext(ydb.WithQueryMode(ctx, ydb.SchemeQueryMode), s.createQuery)
213+
214+
return err
215+
}, retry.WithIdempotent(true),
216+
)
217+
}
218+
219+
func (s *Storage) dropTable(ctx context.Context) error {
220+
if err := ctx.Err(); err != nil {
221+
return err
222+
}
223+
224+
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond)
225+
defer cancel()
226+
227+
return retry.Do(ydb.WithTxControl(ctx, writeTx), s.db,
228+
func(ctx context.Context, cc *sql.Conn) error {
229+
_, err := s.db.ExecContext(ydb.WithQueryMode(ctx, ydb.SchemeQueryMode), s.dropQuery)
230+
231+
return err
232+
}, retry.WithIdempotent(true),
233+
)
234+
}
235+
236+
func (s *Storage) close(ctx context.Context) error {
237+
s.retryBudget.Stop()
238+
239+
if err := ctx.Err(); err != nil {
240+
return err
241+
}
242+
243+
if err := s.db.Close(); err != nil {
244+
return fmt.Errorf("error close database/sql driver: %w", err)
245+
}
246+
247+
if err := s.c.Close(); err != nil {
248+
return fmt.Errorf("error close connector: %w", err)
249+
}
250+
251+
if err := s.cc.Close(ctx); err != nil {
252+
return fmt.Errorf("error close ydb driver: %w", err)
253+
}
254+
255+
return nil
256+
}

tests/slo/xorm/main.go renamed to tests/slo/database/sql/propose/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func main() {
8989
case config.CleanupMode:
9090
err = s.dropTable(ctx)
9191
if err != nil {
92-
panic(fmt.Errorf("create table failed: %w", err))
92+
panic(fmt.Errorf("drop table failed: %w", err))
9393
}
9494

9595
log.Println("cleanup table ok")

tests/slo/database/sql/storage.go renamed to tests/slo/database/sql/propose/storage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (s *Stora
109109
s.c, err = ydb.Connector(s.cc,
110110
ydb.WithAutoDeclare(),
111111
ydb.WithTablePathPrefix(path.Join(s.cc.Name(), label)),
112+
ydb.WithQueryService(true),
112113
)
113114
if err != nil {
114115
return nil, fmt.Errorf("ydb.Connector error: %w", err)

0 commit comments

Comments
 (0)