Skip to content

Commit 5830565

Browse files
committed
feat: wip
1 parent 4b1dbf3 commit 5830565

File tree

5 files changed

+303
-0
lines changed

5 files changed

+303
-0
lines changed

internal/controller/postgresql/postgres/database.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
GetTablesFromSchemaSQLTemplate = `SELECT tablename,tableowner FROM pg_tables WHERE schemaname = '%s'`
2929
ChangeTableOwnerSQLTemplate = `ALTER TABLE IF EXISTS "%s" OWNER TO "%s"`
3030
ChangeTypeOwnerSQLTemplate = `ALTER TYPE "%s"."%s" OWNER TO "%s"`
31+
GetColumnsFromTableSQLTemplate = `SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'`
3132
// Got and edited from : https://stackoverflow.com/questions/3660787/how-to-list-custom-types-using-postgres-information-schema
3233
GetTypesFromSchemaSQLTemplate = `SELECT t.typname as type, pg_catalog.pg_get_userbyid(t.typowner) as owner
3334
FROM pg_type t
@@ -38,6 +39,43 @@ AND n.nspname = '%s';`
3839
DuplicateDatabaseErrorCode = "42P04"
3940
)
4041

42+
func (c *pg) GetColumnNamesFromTable(ctx context.Context, database string, schemaName string, tableName string) ([]string, error) {
43+
err := c.connect(database)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
rows, err := c.db.QueryContext(ctx, fmt.Sprintf(GetColumnsFromTableSQLTemplate, schemaName, tableName))
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
defer rows.Close()
54+
55+
res := []string{}
56+
57+
for rows.Next() {
58+
it := ""
59+
// Scan
60+
err = rows.Scan(&it)
61+
// Check error
62+
if err != nil {
63+
return nil, err
64+
}
65+
// Save
66+
res = append(res, it)
67+
}
68+
69+
// Rows error
70+
err = rows.Err()
71+
// Check error
72+
if err != nil {
73+
return nil, err
74+
}
75+
76+
return res, nil
77+
}
78+
4179
func (c *pg) GetDatabaseOwner(ctx context.Context, dbname string) (string, error) {
4280
err := c.connect(c.defaultDatabase)
4381
if err != nil {

internal/controller/postgresql/postgres/postgres.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,11 @@ type PG interface { //nolint:interfacebloat // This is needed
6868
CreatePublication(ctx context.Context, dbname string, builder *CreatePublicationBuilder) error
6969
UpdatePublication(ctx context.Context, dbname, publicationName string, builder *UpdatePublicationBuilder) error
7070
ChangePublicationOwner(ctx context.Context, dbname string, publicationName string, owner string) error
71+
GetPublicationTablesDetails(ctx context.Context, db, publicationName string) ([]*PublicationTableDetail, error)
7172
DropReplicationSlot(ctx context.Context, name string) error
7273
CreateReplicationSlot(ctx context.Context, dbname, name, plugin string) error
7374
GetReplicationSlot(ctx context.Context, name string) (*ReplicationSlotResult, error)
75+
GetColumnNamesFromTable(ctx context.Context, database string, schemaName string, tableName string) ([]string, error)
7476
GetUser() string
7577
GetHost() string
7678
GetPort() int

internal/controller/postgresql/postgres/publication.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
pg_catalog.pg_get_userbyid(pubowner), puballtables, pubinsert, pubupdate, pubdelete, pubtruncate, pubviaroot
1919
FROM pg_catalog.pg_publication
2020
WHERE pubname = '%s';`
21+
GetPublicationTablesSQLTemplate = `SELECT schemaname, tablename, attnames, rowfilter FROM pg_publication_tables WHERE pubname = '%s'`
2122
GetReplicationSlotSQLTemplate = `SELECT slot_name,plugin,database FROM pg_replication_slots WHERE slot_name = '%s'`
2223
CreateReplicationSlotSQLTemplate = `SELECT pg_create_logical_replication_slot('%s', '%s')`
2324
DropReplicationSlotSQLTemplate = `SELECT pg_drop_replication_slot('%s')`
@@ -46,6 +47,48 @@ type ReplicationSlotResult struct {
4647
Database string
4748
}
4849

50+
func (c *pg) GetPublicationTablesDetails(ctx context.Context, db, publicationName string) ([]*PublicationTableDetail, error) {
51+
err := c.connect(db)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
rows, err := c.db.QueryContext(ctx, fmt.Sprintf(GetPublicationTablesSQLTemplate, publicationName))
57+
if err != nil {
58+
return nil, err
59+
}
60+
61+
defer rows.Close()
62+
63+
res := []*PublicationTableDetail{}
64+
65+
for rows.Next() {
66+
var it PublicationTableDetail
67+
var pqSA pq.StringArray
68+
// Scan
69+
err = rows.Scan(&it.SchemaName, &it.TableName, &pqSA, &it.AdditionalWhere)
70+
// Check error
71+
if err != nil {
72+
return nil, err
73+
}
74+
// Save
75+
// ? Note: getting a list of string from pg imply a decode
76+
// ? See issue: https://github.com/cockroachdb/cockroach/issues/39770#issuecomment-576170805
77+
it.Columns = pqSA
78+
// Save
79+
res = append(res, &it)
80+
}
81+
82+
// Rows error
83+
err = rows.Err()
84+
// Check error
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
return res, nil
90+
}
91+
4992
func (c *pg) DropReplicationSlot(ctx context.Context, name string) error {
5093
err := c.connect(c.defaultDatabase)
5194
if err != nil {

internal/controller/postgresql/postgresqlpublication_controller.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package postgresql
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"reflect"
23+
"strings"
2224
"time"
2325

2426
"k8s.io/apimachinery/pkg/api/errors"
@@ -39,6 +41,9 @@ import (
3941
)
4042

4143
const DefaultReplicationSlotPlugin = "pgoutput"
44+
const DefaultPGWithPublishParameter = "insert, update, delete, truncate"
45+
46+
var DefaultPGWithPublishViaRootParameter = false
4247

4348
// PostgresqlPublicationReconciler reconciles a PostgresqlPublication object.
4449
type PostgresqlPublicationReconciler struct {
@@ -257,6 +262,20 @@ func (r *PostgresqlPublicationReconciler) mainReconcile(
257262
if err != nil {
258263
return r.manageError(ctx, reqLogger, instance, originalPatch, err)
259264
}
265+
} else {
266+
// Check if reconcile from PG state is necessary because spec haven't been changed
267+
need, err := r.isReconcileOnPGNecessary(ctx, instance, pg, pgDB, pubRes, nameToSearch)
268+
// Check error
269+
if err != nil {
270+
return r.manageError(ctx, reqLogger, instance, originalPatch, err)
271+
}
272+
273+
// Check if it is needed
274+
if need {
275+
reqLogger.Info("PG state have been changed but not via operator, update need to be done")
276+
277+
err = r.manageUpdate(ctx, instance, pg, pgDB, pubRes, nameToSearch)
278+
}
260279
}
261280

262281
// Check if owner are aligned
@@ -316,6 +335,138 @@ func (r *PostgresqlPublicationReconciler) mainReconcile(
316335
return r.manageSuccess(ctx, reqLogger, instance, originalPatch)
317336
}
318337

338+
func (*PostgresqlPublicationReconciler) isReconcileOnPGNecessary(
339+
ctx context.Context,
340+
instance *v1alpha1.PostgresqlPublication,
341+
pg postgres.PG,
342+
pgDB *v1alpha1.PostgresqlDatabase,
343+
pubRes *postgres.PublicationResult,
344+
currentPublicationName string,
345+
) (bool, error) {
346+
instanceSpec := instance.Spec
347+
348+
// Check with parameters
349+
// nil spec case
350+
if instanceSpec.WithParameters == nil && (pubRes.PublicationViaRoot || !pubRes.Delete || !pubRes.Insert || !pubRes.Truncate || !pubRes.Update) {
351+
return true, nil
352+
}
353+
// Non nil spec case
354+
if instanceSpec.WithParameters != nil {
355+
// publication via root check
356+
if (instanceSpec.WithParameters.PublishViaPartitionRoot == nil && pubRes.PublicationViaRoot) ||
357+
(instanceSpec.WithParameters.PublishViaPartitionRoot != nil && *instanceSpec.WithParameters.PublishViaPartitionRoot != pubRes.PublicationViaRoot) {
358+
return true, nil
359+
}
360+
361+
// Now check publish parameters
362+
// Save
363+
publish := strings.ToLower(instanceSpec.WithParameters.Publish)
364+
// Empty spec case
365+
if publish == "" && (!pubRes.Delete || !pubRes.Insert || !pubRes.Truncate || !pubRes.Update) {
366+
return true, nil
367+
}
368+
// Not empty case
369+
if publish != "" &&
370+
(strings.Contains(publish, "insert") != pubRes.Insert ||
371+
strings.Contains(publish, "update") != pubRes.Update ||
372+
strings.Contains(publish, "delete") != pubRes.Delete ||
373+
strings.Contains(publish, "truncate") != pubRes.Truncate) {
374+
return true, nil
375+
}
376+
}
377+
378+
// Check if it is a all tables
379+
if instanceSpec.AllTables {
380+
// This state cannot be updated so.. Ignoring it
381+
return false, nil
382+
}
383+
384+
// Get publication details
385+
details, err := pg.GetPublicationTablesDetails(ctx, pgDB.Status.Database, currentPublicationName)
386+
if err != nil {
387+
return false, err
388+
}
389+
390+
// Check if we are in the all tables in schema case
391+
if len(instanceSpec.TablesInSchema) != 0 {
392+
// Compute list of schema coming from publication tables and compare list length.
393+
// The computed list must be <= with the desired list
394+
// Why <= ? Because we can list a schema without any tables in
395+
// So we need to check > to go out quickly
396+
// After that, we need to check that computed list is included in the desired list
397+
// Compute list of schema
398+
computedSchemaList := lo.Uniq(lo.Map(details, func(it *postgres.PublicationTableDetail, _ int) string { return it.SchemaName }))
399+
400+
// Check length
401+
if len(computedSchemaList) > len(instanceSpec.TablesInSchema) {
402+
return true, nil
403+
}
404+
405+
// Check include/subset
406+
if !lo.Every(instanceSpec.TablesInSchema, computedSchemaList) {
407+
return true, nil
408+
}
409+
} else {
410+
// Need to check with tables
411+
// Loop over spec table list
412+
for _, st := range instanceSpec.Tables {
413+
// Check if table isn't in the current list
414+
detail, found := lo.Find(details, func(it *postgres.PublicationTableDetail) bool {
415+
return st.TableName == it.TableName || st.TableName == fmt.Sprintf("%s.%s", it.SchemaName, it.TableName)
416+
})
417+
if !found {
418+
return true, nil
419+
}
420+
421+
// Check if additional where aren't identical (nil case)
422+
if st.AdditionalWhere != detail.AdditionalWhere {
423+
return true, nil
424+
}
425+
// Check if additional where aren't identical (phase 2)
426+
if st.AdditionalWhere != nil && detail.AdditionalWhere != nil && fmt.Sprintf("(%s)", *st.AdditionalWhere) != *detail.AdditionalWhere {
427+
return true, nil
428+
}
429+
430+
// Now need to check columns
431+
432+
columnNamesToCheck := []string{}
433+
434+
// Check if columns aren't set in spec
435+
if st.Columns == nil {
436+
// If so, get real columns from table and check if list aren't identical
437+
// Split spec table name
438+
spl := strings.Split(st.TableName, ".")
439+
var schemaName, tableName string
440+
441+
// Check split size
442+
if len(spl) == 1 {
443+
schemaName = defaultPGPublicSchemaName
444+
tableName = spl[0]
445+
} else {
446+
schemaName = spl[0]
447+
tableName = spl[1]
448+
}
449+
450+
columnNamesToCheck, err = pg.GetColumnNamesFromTable(ctx, pgDB.Status.Database, schemaName, tableName)
451+
if err != nil {
452+
return false, err
453+
}
454+
} else {
455+
columnNamesToCheck = *st.Columns
456+
}
457+
458+
// Check difference
459+
r1, r2 := lo.Difference(columnNamesToCheck, detail.Columns)
460+
if len(r1) != 0 || len(r2) != 0 {
461+
return true, nil
462+
}
463+
}
464+
}
465+
466+
// Default
467+
return false, nil
468+
}
469+
319470
func (*PostgresqlPublicationReconciler) manageUpdate(
320471
ctx context.Context,
321472
instance *v1alpha1.PostgresqlPublication,
@@ -350,6 +501,11 @@ func (*PostgresqlPublicationReconciler) manageUpdate(
350501
if instance.Spec.WithParameters != nil {
351502
// Change with
352503
builder = builder.SetWith(instance.Spec.WithParameters.Publish, instance.Spec.WithParameters.PublishViaPartitionRoot)
504+
} else {
505+
// Potential reconcile case to manage
506+
if pubRes.PublicationViaRoot || !pubRes.Delete || !pubRes.Insert || !pubRes.Truncate || !pubRes.Update {
507+
builder = builder.SetWith(DefaultPGWithPublishParameter, &DefaultPGWithPublishViaRootParameter)
508+
}
353509
}
354510

355511
// Perform update

internal/controller/postgresql/postgresqlpublication_controller_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4185,5 +4185,69 @@ var _ = Describe("PostgresqlPublication tests", func() {
41854185
}))
41864186
}
41874187
})
4188+
4189+
It("should be ok to reconcile publication via root changed and not in spec", func() {
4190+
// Setup pgec
4191+
setupPGEC("30s", false)
4192+
// Create pgdb
4193+
pgdb := setupPGDB(false)
4194+
4195+
// Create tables
4196+
err := create2KnownTablesWithColumnsInPublicSchema()
4197+
Expect(err).NotTo(HaveOccurred())
4198+
4199+
// Setup a pg publication
4200+
item := setupPGPublicationWithPartialSpec(postgresqlv1alpha1.PostgresqlPublicationSpec{
4201+
AllTables: true,
4202+
})
4203+
4204+
// Checks
4205+
Expect(item.Status.Ready).To(BeTrue())
4206+
Expect(item.Status.Phase).To(Equal(postgresqlv1alpha1.PublicationCreatedPhase))
4207+
4208+
data, err := getPublication(item.Status.Name)
4209+
Expect(err).NotTo(HaveOccurred())
4210+
4211+
// Alter publication
4212+
err = rawSQLQuery("ALTER PUBLICATION " + pgpublicationPublicationName1 + " SET (publish_via_partition_root=true)")
4213+
Expect(err).NotTo(HaveOccurred())
4214+
4215+
Eventually(
4216+
func() error {
4217+
data, err = getPublication(item.Status.Name)
4218+
// Check error
4219+
if err != nil {
4220+
return err
4221+
}
4222+
4223+
// Check if status hasn't been updated
4224+
if data.PublicationViaRoot {
4225+
return gerrors.New("hasn't been updated by operator")
4226+
}
4227+
4228+
return nil
4229+
},
4230+
generalEventuallyTimeout,
4231+
generalEventuallyInterval,
4232+
).
4233+
Should(Succeed())
4234+
4235+
if Expect(err).NotTo(HaveOccurred()) {
4236+
// Assert
4237+
Expect(data).To(Equal(&PublicationResult{
4238+
Owner: pgdb.Status.Roles.Owner,
4239+
AllTables: true,
4240+
Insert: true,
4241+
Update: true,
4242+
Delete: true,
4243+
Truncate: true,
4244+
PublicationViaRoot: false,
4245+
}))
4246+
}
4247+
})
4248+
4249+
// It("should be ok to reconcile publication via root changed and in spec", func() {})
4250+
// It("should be ok to reconcile with parameter publish and not in spec", func() {})
4251+
// It("should be ok to reconcile with parameter publish and in spec", func() {})
41884252
})
41894253
})

0 commit comments

Comments
 (0)