@@ -2,13 +2,16 @@ package main
22
33import (
44 "context"
5+ "database/sql"
56 "fmt"
67 "time"
78
89 ydb "github.com/ydb-platform/gorm-driver"
910 environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
1011 ydbZap "github.com/ydb-platform/ydb-go-sdk-zap"
1112 ydbSDK "github.com/ydb-platform/ydb-go-sdk/v3"
13+ "github.com/ydb-platform/ydb-go-sdk/v3/retry"
14+ "github.com/ydb-platform/ydb-go-sdk/v3/table"
1215 "github.com/ydb-platform/ydb-go-sdk/v3/trace"
1316 "gorm.io/gorm"
1417 "gorm.io/gorm/clause"
@@ -28,6 +31,19 @@ WITH (
2831 UNIFORM_PARTITIONS = %d
2932)`
3033
34+ var (
35+ readTx = table .TxControl (
36+ table .BeginTx (
37+ table .WithOnlineReadOnly (),
38+ ),
39+ table .CommitTx (),
40+ )
41+
42+ writeTx = table .SerializableReadWriteTxControl (
43+ table .CommitTx (),
44+ )
45+ )
46+
3147type entry struct {
3248 Hash uint64 `gorm:"column:hash;primarykey;autoIncrement:false"`
3349
@@ -73,53 +89,108 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage
7389 return nil , err
7490 }
7591
76- s .db = s .db .Debug ()
77-
7892 return s , nil
7993}
8094
81- func (s * Storage ) Read (ctx context.Context , id generator.RowID ) (generator.Row , error ) {
82- if err : = ctx .Err (); err != nil {
83- return generator.Row {}, err
95+ func (s * Storage ) Read (ctx context.Context , id generator.RowID ) (r generator.Row , attempts int , err error ) {
96+ if err = ctx .Err (); err != nil {
97+ return generator.Row {}, attempts , err
8498 }
8599
86100 ctx , cancel := context .WithTimeout (ctx , time .Duration (s .cfg .ReadTimeout )* time .Millisecond )
87101 defer cancel ()
88102
89- var e entry
90- err := s .db .WithContext (ctx ).Scopes (addTableToScope (s .cfg .Table )).Model (& entry {}).
91- First (& e , "hash = ? AND id = ?" ,
92- clause.Expr {
93- SQL : "Digest::NumericHash(?)" ,
94- Vars : []interface {}{id },
95- },
96- id ,
97- ).Error
103+ db , err := s .db .DB ()
98104 if err != nil {
99- return generator.Row {}, err
105+ return generator.Row {}, attempts , err
100106 }
101107
102- return e .Row , err
108+ return r , attempts , retry .Do (ydbSDK .WithTxControl (ctx , readTx ), db ,
109+ func (ctx context.Context , cc * sql.Conn ) (err error ) {
110+ if err = ctx .Err (); err != nil {
111+ return err
112+ }
113+
114+ var e entry
115+ err = s .db .WithContext (ctx ).Scopes (addTableToScope (s .cfg .Table )).Model (& entry {}).
116+ First (& e , "hash = ? AND id = ?" ,
117+ clause.Expr {
118+ SQL : "Digest::NumericHash(?)" ,
119+ Vars : []interface {}{id },
120+ },
121+ id ,
122+ ).Error
123+ if err != nil {
124+ return err
125+ }
126+
127+ r = e .Row
128+
129+ return nil
130+ },
131+ retry .WithDoRetryOptions (
132+ retry .WithIdempotent (true ),
133+ retry .WithTrace (
134+ trace.Retry {
135+ OnRetry : func (info trace.RetryLoopStartInfo ) func (trace.RetryLoopIntermediateInfo ) func (trace.RetryLoopDoneInfo ) {
136+ return func (info trace.RetryLoopIntermediateInfo ) func (trace.RetryLoopDoneInfo ) {
137+ return func (info trace.RetryLoopDoneInfo ) {
138+ attempts = info .Attempts
139+ }
140+ }
141+ },
142+ },
143+ ),
144+ ),
145+ )
103146}
104147
105- func (s * Storage ) Write (ctx context.Context , row generator.Row ) error {
106- if err : = ctx .Err (); err != nil {
107- return err
148+ func (s * Storage ) Write (ctx context.Context , row generator.Row ) ( attempts int , err error ) {
149+ if err = ctx .Err (); err != nil {
150+ return attempts , err
108151 }
109152
110153 ctx , cancel := context .WithTimeout (ctx , time .Duration (s .cfg .WriteTimeout )* time .Millisecond )
111154 defer cancel ()
112155
113- return s .db .WithContext (ctx ).Scopes (addTableToScope (s .cfg .Table )).Model (& entry {}).Create (map [string ]interface {}{
114- "Hash" : clause.Expr {
115- SQL : "Digest::NumericHash(?)" ,
116- Vars : []interface {}{row .ID },
156+ db , err := s .db .DB ()
157+ if err != nil {
158+ return attempts , err
159+ }
160+
161+ return attempts , retry .Do (ydbSDK .WithTxControl (ctx , writeTx ), db ,
162+ func (ctx context.Context , cc * sql.Conn ) (err error ) {
163+ if err = ctx .Err (); err != nil {
164+ return err
165+ }
166+
167+ return s .db .WithContext (ctx ).Scopes (addTableToScope (s .cfg .Table )).Model (& entry {}).
168+ Create (map [string ]interface {}{
169+ "Hash" : clause.Expr {
170+ SQL : "Digest::NumericHash(?)" ,
171+ Vars : []interface {}{row .ID },
172+ },
173+ "ID" : row .ID ,
174+ "PayloadStr" : row .PayloadStr ,
175+ "PayloadDouble" : row .PayloadDouble ,
176+ "PayloadTimestamp" : row .PayloadTimestamp ,
177+ }).Error
117178 },
118- "ID" : row .ID ,
119- "PayloadStr" : row .PayloadStr ,
120- "PayloadDouble" : row .PayloadDouble ,
121- "PayloadTimestamp" : row .PayloadTimestamp ,
122- }).Error
179+ retry .WithDoRetryOptions (
180+ retry .WithIdempotent (true ),
181+ retry .WithTrace (
182+ trace.Retry {
183+ OnRetry : func (info trace.RetryLoopStartInfo ) func (trace.RetryLoopIntermediateInfo ) func (trace.RetryLoopDoneInfo ) {
184+ return func (info trace.RetryLoopIntermediateInfo ) func (trace.RetryLoopDoneInfo ) {
185+ return func (info trace.RetryLoopDoneInfo ) {
186+ attempts = info .Attempts
187+ }
188+ }
189+ },
190+ },
191+ ),
192+ ),
193+ )
123194}
124195
125196func (s * Storage ) createTable (ctx context.Context ) error {
0 commit comments