@@ -22,17 +22,22 @@ import (
2222
2323const (
2424 upsertTemplate = `
25+ PRAGMA TablePathPrefix("%s");
26+
2527DECLARE $id AS Uint64;
2628DECLARE $payload_str AS Utf8;
2729DECLARE $payload_double AS Double;
2830DECLARE $payload_timestamp AS Timestamp;
31+
2932UPSERT INTO %s (
3033 id, hash, payload_str, payload_double, payload_timestamp
3134) VALUES (
3235 $id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp
3336);
3437`
3538 selectTemplate = `
39+ PRAGMA TablePathPrefix("%s");
40+
3641DECLARE $id AS Uint64;
3742SELECT id, payload_str, payload_double, payload_timestamp, payload_hash
3843FROM %s WHERE id = $id AND hash = Digest::NumericHash($id);
5560type Storage struct {
5661 db * ydb.Driver
5762 cfg * config.Config
63+ prefix string
5864 upsertQuery string
5965 selectQuery string
6066}
@@ -63,15 +69,9 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage
6369 ctx , cancel := context .WithTimeout (ctx , time .Minute * 5 )
6470 defer cancel ()
6571
66- s := & Storage {
67- cfg : cfg ,
68- upsertQuery : fmt .Sprintf (upsertTemplate , cfg .Table ),
69- selectQuery : fmt .Sprintf (selectTemplate , cfg .Table ),
70- }
71- var err error
72- s .db , err = ydb .Open (
72+ db , err := ydb .Open (
7373 ctx ,
74- s . cfg .Endpoint + s . cfg .DB ,
74+ cfg .Endpoint + cfg .DB ,
7575 env .WithEnvironCredentials (ctx ),
7676 ydbZap .WithTraces (
7777 logger ,
@@ -83,6 +83,16 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage
8383 return nil , err
8484 }
8585
86+ prefix := path .Join (db .Name (), label )
87+
88+ s := & Storage {
89+ db : db ,
90+ cfg : cfg ,
91+ prefix : prefix ,
92+ upsertQuery : fmt .Sprintf (upsertTemplate , prefix , cfg .Table ),
93+ selectQuery : fmt .Sprintf (selectTemplate , prefix , cfg .Table ),
94+ }
95+
8696 return s , nil
8797}
8898
@@ -201,7 +211,7 @@ func (s *Storage) createTable(ctx context.Context) error {
201211
202212 return s .db .Table ().Do (ctx ,
203213 func (ctx context.Context , session table.Session ) error {
204- return session .CreateTable (ctx , path .Join (s .db . Name () , s .cfg .Table ),
214+ return session .CreateTable (ctx , path .Join (s .prefix , s .cfg .Table ),
205215 options .WithColumn ("hash" , types .Optional (types .TypeUint64 )),
206216 options .WithColumn ("id" , types .Optional (types .TypeUint64 )),
207217 options .WithColumn ("payload_str" , types .Optional (types .TypeUTF8 )),
@@ -234,7 +244,7 @@ func (s *Storage) dropTable(ctx context.Context) error {
234244
235245 return s .db .Table ().Do (ctx ,
236246 func (ctx context.Context , session table.Session ) (err error ) {
237- return session .DropTable (ctx , path .Join (s .db . Name () , s .cfg .Table ))
247+ return session .DropTable (ctx , path .Join (s .prefix , s .cfg .Table ))
238248 },
239249 table .WithIdempotent (),
240250 )
0 commit comments