Skip to content

Commit 17f7632

Browse files
authored
[-] fix config upgrade circular dependency, closes #1156 (#1171)
* [-] fix `config upgrade` circular dependency, closes #1156 postpone migration check to `cmdopts` functions * fix tests * move Migrator interface to db package * add `db.NeedsMigration()` * make linter happy
1 parent d188371 commit 17f7632

File tree

9 files changed

+102
-91
lines changed

9 files changed

+102
-91
lines changed

internal/cmdopts/cmdconfig.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"errors"
66

7+
"github.com/cybertec-postgresql/pgwatch/v5/internal/db"
78
"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
9+
"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
810
"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
911
)
1012

@@ -96,7 +98,7 @@ func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) {
9698
opts.CompleteCommand(ExitCodeConfigError)
9799
return
98100
}
99-
if m, ok := opts.MetricsReaderWriter.(metrics.Migrator); ok {
101+
if m, ok := opts.MetricsReaderWriter.(db.Migrator); ok {
100102
err = m.Migrate()
101103
if err != nil {
102104
opts.CompleteCommand(ExitCodeConfigError)
@@ -109,12 +111,12 @@ func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) {
109111
}
110112
// Upgrade sinks configuration if it's postgres
111113
if len(opts.Sinks.Sinks) > 0 {
112-
err = opts.InitSinkWriter(ctx)
114+
opts.SinksWriter, err = sinks.NewSinkWriter(ctx, &opts.Sinks)
113115
if err != nil {
114116
opts.CompleteCommand(ExitCodeConfigError)
115117
return
116118
}
117-
if m, ok := opts.SinksWriter.(metrics.Migrator); ok {
119+
if m, ok := opts.SinksWriter.(db.Migrator); ok {
118120
err = m.Migrate()
119121
if err != nil {
120122
opts.CompleteCommand(ExitCodeConfigError)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package cmdopts
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
9+
"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
10+
"github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
11+
"github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
// TestConfigUpgrade_VerifyNoCircularDependency tests that config upgrade can be run even when the schema
17+
// needs migrations, proving there's no circular dependency
18+
func TestConfigUpgrade_VerifyNoCircularDependency(t *testing.T) {
19+
if testing.Short() {
20+
t.Skip("Skipping integration test")
21+
}
22+
23+
// Create a PostgreSQL container with empty database (no schema)
24+
pgContainer, tearDown, err := testutil.SetupPostgresContainerWithInitScripts()
25+
require.NoError(t, err)
26+
defer tearDown()
27+
28+
connStr, err := pgContainer.ConnectionString(testutil.TestContext)
29+
require.NoError(t, err)
30+
31+
ctx := context.Background()
32+
opts := &Options{
33+
Metrics: metrics.CmdOpts{Metrics: connStr},
34+
Sources: sources.CmdOpts{Sources: connStr, Refresh: 120, MaxParallelConnectionsPerDb: 1},
35+
Sinks: sinks.CmdOpts{BatchingDelay: time.Second},
36+
}
37+
38+
// This is the key test: config upgrade should work even on empty database
39+
// (or database needing migrations). Before the fix, this would fail with
40+
// circular dependency because InitConfigReaders would fail, preventing
41+
// the upgrade from running
42+
cmd := ConfigUpgradeCommand{owner: opts}
43+
err = cmd.Execute(nil)
44+
assert.NoError(t, err)
45+
assert.Equal(t, ExitCodeOK, opts.ExitCode)
46+
47+
// After successful upgrade, InitConfigReaders should succeed
48+
opts2 := &Options{
49+
Metrics: metrics.CmdOpts{Metrics: connStr},
50+
Sources: sources.CmdOpts{Sources: connStr, Refresh: 120, MaxParallelConnectionsPerDb: 1},
51+
Sinks: sinks.CmdOpts{BatchingDelay: time.Second},
52+
}
53+
err = opts2.InitConfigReaders(ctx)
54+
assert.NoError(t, err)
55+
assert.NotNil(t, opts2.MetricsReaderWriter)
56+
assert.NotNil(t, opts2.SourcesReaderWriter)
57+
}

internal/cmdopts/cmdoptions.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"time"
1010

11+
"github.com/cybertec-postgresql/pgwatch/v5/internal/db"
1112
"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
1213
"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
1314
"github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
@@ -156,30 +157,37 @@ func (c *Options) InitSourceReader(ctx context.Context) (err error) {
156157

157158
// InitConfigReaders creates the configuration readers based on the configuration kind from the options.
158159
func (c *Options) InitConfigReaders(ctx context.Context) error {
159-
return errors.Join(c.InitMetricReader(ctx), c.InitSourceReader(ctx))
160+
err := errors.Join(c.InitMetricReader(ctx), c.InitSourceReader(ctx))
161+
if err != nil {
162+
return err
163+
}
164+
return db.NeedsMigration(c.MetricsReaderWriter, metrics.ErrNeedsMigration)
160165
}
161166

162167
// InitSinkWriter creates a new MultiWriter instance if needed.
163168
func (c *Options) InitSinkWriter(ctx context.Context) (err error) {
164169
c.SinksWriter, err = sinks.NewSinkWriter(ctx, &c.Sinks)
165-
return
170+
if err != nil {
171+
return err
172+
}
173+
return db.NeedsMigration(c.SinksWriter, sinks.ErrNeedsMigration)
166174
}
167175

168176
// NeedsSchemaUpgrade checks if the configuration database schema needs an upgrade.
169177
func (c *Options) NeedsSchemaUpgrade() (upgrade bool, err error) {
170-
if m, ok := c.SourcesReaderWriter.(metrics.Migrator); ok {
178+
if m, ok := c.SourcesReaderWriter.(db.Migrator); ok {
171179
upgrade, err = m.NeedsMigration()
172180
}
173181
if upgrade || err != nil {
174182
return
175183
}
176-
if m, ok := c.MetricsReaderWriter.(metrics.Migrator); ok {
184+
if m, ok := c.MetricsReaderWriter.(db.Migrator); ok {
177185
upgrade, err = m.NeedsMigration()
178186
}
179187
if upgrade || err != nil {
180188
return
181189
}
182-
if m, ok := c.SinksWriter.(metrics.Migrator); ok {
190+
if m, ok := c.SinksWriter.(db.Migrator); ok {
183191
return m.NeedsMigration()
184192
}
185193
return

internal/db/conn.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,23 @@ type PgxPoolIface interface {
4646
Stat() *pgxpool.Stat
4747
}
4848

49+
// Migrator is an interface for database schema migration
50+
type Migrator interface {
51+
Migrate() error
52+
NeedsMigration() (bool, error)
53+
}
54+
55+
func NeedsMigration(storage any, needsMigrationErr error) error {
56+
if m, ok := storage.(Migrator); ok {
57+
if needsMigration, err := m.NeedsMigration(); err != nil {
58+
return err
59+
} else if needsMigration {
60+
return needsMigrationErr
61+
}
62+
}
63+
return nil
64+
}
65+
4966
func MarshallParamToJSONB(v any) any {
5067
if v == nil {
5168
return nil

internal/metrics/postgres.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,6 @@ func NewPostgresMetricReaderWriterConn(ctx context.Context, conn db.PgxPoolIface
2525
ctx: ctx,
2626
configDb: conn,
2727
}
28-
// Check if migrations are needed
29-
if needsMigration, err := dmrw.NeedsMigration(); err != nil {
30-
return nil, err
31-
} else if needsMigration {
32-
return nil, ErrNeedsMigration
33-
}
3428
return dmrw, conn.Ping(ctx)
3529
}
3630

@@ -50,7 +44,7 @@ var (
5044
)
5145

5246
// make sure *dbMetricReaderWriter implements the Migrator interface
53-
var _ Migrator = (*dbMetricReaderWriter)(nil)
47+
var _ db.Migrator = (*dbMetricReaderWriter)(nil)
5448

5549
// writeMetricsToPostgres writes the metrics and presets definitions to the
5650
// pgwatch.metric and pgwatch.preset tables in the ConfigDB.

internal/metrics/postgres_test.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ func TestNewPostgresMetricReaderWriterConn(t *testing.T) {
5555
conn.ExpectExec(`INSERT.+preset`).WithArgs(AnyArgs(3)...).WillReturnResult(pgxmock.NewResult("INSERT", 1)).Times(uint(presetsCount))
5656
conn.ExpectCommit()
5757
conn.ExpectCommit()
58-
// Expect migration check
59-
conn.ExpectQuery(`SELECT to_regclass`).WithArgs("pgwatch.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
60-
conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(metrics.MigrationsCount))
6158
conn.ExpectPing()
6259

6360
readerWriter, err := metrics.NewPostgresMetricReaderWriterConn(ctx, conn)
@@ -163,24 +160,12 @@ func TestNewPostgresMetricReaderWriterConn(t *testing.T) {
163160
a.NoError(conn.ExpectationsWereMet())
164161
})
165162

166-
t.Run("MigrationCheckFail", func(*testing.T) {
163+
t.Run("SchemaExists", func(*testing.T) {
167164
conn.ExpectQuery(`SELECT EXISTS`).WithArgs("pgwatch").WillReturnRows(pgxmock.NewRows([]string{"exists"}).AddRow(true))
168-
conn.ExpectQuery(`SELECT to_regclass`).WithArgs("pgwatch.migration").WillReturnError(assert.AnError)
169-
rw, err := metrics.NewPostgresMetricReaderWriterConn(ctx, conn)
170-
a.Error(err)
171-
a.Nil(rw)
172-
a.NoError(conn.ExpectationsWereMet())
173-
})
174-
175-
t.Run("MigrationNeeded", func(*testing.T) {
176-
conn.ExpectQuery(`SELECT EXISTS`).WithArgs("pgwatch").WillReturnRows(pgxmock.NewRows([]string{"exists"}).AddRow(true))
177-
conn.ExpectQuery(`SELECT to_regclass`).WithArgs("pgwatch.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
178-
conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(metrics.MigrationsCount - 1))
165+
conn.ExpectPing()
179166
rw, err := metrics.NewPostgresMetricReaderWriterConn(ctx, conn)
180-
a.Error(err)
181-
a.ErrorContains(err, "config database schema is outdated")
182-
a.ErrorContains(err, "pgwatch config upgrade")
183-
a.Nil(rw)
167+
a.NoError(err)
168+
a.NotNil(rw)
184169
a.NoError(conn.ExpectationsWereMet())
185170
})
186171
}
@@ -191,9 +176,6 @@ func TestMetricsToPostgres(t *testing.T) {
191176
a.NoError(err)
192177

193178
conn.ExpectQuery(`SELECT EXISTS`).WithArgs("pgwatch").WillReturnRows(pgxmock.NewRows([]string{"exists"}).AddRow(true))
194-
// Expect migration check
195-
conn.ExpectQuery(`SELECT to_regclass`).WithArgs("pgwatch.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
196-
conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(metrics.MigrationsCount))
197179
conn.ExpectPing()
198180

199181
readerWriter, err := metrics.NewPostgresMetricReaderWriterConn(ctx, conn)

internal/metrics/types.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,11 +209,6 @@ type Writer interface {
209209
CreatePreset(presetName string, preset Preset) error
210210
}
211211

212-
type Migrator interface {
213-
Migrate() error
214-
NeedsMigration() (bool, error)
215-
}
216-
217212
type ReaderWriter interface {
218213
Reader
219214
Writer

internal/sinks/postgres.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,14 @@ type PostgresWriter struct {
6868
maintenanceInterval time.Duration
6969
input chan metrics.MeasurementEnvelope
7070
lastError chan error
71-
forceRecreatePartitions bool // to signal override PG metrics storage cache
72-
partitionMapMetric map[string]ExistingPartitionInfo // metric = min/max bounds
71+
forceRecreatePartitions bool // to signal override PG metrics storage cache
72+
partitionMapMetric map[string]ExistingPartitionInfo // metric = min/max bounds
7373
partitionMapMetricDbname map[string]map[string]ExistingPartitionInfo // metric[dbname = min/max bounds]
7474
}
7575

76+
// make sure *dbMetricReaderWriter implements the Migrator interface
77+
var _ db.Migrator = (*PostgresWriter)(nil)
78+
7679
func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
7780
var conn db.PgxPoolIface
7881
if conn, err = db.New(ctx, connstr); err != nil {
@@ -100,11 +103,6 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
100103
if err = pgw.init(); err != nil {
101104
return nil, err
102105
}
103-
if needsMigration, e := pgw.NeedsMigration(); e != nil {
104-
return nil, e
105-
} else if needsMigration {
106-
return nil, ErrNeedsMigration
107-
}
108106
if err = pgw.ReadMetricSchemaType(); err != nil {
109107
return nil, err
110108
}

internal/sinks/postgres_test.go

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ func TestNewWriterFromPostgresConn(t *testing.T) {
5656
pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true),
5757
)
5858
conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
59-
// Expect migration check (before ReadMetricSchemaType)
60-
conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
61-
conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(MigrationsCount))
6259
conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
6360
for _, m := range metrics.GetDefaultBuiltInMetrics() {
6461
conn.ExpectExec("SELECT admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
@@ -82,42 +79,7 @@ func TestNewWriterFromPostgresConn(t *testing.T) {
8279
a.NoError(conn.ExpectationsWereMet())
8380
})
8481

85-
t.Run("MigrationCheckFail", func(*testing.T) {
86-
conn, err := pgxmock.NewPool()
87-
a.NoError(err)
88-
89-
conn.ExpectPing()
90-
conn.ExpectQuery("SELECT extract").WithArgs("1 day", "1 day", "1 hour").WillReturnRows(
91-
pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true),
92-
)
93-
conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
94-
conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnError(assert.AnError)
95-
96-
pgw, err := NewWriterFromPostgresConn(ctx, conn, opts)
97-
a.Error(err)
98-
a.Nil(pgw)
99-
a.NoError(conn.ExpectationsWereMet())
100-
})
10182

102-
t.Run("MigrationNeeded", func(*testing.T) {
103-
conn, err := pgxmock.NewPool()
104-
a.NoError(err)
105-
106-
conn.ExpectPing()
107-
conn.ExpectQuery("SELECT extract").WithArgs("1 day", "1 day", "1 hour").WillReturnRows(
108-
pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true),
109-
)
110-
conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
111-
conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
112-
conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(MigrationsCount - 1))
113-
114-
pgw, err := NewWriterFromPostgresConn(ctx, conn, opts)
115-
a.ErrorIs(err, ErrNeedsMigration)
116-
a.ErrorContains(err, "sink database schema is outdated")
117-
a.ErrorContains(err, "pgwatch config upgrade")
118-
a.Nil(pgw)
119-
a.NoError(conn.ExpectationsWereMet())
120-
})
12183

12284
t.Run("ReadMetricSchemaTypeFail", func(*testing.T) {
12385
conn, err := pgxmock.NewPool()
@@ -128,8 +90,6 @@ func TestNewWriterFromPostgresConn(t *testing.T) {
12890
pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true),
12991
)
13092
conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
131-
conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
132-
conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(MigrationsCount))
13393
conn.ExpectQuery("SELECT schema_type").WillReturnError(assert.AnError)
13494

13595
pgw, err := NewWriterFromPostgresConn(ctx, conn, opts)
@@ -147,8 +107,6 @@ func TestNewWriterFromPostgresConn(t *testing.T) {
147107
pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true),
148108
)
149109
conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
150-
conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true))
151-
conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(MigrationsCount))
152110
conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
153111
conn.ExpectExec("SELECT admin.ensure_dummy_metrics_table").WithArgs(pgxmock.AnyArg()).WillReturnError(assert.AnError)
154112

0 commit comments

Comments
 (0)