@@ -10,7 +10,6 @@ import (
1010 ydb "github.com/ydb-platform/ydb-go-sdk/v3"
1111 "github.com/ydb-platform/ydb-go-sdk/v3/retry"
1212 "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
13- "github.com/ydb-platform/ydb-go-sdk/v3/table"
1413 "github.com/ydb-platform/ydb-go-sdk/v3/trace"
1514
1615 "slo/internal/config"
@@ -19,50 +18,40 @@ import (
1918
2019const (
2120 createTemplate = `
22- CREATE TABLE ` + "`%s`" + ` (
23- hash Uint64,
24- id Uint64,
25- payload_str Utf8,
26- payload_double Double,
27- payload_timestamp Timestamp,
28- payload_hash Uint64,
29- PRIMARY KEY (
30- hash,
31- id
32- )
33- ) WITH (
34- AUTO_PARTITIONING_BY_SIZE = ENABLED,
35- AUTO_PARTITIONING_BY_LOAD = ENABLED,
36- AUTO_PARTITIONING_PARTITION_SIZE_MB = %d,
37- AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d,
38- AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d,
39- UNIFORM_PARTITIONS = %d
40- );`
41- dropTemplate = `DROP TABLE ` + "`%s`" + `;`
21+ CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` (
22+ hash Uint64,
23+ id Uint64,
24+ payload_str Utf8,
25+ payload_double Double,
26+ payload_timestamp Timestamp,
27+ payload_hash Uint64,
28+ PRIMARY KEY (
29+ hash,
30+ id
31+ )
32+ ) WITH (
33+ AUTO_PARTITIONING_BY_SIZE = ENABLED,
34+ AUTO_PARTITIONING_BY_LOAD = ENABLED,
35+ AUTO_PARTITIONING_PARTITION_SIZE_MB = %d,
36+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d,
37+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d,
38+ UNIFORM_PARTITIONS = %d
39+ );
40+ `
41+ dropTemplate = `
42+ DROP TABLE IF EXISTS ` + "`%s`" + `;
43+ `
4244 upsertTemplate = `
43- UPSERT INTO ` + "`%s`" + ` (
44- id, hash, payload_str, payload_double, payload_timestamp
45- ) VALUES (
46- $id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp
47- );
48- `
45+ UPSERT INTO ` + "`%s`" + ` (
46+ id, hash, payload_str, payload_double, payload_timestamp
47+ ) VALUES (
48+ $id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp
49+ );
50+ `
4951 selectTemplate = `
50- SELECT id, payload_str, payload_double, payload_timestamp, payload_hash
51- FROM ` + "`%s`" + ` WHERE id = $id AND hash = Digest::NumericHash($id);
52- `
53- )
54-
55- var (
56- readTx = table .TxControl (
57- table .BeginTx (
58- table .WithOnlineReadOnly (),
59- ),
60- table .CommitTx (),
61- )
62-
63- writeTx = table .SerializableReadWriteTxControl (
64- table .CommitTx (),
65- )
52+ SELECT id, payload_str, payload_double, payload_timestamp, payload_hash
53+ FROM ` + "`%s`" + ` WHERE id = $id AND hash = Digest::NumericHash($id);
54+ `
6655)
6756
6857type Storage struct {
@@ -132,13 +121,13 @@ func (s *Storage) Read(ctx context.Context, entryID generator.RowID) (res genera
132121 ctx , cancel := context .WithTimeout (ctx , time .Duration (s .cfg .ReadTimeout )* time .Millisecond )
133122 defer cancel ()
134123
135- err = retry .Do (ydb . WithTxControl ( ctx , readTx ) , s .db ,
124+ err = retry .Do (ctx , s .db ,
136125 func (ctx context.Context , cc * sql.Conn ) (err error ) {
137126 if err = ctx .Err (); err != nil {
138127 return err
139128 }
140129
141- row := cc .QueryRowContext (ydb . WithQueryMode ( ctx , ydb . DataQueryMode ) , s .selectQuery ,
130+ row := cc .QueryRowContext (ctx , s .selectQuery ,
142131 sql .Named ("id" , & entryID ),
143132 )
144133
@@ -169,13 +158,13 @@ func (s *Storage) Write(ctx context.Context, e generator.Row) (attempts int, err
169158 ctx , cancel := context .WithTimeout (ctx , time .Duration (s .cfg .WriteTimeout )* time .Millisecond )
170159 defer cancel ()
171160
172- err = retry .Do (ydb . WithTxControl ( ctx , writeTx ) , s .db ,
161+ err = retry .Do (ctx , s .db ,
173162 func (ctx context.Context , cc * sql.Conn ) (err error ) {
174163 if err = ctx .Err (); err != nil {
175164 return err
176165 }
177166
178- _ , err = cc .ExecContext (ydb . WithQueryMode ( ctx , ydb . DataQueryMode ) , s .upsertQuery ,
167+ _ , err = cc .ExecContext (ctx , s .upsertQuery ,
179168 sql .Named ("id" , e .ID ),
180169 sql .Named ("payload_str" , * e .PayloadStr ),
181170 sql .Named ("payload_double" , * e .PayloadDouble ),
@@ -207,9 +196,9 @@ func (s *Storage) createTable(ctx context.Context) error {
207196 ctx , cancel := context .WithTimeout (ctx , time .Duration (s .cfg .WriteTimeout )* time .Millisecond )
208197 defer cancel ()
209198
210- return retry .Do (ydb . WithTxControl ( ctx , writeTx ) , s .db ,
199+ return retry .Do (ctx , s .db ,
211200 func (ctx context.Context , cc * sql.Conn ) error {
212- _ , err := s .db .ExecContext (ydb . WithQueryMode ( ctx , ydb . SchemeQueryMode ) , s .createQuery )
201+ _ , err := s .db .ExecContext (ctx , s .createQuery )
213202
214203 return err
215204 }, retry .WithIdempotent (true ),
@@ -224,9 +213,9 @@ func (s *Storage) dropTable(ctx context.Context) error {
224213 ctx , cancel := context .WithTimeout (ctx , time .Duration (s .cfg .WriteTimeout )* time .Millisecond )
225214 defer cancel ()
226215
227- return retry .Do (ydb . WithTxControl ( ctx , writeTx ) , s .db ,
216+ return retry .Do (ctx , s .db ,
228217 func (ctx context.Context , cc * sql.Conn ) error {
229- _ , err := s .db .ExecContext (ydb . WithQueryMode ( ctx , ydb . SchemeQueryMode ) , s .dropQuery )
218+ _ , err := s .db .ExecContext (ctx , s .dropQuery )
230219
231220 return err
232221 }, retry .WithIdempotent (true ),
0 commit comments