66 "fmt"
77 "io"
88 "path"
9- "strconv"
109 "time"
1110
1211 "github.com/ydb-platform/ydb-go-sdk/v3"
@@ -17,50 +16,10 @@ import (
1716 "slo/internal/generator"
1817)
1918
20- //nolint:goconst
21- const (
22- upsertTemplate = `
23- PRAGMA TablePathPrefix("%s");
24-
25- DECLARE $id AS Uint64;
26- DECLARE $payload_str AS Utf8;
27- DECLARE $payload_double AS Double;
28- DECLARE $payload_timestamp AS Timestamp;
29-
30- UPSERT INTO ` + "`%s`" + ` (
31- id, hash, payload_str, payload_double, payload_timestamp
32- ) VALUES (
33- $id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp
34- );
35- `
36- selectTemplate = `
37- PRAGMA TablePathPrefix("%s");
38-
39- DECLARE $id AS Uint64;
40- SELECT id, payload_str, payload_double, payload_timestamp, payload_hash
41- FROM ` + "`%s`" + ` WHERE id = $id AND hash = Digest::NumericHash($id);
42- `
43- )
44-
45- var (
46- readTx = query .TxControl (
47- query .BeginTx (
48- query .WithOnlineReadOnly (),
49- ),
50- query .CommitTx (),
51- )
52-
53- writeTx = query .SerializableReadWriteTxControl (
54- query .CommitTx (),
55- )
56- )
57-
5819type Storage struct {
59- db * ydb.Driver
60- cfg * config.Config
61- prefix string
62- upsertQuery string
63- selectQuery string
20+ db * ydb.Driver
21+ cfg * config.Config
22+ tablePath string
6423}
6524
6625func NewStorage (ctx context.Context , cfg * config.Config , poolSize int ) (* Storage , error ) {
@@ -79,11 +38,9 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage
7938 prefix := path .Join (db .Name (), label )
8039
8140 s := & Storage {
82- db : db ,
83- cfg : cfg ,
84- prefix : prefix ,
85- upsertQuery : fmt .Sprintf (upsertTemplate , prefix , cfg .Table ),
86- selectQuery : fmt .Sprintf (selectTemplate , prefix , cfg .Table ),
41+ db : db ,
42+ cfg : cfg ,
43+ tablePath : "`" + path .Join (prefix , cfg .Table ) + "`" ,
8744 }
8845
8946 return s , nil
@@ -105,13 +62,28 @@ func (s *Storage) Read(ctx context.Context, entryID generator.RowID) (_ generato
10562 return err
10663 }
10764
108- _ , res , err := session .Execute (ctx , s .selectQuery ,
65+ _ , res , err := session .Execute (ctx ,
66+ fmt .Sprintf (`
67+ DECLARE $id AS Uint64;
68+ DECLARE $payload_str AS Utf8;
69+ DECLARE $payload_double AS Double;
70+ DECLARE $payload_timestamp AS Timestamp;
71+
72+ UPSERT INTO %s (
73+ id, hash, payload_str, payload_double, payload_timestamp
74+ ) VALUES (
75+ $id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp
76+ );
77+ ` , s .tablePath ),
10978 query .WithParameters (
11079 ydb .ParamsBuilder ().
11180 Param ("$id" ).Uint64 (entryID ).
11281 Build (),
11382 ),
114- query .WithTxControl (readTx ),
83+ query .WithTxControl (query .TxControl (
84+ query .BeginTx (query .WithOnlineReadOnly ()),
85+ query .CommitTx (),
86+ )),
11587 )
11688 if err != nil {
11789 return err
@@ -174,7 +146,12 @@ func (s *Storage) Write(ctx context.Context, e generator.Row) (attempts int, _ e
174146 return err
175147 }
176148
177- _ , res , err := session .Execute (ctx , s .upsertQuery ,
149+ _ , res , err := session .Execute (ctx ,
150+ fmt .Sprintf (`
151+ DECLARE $id AS Uint64;
152+ SELECT id, payload_str, payload_double, payload_timestamp, payload_hash
153+ FROM %s WHERE id = $id AND hash = Digest::NumericHash($id);
154+ ` , s .tablePath ),
178155 query .WithParameters (
179156 ydb .ParamsBuilder ().
180157 Param ("$id" ).Uint64 (e .ID ).
@@ -183,7 +160,6 @@ func (s *Storage) Write(ctx context.Context, e generator.Row) (attempts int, _ e
183160 Param ("$payload_timestamp" ).Timestamp (* e .PayloadTimestamp ).
184161 Build (),
185162 ),
186- query .WithTxControl (writeTx ),
187163 )
188164 if err != nil {
189165 return err
@@ -216,8 +192,9 @@ func (s *Storage) createTable(ctx context.Context) error {
216192 defer cancel ()
217193
218194 return s .db .Query ().Do (ctx , func (ctx context.Context , session query.Session ) error {
219- _ , _ , err := session .Execute (ctx , `
220- CREATE TABLE ` + "`" + path .Join (s .prefix , s .cfg .Table )+ "`" + ` (
195+ _ , _ , err := session .Execute (ctx ,
196+ fmt .Sprintf (`
197+ CREATE TABLE %s (
221198 hash Uint64?,
222199 id Uint64?,
223200 payload_str Text?,
@@ -226,13 +203,15 @@ func (s *Storage) createTable(ctx context.Context) error {
226203 payload_hash Uint64?,
227204 PRIMARY KEY (hash, id)
228205 ) WITH (
229- UNIFORM_PARTITIONS = ` + strconv . FormatUint ( s . cfg . MinPartitionsCount , 10 ) + ` ,
206+ UNIFORM_PARTITIONS = %d ,
230207 AUTO_PARTITIONING_BY_SIZE = ENABLED,
231- AUTO_PARTITIONING_PARTITION_SIZE_MB = ` + strconv .FormatUint (s .cfg .PartitionSize , 10 )+ `,
232- AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = ` + strconv .FormatUint (s .cfg .MinPartitionsCount , 10 )+ `,
233- AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = ` + strconv .FormatUint (s .cfg .MaxPartitionsCount , 10 )+ `
234- )
235- ` , query .WithTxControl (query .NoTx ()))
208+ AUTO_PARTITIONING_PARTITION_SIZE_MB = %d,
209+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d,
210+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d
211+ )` , s .tablePath , s .cfg .MinPartitionsCount , s .cfg .PartitionSize ,
212+ s .cfg .MinPartitionsCount , s .cfg .MaxPartitionsCount ,
213+ ),
214+ query .WithTxControl (query .NoTx ()))
236215
237216 return err
238217 }, query .WithIdempotent ())
@@ -250,7 +229,7 @@ func (s *Storage) dropTable(ctx context.Context) error {
250229 return s .db .Query ().Do (ctx ,
251230 func (ctx context.Context , session query.Session ) error {
252231 _ , _ , err := session .Execute (ctx ,
253- fmt .Sprintf (" DROP TABLE ` %s`" , path . Join ( s . prefix , s . cfg . Table ) ),
232+ fmt .Sprintf (` DROP TABLE %s`, s . tablePath ),
254233 query .WithTxControl (query .NoTx ()),
255234 )
256235
0 commit comments