Skip to content

Commit f05580a

Browse files
committed
feat: add modular SQLC tools for DB definitions
1 parent 734312a commit f05580a

File tree

14 files changed

+589
-1
lines changed

14 files changed

+589
-1
lines changed

rolling-shutter/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ require (
4444
golang.org/x/crypto v0.12.0
4545
golang.org/x/sync v0.3.0
4646
google.golang.org/protobuf v1.30.0
47+
gopkg.in/yaml.v3 v3.0.1
4748
gotest.tools v2.2.0+incompatible
4849
gotest.tools/v3 v3.2.0
4950
)
@@ -226,7 +227,6 @@ require (
226227
gopkg.in/ini.v1 v1.67.0 // indirect
227228
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
228229
gopkg.in/yaml.v2 v2.4.0 // indirect
229-
gopkg.in/yaml.v3 v3.0.1 // indirect
230230
lukechampine.com/blake3 v1.2.1 // indirect
231231
)
232232

rolling-shutter/medley/db/connect.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package db
2+
3+
import (
4+
"context"
5+
6+
"github.com/jackc/pgx/v4"
7+
"github.com/jackc/pgx/v4/pgxpool"
8+
"github.com/pkg/errors"
9+
"github.com/rs/zerolog/log"
10+
11+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/shdb"
13+
)
14+
15+
func ValidateDBVersion(ctx context.Context, dbpool *pgxpool.Pool, role string) error {
16+
err := dbpool.BeginFunc(
17+
ctx,
18+
func(tx pgx.Tx) error {
19+
return ValidateDatabaseVersion(ctx, tx, role)
20+
},
21+
)
22+
if err != nil {
23+
return errors.Wrap(err, "database is used for a different role already, preventing overwrite")
24+
}
25+
return nil
26+
}
27+
28+
// Connect to the database `url` from within a runner.Start() method
29+
// and create the pgxpool.Pool.
30+
func Connect(ctx context.Context, runner service.Runner, url, version string) (*pgxpool.Pool, error) {
31+
dbpool, err := pgxpool.Connect(ctx, url)
32+
if err != nil {
33+
return nil, err
34+
}
35+
runner.Defer(dbpool.Close)
36+
37+
err = ValidateDBVersion(ctx, dbpool, version)
38+
if err != nil {
39+
return nil, err
40+
}
41+
shdb.AddConnectionInfo(log.Info(), dbpool).Msg("connected to database")
42+
return dbpool, nil
43+
}

rolling-shutter/medley/db/db.sqlc.gen.go

Lines changed: 32 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package db
2+
3+
import (
4+
"embed"
5+
6+
"github.com/rs/zerolog/log"
7+
)
8+
9+
//go:generate sqlc generate --file sql/sqlc.yaml
10+
11+
//go:embed sql
12+
var files embed.FS
13+
var metaDefinition Definition
14+
15+
func init() {
16+
var err error
17+
metaDefinition, err = NewSQLCDefinition(files, "sql/", "meta", 1)
18+
if err != nil {
19+
log.Fatal().Err(err).Msg("failed to initialize DB metadata")
20+
}
21+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package db
2+
3+
import (
4+
"context"
5+
6+
"github.com/jackc/pgx/v4"
7+
"github.com/jackc/pgx/v4/pgxpool"
8+
"github.com/pkg/errors"
9+
"github.com/rs/zerolog/log"
10+
11+
"github.com/shutter-network/rolling-shutter/rolling-shutter/shdb"
12+
)
13+
14+
// InitDB initializes an empty database with the all schema definitions as specified from
15+
// the passed in `Definition`'s Create() and Init() actions.
16+
// Additionally, a `role` will be written in the database's meta key/value store, that
17+
// pins the database to a specific role, e.g. "keyper-test" or "snapshot-keyper-production"
18+
// in order to prevent later usage of the database with commands that fulfill a different role.
19+
func InitDB(ctx context.Context, dbpool *pgxpool.Pool, role string, definition Definition) error {
20+
err := dbpool.BeginFunc(WrapContext(ctx, definition.Validate))
21+
if err == nil {
22+
shdb.AddConnectionInfo(log.Info(), dbpool).Msg("database already exists")
23+
return nil
24+
} else if errors.Is(err, ErrValueMismatch) {
25+
return err
26+
}
27+
28+
err = dbpool.BeginFunc(WrapContext(ctx, definition.Create))
29+
if err != nil {
30+
return err
31+
}
32+
33+
err = dbpool.BeginFunc(WrapContext(ctx, definition.Init))
34+
if err != nil {
35+
return err
36+
}
37+
38+
// For the outer DB initialisation, also set the database version to
39+
// the overall "role", so that e.g. a snapshot keyper database won't be
40+
// used by another keyper implementation, no matter if the schemas
41+
// are compatible
42+
err = dbpool.BeginFunc(
43+
ctx,
44+
func(tx pgx.Tx) error {
45+
return InsertDBVersion(ctx, tx, role)
46+
},
47+
)
48+
if err != nil {
49+
return err
50+
}
51+
shdb.AddConnectionInfo(log.Info(), dbpool).Msg("database initialized")
52+
return nil
53+
}
54+
55+
var _ Definition = AggregateDefinition{}
56+
57+
// NewAggregateDefinition constructs a new AggregateDefinition instance
58+
// and implements the `Definition` interface.
59+
// The passed in `definitions` will be stored in the AggregateDefinition's
60+
// state and a call to the AggregateDefinition's methods will be dispatched to all
61+
// stored definitions.
62+
// Importantly, this constructor will unpack the internal definitions of any passed in,
63+
// wrapped AggregateDefinition. This means that the outer AggregateDefinition always stores
64+
// the flattened set of all underlying child-definitions.
65+
func NewAggregateDefinition(name string, definitions ...Definition) AggregateDefinition {
66+
defs := map[Definition]bool{}
67+
for _, def := range definitions {
68+
nestedDefs, isWrapper := def.(AggregateDefinition)
69+
if isWrapper {
70+
for deff := range nestedDefs.defs {
71+
defs[deff] = true
72+
}
73+
} else {
74+
defs[def] = true
75+
}
76+
}
77+
return AggregateDefinition{name: name, defs: defs}
78+
}
79+
80+
type AggregateDefinition struct {
81+
name string
82+
defs map[Definition]bool
83+
}
84+
85+
func (d AggregateDefinition) Name() string {
86+
return d.name
87+
}
88+
89+
func (d AggregateDefinition) Init(ctx context.Context, tx pgx.Tx) error {
90+
for def := range d.defs {
91+
err := def.Init(ctx, tx)
92+
if err != nil {
93+
return errors.Wrapf(err, "can't initialize DB for definition '%s'", def.Name())
94+
}
95+
}
96+
return nil
97+
}
98+
99+
func (d AggregateDefinition) Create(ctx context.Context, tx pgx.Tx) error {
100+
err := metaDefinition.Create(ctx, tx)
101+
if err != nil {
102+
return errors.Wrap(err, "can't create DB for meta definition")
103+
}
104+
for def := range d.defs {
105+
err := def.Create(ctx, tx)
106+
if err != nil {
107+
return errors.Wrapf(err, "can't create DB for definition '%s'", def.Name())
108+
}
109+
}
110+
return nil
111+
}
112+
113+
func (d AggregateDefinition) Validate(ctx context.Context, tx pgx.Tx) error {
114+
for def := range d.defs {
115+
err := def.Validate(ctx, tx)
116+
if err != nil {
117+
return errors.Wrapf(err, "validation error for DB '%s'", def.Name())
118+
}
119+
}
120+
return nil
121+
}
122+
123+
type Definition interface {
124+
Name() string
125+
Create(context.Context, pgx.Tx) error
126+
Init(context.Context, pgx.Tx) error
127+
Validate(context.Context, pgx.Tx) error
128+
}
129+
130+
type Schema struct {
131+
Version int
132+
Name string
133+
Path string
134+
}
135+
136+
type Migration struct {
137+
Version int
138+
Path string
139+
Up bool // up or down migration
140+
}

rolling-shutter/medley/db/meta.sqlc.gen.go

Lines changed: 35 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rolling-shutter/medley/db/metadb.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package db
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/jackc/pgx/v4"
8+
"github.com/pkg/errors"
9+
"github.com/rs/zerolog/log"
10+
)
11+
12+
var (
13+
ErrValueMismatch = errors.New("database has unexpected value")
14+
ErrKeyNotFound = errors.New("key does not exist")
15+
)
16+
17+
var DatabaseVersionKey string = "database-version"
18+
19+
// MakeSchemaVersionKey generates the version key for the metadb
20+
// that is used to check database schema compatibility.
21+
// `definitionName` is the name of the database definition that corresponds
22+
// to a database subset (one entry in an sqlc file).
23+
// `schemaName` corresponds to the individual *.sql files in the schema folder.
24+
func MakeSchemaVersionKey(definitionName, schemaName string) string {
25+
return "schema-version-" + definitionName + "-" + schemaName
26+
}
27+
28+
func InsertDBVersion(ctx context.Context, tx pgx.Tx, version string) error {
29+
return insertMetaInf(ctx, tx, DatabaseVersionKey, version)
30+
}
31+
32+
func InsertSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema) error {
33+
return insertMetaInf(ctx, tx, MakeSchemaVersionKey(definitionName, schema.Name), fmt.Sprint(schema.Version))
34+
}
35+
36+
func insertMetaInf(ctx context.Context, tx pgx.Tx, key, val string) error {
37+
log.Info().Str("key", key).
38+
Str("value", val).Msg("insert schema meta inf")
39+
return New(tx).InsertMeta(ctx, InsertMetaParams{
40+
Key: key,
41+
Value: val,
42+
})
43+
}
44+
45+
// ValidateSchemaVersion checks that the database schema is compatible.
46+
func ValidateSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema) error {
47+
return expectMetaKeyVal(ctx, tx, MakeSchemaVersionKey(definitionName, schema.Name), fmt.Sprint(schema.Version))
48+
}
49+
50+
func expectMetaKeyVal(ctx context.Context, tx pgx.Tx, key, val string) error {
51+
haveVal, err := New(tx).GetMeta(ctx, key)
52+
if err == pgx.ErrNoRows {
53+
return errors.Wrapf(ErrKeyNotFound, "key: %s", key)
54+
} else if err != nil {
55+
return errors.Wrapf(err, "failed to get key '%s' from meta_inf table", key)
56+
}
57+
if haveVal != val {
58+
return errors.Wrapf(ErrValueMismatch, "expected %s, have %s", val, haveVal)
59+
}
60+
return nil
61+
}
62+
63+
// ValidateDatabaseVersion checks that the overall database is compatible.
64+
// This corresponds to the "role" of the database, e.g. a snapshot-keyper
65+
// might not be compatible with a snapshot test-keyper, even if the schema's
66+
// versions would match exactly.
67+
func ValidateDatabaseVersion(ctx context.Context, tx pgx.Tx, version string) error {
68+
return expectMetaKeyVal(ctx, tx, DatabaseVersionKey, version)
69+
}

rolling-shutter/medley/db/models.sqlc.gen.go

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rolling-shutter/medley/db/querier.sqlc.gen.go

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- name: InsertMeta :exec
2+
INSERT INTO meta_inf (key, value) VALUES ($1, $2);
3+
4+
-- name: GetMeta :one
5+
SELECT value FROM meta_inf WHERE key = $1;

0 commit comments

Comments
 (0)