Skip to content

Commit 8b2d7e1

Browse files
committed
feat: add migrate package
Adds a `migrate` package with a mechanism to run an ordered set of migrations on startup, such as Etcd data migrations. The migrations runner uses the `election` package to ensure that only one server instance is running the migrations at a time. Other instances will block startup until the migrations are complete, so we should aim to keep migrations as lightweight as possible. As stated in the package documentation, these migrations should be idempotent and, where possible, non-destructive. PLAT-347
1 parent 9238c9f commit 8b2d7e1

File tree

11 files changed

+809
-2
lines changed

11 files changed

+809
-2
lines changed

server/cmd/root.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,23 @@ package cmd
33
import (
44
"fmt"
55

6-
"github.com/pgEdge/control-plane/server/internal/cluster"
7-
"github.com/pgEdge/control-plane/server/internal/election"
86
"github.com/rs/zerolog"
97
"github.com/samber/do"
108
"github.com/spf13/cobra"
119

1210
"github.com/pgEdge/control-plane/server/internal/api"
1311
"github.com/pgEdge/control-plane/server/internal/certificates"
12+
"github.com/pgEdge/control-plane/server/internal/cluster"
1413
"github.com/pgEdge/control-plane/server/internal/config"
1514
"github.com/pgEdge/control-plane/server/internal/database"
1615
"github.com/pgEdge/control-plane/server/internal/docker"
16+
"github.com/pgEdge/control-plane/server/internal/election"
1717
"github.com/pgEdge/control-plane/server/internal/etcd"
1818
"github.com/pgEdge/control-plane/server/internal/filesystem"
1919
"github.com/pgEdge/control-plane/server/internal/host"
2020
"github.com/pgEdge/control-plane/server/internal/ipam"
2121
"github.com/pgEdge/control-plane/server/internal/logging"
22+
"github.com/pgEdge/control-plane/server/internal/migrate"
2223
"github.com/pgEdge/control-plane/server/internal/monitor"
2324
"github.com/pgEdge/control-plane/server/internal/orchestrator"
2425
"github.com/pgEdge/control-plane/server/internal/orchestrator/swarm"
@@ -69,6 +70,7 @@ func newRootCmd(i *do.Injector) *cobra.Command {
6970
host.Provide(i)
7071
ipam.Provide(i)
7172
logging.Provide(i)
73+
migrate.Provide(i)
7274
monitor.Provide(i)
7375
resource.Provide(i)
7476
scheduler.Provide(i)

server/internal/app/app.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/pgEdge/control-plane/server/internal/database"
1616
"github.com/pgEdge/control-plane/server/internal/etcd"
1717
"github.com/pgEdge/control-plane/server/internal/host"
18+
"github.com/pgEdge/control-plane/server/internal/migrate"
1819
"github.com/pgEdge/control-plane/server/internal/monitor"
1920
"github.com/pgEdge/control-plane/server/internal/scheduler"
2021
"github.com/pgEdge/control-plane/server/internal/workflows"
@@ -110,25 +111,37 @@ func (a *App) runInitialized(ctx context.Context) error {
110111
return err
111112
}
112113

114+
// Run migrations before starting other services
115+
migrationRunner, err := do.Invoke[*migrate.Runner](a.i)
116+
if err != nil {
117+
return handleError(fmt.Errorf("failed to initialize migration runner: %w", err))
118+
}
119+
if err := migrationRunner.Run(ctx); err != nil {
120+
return handleError(fmt.Errorf("failed to run migrations: %w", err))
121+
}
122+
113123
certSvc, err := do.Invoke[*certificates.Service](a.i)
114124
if err != nil {
115125
return handleError(fmt.Errorf("failed to initialize certificate service: %w", err))
116126
}
117127
if err := certSvc.Start(ctx); err != nil {
118128
return handleError(fmt.Errorf("failed to start certificate service: %w", err))
119129
}
130+
120131
hostSvc, err := do.Invoke[*host.Service](a.i)
121132
if err != nil {
122133
return handleError(fmt.Errorf("failed to initialize host service: %w", err))
123134
}
124135
if err := hostSvc.UpdateHost(ctx); err != nil {
125136
return handleError(fmt.Errorf("failed to update host: %w", err))
126137
}
138+
127139
hostTicker, err := do.Invoke[*host.UpdateTicker](a.i)
128140
if err != nil {
129141
return handleError(fmt.Errorf("failed to initialize host ticker: %w", err))
130142
}
131143
hostTicker.Start(ctx)
144+
132145
monitorSvc, err := do.Invoke[*monitor.Service](a.i)
133146
if err != nil {
134147
return handleError(fmt.Errorf("failed to initialize monitor service: %w", err))
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package migrate
2+
3+
// allMigrations returns the ordered list of migrations.
4+
// Order matters - migrations are executed in slice order.
5+
// Add new migrations to this list in chronological order.
6+
func allMigrations() []Migration {
7+
return []Migration{
8+
// Add migrations here in chronological order
9+
// Example:
10+
// &AddHostMetadataField{},
11+
// &RenameDatabaseStatus{},
12+
}
13+
}

server/internal/migrate/doc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Package migrate provides a mechanism for arbitrary migration operations that
2+
// should block startup, such as moving Etcd objects from one key to another.
3+
// IMPORTANT: migrations _must_ be written to be idempotent, and we should
4+
// prefer non-destructive updates in order to allow rollbacks.
5+
package migrate
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package migrate
2+
3+
import (
4+
"context"
5+
6+
"github.com/samber/do"
7+
)
8+
9+
// Migration defines the interface for data migrations.
10+
type Migration interface {
11+
// Identifier returns a unique semantic name for this migration.
12+
Identifier() string
13+
// Run executes the migration using dependencies from the injector.
14+
// The context should be used for cancellation and timeouts.
15+
Run(ctx context.Context, i *do.Injector) error
16+
}

server/internal/migrate/provide.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package migrate
2+
3+
import (
4+
"time"
5+
6+
"github.com/rs/zerolog"
7+
"github.com/samber/do"
8+
clientv3 "go.etcd.io/etcd/client/v3"
9+
10+
"github.com/pgEdge/control-plane/server/internal/config"
11+
"github.com/pgEdge/control-plane/server/internal/election"
12+
)
13+
14+
const ElectionName = election.Name("migration_runner")
15+
const LockTTL time.Duration = 30 * time.Second
16+
17+
// Provide registers migration dependencies with the injector.
18+
func Provide(i *do.Injector) {
19+
provideStore(i)
20+
provideRunner(i)
21+
}
22+
23+
func provideStore(i *do.Injector) {
24+
do.Provide(i, func(i *do.Injector) (*Store, error) {
25+
cfg, err := do.Invoke[config.Config](i)
26+
if err != nil {
27+
return nil, err
28+
}
29+
client, err := do.Invoke[*clientv3.Client](i)
30+
if err != nil {
31+
return nil, err
32+
}
33+
return NewStore(client, cfg.EtcdKeyRoot), nil
34+
})
35+
}
36+
37+
func provideRunner(i *do.Injector) {
38+
do.Provide(i, func(i *do.Injector) (*Runner, error) {
39+
store, err := do.Invoke[*Store](i)
40+
if err != nil {
41+
return nil, err
42+
}
43+
cfg, err := do.Invoke[config.Config](i)
44+
if err != nil {
45+
return nil, err
46+
}
47+
logger, err := do.Invoke[zerolog.Logger](i)
48+
if err != nil {
49+
return nil, err
50+
}
51+
electionSvc, err := do.Invoke[*election.Service](i)
52+
if err != nil {
53+
return nil, err
54+
}
55+
locker := electionSvc.NewCandidate(ElectionName, cfg.HostID, LockTTL)
56+
return NewRunner(
57+
cfg.HostID,
58+
store,
59+
i,
60+
logger,
61+
allMigrations(),
62+
locker,
63+
), nil
64+
})
65+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package migrate
2+
3+
import (
4+
"time"
5+
6+
"github.com/pgEdge/control-plane/server/internal/storage"
7+
"github.com/pgEdge/control-plane/server/internal/version"
8+
clientv3 "go.etcd.io/etcd/client/v3"
9+
)
10+
11+
// StoredResult tracks the outcome of a specific migration.
12+
type StoredResult struct {
13+
storage.StoredValue
14+
Identifier string `json:"identifier"`
15+
Successful bool `json:"successful"`
16+
StartedAt time.Time `json:"started_at"`
17+
CompletedAt time.Time `json:"completed_at"`
18+
RunByHostID string `json:"run_by_host_id"`
19+
RunByVersionInfo *version.Info `json:"run_by_version_info"`
20+
Error string `json:"error,omitempty"`
21+
}
22+
23+
type ResultStore struct {
24+
client *clientv3.Client
25+
root string
26+
}
27+
28+
func NewResultStore(client *clientv3.Client, root string) *ResultStore {
29+
return &ResultStore{
30+
client: client,
31+
root: root,
32+
}
33+
}
34+
35+
func (s *ResultStore) Prefix() string {
36+
return storage.Prefix(s.root, "migrations", "results")
37+
}
38+
39+
func (s *ResultStore) Key(identifier string) string {
40+
return storage.Key(s.Prefix(), identifier)
41+
}
42+
43+
func (s *ResultStore) Get(identifier string) storage.GetOp[*StoredResult] {
44+
return storage.NewGetOp[*StoredResult](s.client, s.Key(identifier))
45+
}
46+
47+
func (s *ResultStore) Put(item *StoredResult) storage.PutOp[*StoredResult] {
48+
return storage.NewPutOp(s.client, s.Key(item.Identifier), item)
49+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package migrate
2+
3+
import (
4+
"github.com/pgEdge/control-plane/server/internal/storage"
5+
clientv3 "go.etcd.io/etcd/client/v3"
6+
)
7+
8+
// StoredRevision tracks the most recently applied migration.
9+
type StoredRevision struct {
10+
storage.StoredValue
11+
Identifier string `json:"identifier"`
12+
}
13+
14+
type RevisionStore struct {
15+
client *clientv3.Client
16+
root string
17+
}
18+
19+
func NewRevisionStore(client *clientv3.Client, root string) *RevisionStore {
20+
return &RevisionStore{
21+
client: client,
22+
root: root,
23+
}
24+
}
25+
26+
func (s *RevisionStore) Key() string {
27+
return storage.Key(s.root, "migrations", "revision")
28+
}
29+
30+
func (s *RevisionStore) Get() storage.GetOp[*StoredRevision] {
31+
return storage.NewGetOp[*StoredRevision](s.client, s.Key())
32+
}
33+
34+
func (s *RevisionStore) Create(item *StoredRevision) storage.PutOp[*StoredRevision] {
35+
return storage.NewCreateOp(s.client, s.Key(), item)
36+
}
37+
38+
func (s *RevisionStore) Update(item *StoredRevision) storage.PutOp[*StoredRevision] {
39+
return storage.NewUpdateOp(s.client, s.Key(), item)
40+
}
41+
42+
func (s *RevisionStore) Watch() storage.WatchOp[*StoredRevision] {
43+
return storage.NewWatchOp[*StoredRevision](s.client, s.Key())
44+
}

0 commit comments

Comments
 (0)