44 "context"
55 "database/sql"
66 "fmt"
7+ "path"
78 "time"
89
910 env "github.com/ydb-platform/ydb-go-sdk-auth-environ"
@@ -104,6 +105,7 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (s *Stora
104105
105106 s .c , err = ydb .Connector (s .cc ,
106107 ydb .WithAutoDeclare (),
108+ ydb .WithTablePathPrefix (path .Join (s .cc .Name (), label )),
107109 )
108110 if err != nil {
109111 return nil , fmt .Errorf ("ydb.Connector error: %w" , err )
@@ -118,45 +120,84 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (s *Stora
118120 return s , nil
119121}
120122
121- func (s * Storage ) Read (ctx context.Context , entryID generator.RowID ) (res generator.Row , err error ) {
123+ func (s * Storage ) Read (ctx context.Context , entryID generator.RowID ) (res generator.Row , attempts int , err error ) {
122124 if err = ctx .Err (); err != nil {
123- return generator.Row {}, err
125+ return generator.Row {}, attempts , err
124126 }
125127
126128 ctx , cancel := context .WithTimeout (ctx , time .Duration (s .cfg .ReadTimeout )* time .Millisecond )
127129 defer cancel ()
128130
129131 err = retry .Do (ydb .WithTxControl (ctx , readTx ), s .db ,
130132 func (ctx context.Context , cc * sql.Conn ) (err error ) {
133+ if err = ctx .Err (); err != nil {
134+ return err
135+ }
136+
131137 row := cc .QueryRowContext (ydb .WithQueryMode (ctx , ydb .DataQueryMode ), s .selectQuery ,
132138 sql .Named ("id" , & entryID ),
133139 )
140+
134141 var hash * uint64
142+
135143 return row .Scan (& res .ID , & res .PayloadStr , & res .PayloadDouble , & res .PayloadTimestamp , & hash )
136- }, retry .WithDoRetryOptions (retry .WithIdempotent (true )),
144+ },
145+ retry .WithDoRetryOptions (
146+ retry .WithIdempotent (true ),
147+ retry .WithTrace (
148+ trace.Retry {
149+ OnRetry : func (info trace.RetryLoopStartInfo ) func (trace.RetryLoopIntermediateInfo ) func (trace.RetryLoopDoneInfo ) {
150+ return func (info trace.RetryLoopIntermediateInfo ) func (trace.RetryLoopDoneInfo ) {
151+ return func (info trace.RetryLoopDoneInfo ) {
152+ attempts = info .Attempts
153+ }
154+ }
155+ },
156+ },
157+ ),
158+ ),
137159 )
138160
139- return res , err
161+ return res , attempts , err
140162}
141163
142- func (s * Storage ) Write (ctx context.Context , e generator.Row ) error {
143- if err : = ctx .Err (); err != nil {
144- return err
164+ func (s * Storage ) Write (ctx context.Context , e generator.Row ) ( attempts int , err error ) {
165+ if err = ctx .Err (); err != nil {
166+ return attempts , err
145167 }
146168
147169 ctx , cancel := context .WithTimeout (ctx , time .Duration (s .cfg .WriteTimeout )* time .Millisecond )
148170 defer cancel ()
149171
150- return retry .Do (ydb .WithTxControl (ctx , writeTx ), s .db ,
151- func (ctx context.Context , cc * sql.Conn ) error {
152- _ , err := cc .ExecContext (ydb .WithQueryMode (ctx , ydb .DataQueryMode ), s .upsertQuery ,
172+ return attempts , retry .Do (ydb .WithTxControl (ctx , writeTx ), s .db ,
173+ func (ctx context.Context , cc * sql.Conn ) (err error ) {
174+ if err = ctx .Err (); err != nil {
175+ return err
176+ }
177+
178+ _ , err = cc .ExecContext (ydb .WithQueryMode (ctx , ydb .DataQueryMode ), s .upsertQuery ,
153179 sql .Named ("id" , e .ID ),
154180 sql .Named ("payload_str" , * e .PayloadStr ),
155181 sql .Named ("payload_double" , * e .PayloadDouble ),
156182 sql .Named ("payload_timestamp" , * e .PayloadTimestamp ),
157183 )
184+
158185 return err
159- }, retry .WithDoRetryOptions (retry .WithIdempotent (true )),
186+ },
187+ retry .WithDoRetryOptions (
188+ retry .WithIdempotent (true ),
189+ retry .WithTrace (
190+ trace.Retry {
191+ OnRetry : func (info trace.RetryLoopStartInfo ) func (trace.RetryLoopIntermediateInfo ) func (trace.RetryLoopDoneInfo ) {
192+ return func (info trace.RetryLoopIntermediateInfo ) func (trace.RetryLoopDoneInfo ) {
193+ return func (info trace.RetryLoopDoneInfo ) {
194+ attempts = info .Attempts
195+ }
196+ }
197+ },
198+ },
199+ ),
200+ ),
160201 )
161202}
162203
0 commit comments