Skip to content

Commit 3f77911

Browse files
committed
feat: wip
1 parent 4b1dbf3 commit 3f77911

File tree

8 files changed

+4712
-3924
lines changed

8 files changed

+4712
-3924
lines changed

internal/controller/postgresql/postgres/create-publication-builder.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,11 @@ func (b *CreatePublicationBuilder) SetWith(publish string, publishViaPartitionRo
113113
}
114114
}
115115

116-
// Save
117-
b.withPart = fmt.Sprintf("WITH (%s)", with)
116+
// Check if there isn't something
117+
if with != "" {
118+
// Save
119+
b.withPart = fmt.Sprintf("WITH (%s)", with)
120+
}
118121

119122
return b
120123
}

internal/controller/postgresql/postgres/database.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
GetTablesFromSchemaSQLTemplate = `SELECT tablename,tableowner FROM pg_tables WHERE schemaname = '%s'`
2929
ChangeTableOwnerSQLTemplate = `ALTER TABLE IF EXISTS "%s" OWNER TO "%s"`
3030
ChangeTypeOwnerSQLTemplate = `ALTER TYPE "%s"."%s" OWNER TO "%s"`
31+
GetColumnsFromTableSQLTemplate = `SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'`
3132
// Got and edited from : https://stackoverflow.com/questions/3660787/how-to-list-custom-types-using-postgres-information-schema
3233
GetTypesFromSchemaSQLTemplate = `SELECT t.typname as type, pg_catalog.pg_get_userbyid(t.typowner) as owner
3334
FROM pg_type t
@@ -38,6 +39,43 @@ AND n.nspname = '%s';`
3839
DuplicateDatabaseErrorCode = "42P04"
3940
)
4041

42+
func (c *pg) GetColumnNamesFromTable(ctx context.Context, database string, schemaName string, tableName string) ([]string, error) {
43+
err := c.connect(database)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
rows, err := c.db.QueryContext(ctx, fmt.Sprintf(GetColumnsFromTableSQLTemplate, schemaName, tableName))
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
defer rows.Close()
54+
55+
res := []string{}
56+
57+
for rows.Next() {
58+
it := ""
59+
// Scan
60+
err = rows.Scan(&it)
61+
// Check error
62+
if err != nil {
63+
return nil, err
64+
}
65+
// Save
66+
res = append(res, it)
67+
}
68+
69+
// Rows error
70+
err = rows.Err()
71+
// Check error
72+
if err != nil {
73+
return nil, err
74+
}
75+
76+
return res, nil
77+
}
78+
4179
func (c *pg) GetDatabaseOwner(ctx context.Context, dbname string) (string, error) {
4280
err := c.connect(c.defaultDatabase)
4381
if err != nil {

internal/controller/postgresql/postgres/postgres.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,11 @@ type PG interface { //nolint:interfacebloat // This is needed
6868
CreatePublication(ctx context.Context, dbname string, builder *CreatePublicationBuilder) error
6969
UpdatePublication(ctx context.Context, dbname, publicationName string, builder *UpdatePublicationBuilder) error
7070
ChangePublicationOwner(ctx context.Context, dbname string, publicationName string, owner string) error
71+
GetPublicationTablesDetails(ctx context.Context, db, publicationName string) ([]*PublicationTableDetail, error)
7172
DropReplicationSlot(ctx context.Context, name string) error
7273
CreateReplicationSlot(ctx context.Context, dbname, name, plugin string) error
7374
GetReplicationSlot(ctx context.Context, name string) (*ReplicationSlotResult, error)
75+
GetColumnNamesFromTable(ctx context.Context, database string, schemaName string, tableName string) ([]string, error)
7476
GetUser() string
7577
GetHost() string
7678
GetPort() int

internal/controller/postgresql/postgres/publication.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
pg_catalog.pg_get_userbyid(pubowner), puballtables, pubinsert, pubupdate, pubdelete, pubtruncate, pubviaroot
1919
FROM pg_catalog.pg_publication
2020
WHERE pubname = '%s';`
21+
GetPublicationTablesSQLTemplate = `SELECT schemaname, tablename, attnames, rowfilter FROM pg_publication_tables WHERE pubname = '%s'`
2122
GetReplicationSlotSQLTemplate = `SELECT slot_name,plugin,database FROM pg_replication_slots WHERE slot_name = '%s'`
2223
CreateReplicationSlotSQLTemplate = `SELECT pg_create_logical_replication_slot('%s', '%s')`
2324
DropReplicationSlotSQLTemplate = `SELECT pg_drop_replication_slot('%s')`
@@ -46,6 +47,48 @@ type ReplicationSlotResult struct {
4647
Database string
4748
}
4849

50+
func (c *pg) GetPublicationTablesDetails(ctx context.Context, db, publicationName string) ([]*PublicationTableDetail, error) {
51+
err := c.connect(db)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
rows, err := c.db.QueryContext(ctx, fmt.Sprintf(GetPublicationTablesSQLTemplate, publicationName))
57+
if err != nil {
58+
return nil, err
59+
}
60+
61+
defer rows.Close()
62+
63+
res := []*PublicationTableDetail{}
64+
65+
for rows.Next() {
66+
var it PublicationTableDetail
67+
var pqSA pq.StringArray
68+
// Scan
69+
err = rows.Scan(&it.SchemaName, &it.TableName, &pqSA, &it.AdditionalWhere)
70+
// Check error
71+
if err != nil {
72+
return nil, err
73+
}
74+
// Save
75+
// ? Note: getting a list of string from pg imply a decode
76+
// ? See issue: https://github.com/cockroachdb/cockroach/issues/39770#issuecomment-576170805
77+
it.Columns = pqSA
78+
// Save
79+
res = append(res, &it)
80+
}
81+
82+
// Rows error
83+
err = rows.Err()
84+
// Check error
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
return res, nil
90+
}
91+
4992
func (c *pg) DropReplicationSlot(ctx context.Context, name string) error {
5093
err := c.connect(c.defaultDatabase)
5194
if err != nil {

internal/controller/postgresql/postgres/update-publication-builder.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ func (b *UpdatePublicationBuilder) SetWith(publish string, publishViaPartitionRo
7777
// Check if publish is set
7878
if publish != "" {
7979
with += "publish = '" + publish + "'"
80+
} else {
81+
// Set default for reconcile cases
82+
with += "publish = 'insert, update, delete, truncate'"
8083
}
84+
8185
// Check publish via partition root
8286
if publishViaPartitionRoot != nil {
8387
// Check if there is already a with set
@@ -91,10 +95,25 @@ func (b *UpdatePublicationBuilder) SetWith(publish string, publishViaPartitionRo
9195
} else {
9296
with += "false"
9397
}
98+
} else {
99+
// Check if there is already a with set
100+
if with != "" {
101+
with += ", "
102+
}
103+
// Set default for reconcile cases
104+
with += "publish_via_partition_root = false"
94105
}
95106

96107
// Save
97108
b.withPart = fmt.Sprintf(" (%s)", with)
98109

99110
return b
100111
}
112+
113+
func (b *UpdatePublicationBuilder) SetDefaultWith() *UpdatePublicationBuilder {
114+
fV := false
115+
// Call other method without parameters to inject default values
116+
b.SetWith("", &fV)
117+
118+
return b
119+
}

internal/controller/postgresql/postgresqlpublication_controller.go

Lines changed: 183 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package postgresql
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"reflect"
23+
"strings"
2224
"time"
2325

2426
"k8s.io/apimachinery/pkg/api/errors"
@@ -257,6 +259,20 @@ func (r *PostgresqlPublicationReconciler) mainReconcile(
257259
if err != nil {
258260
return r.manageError(ctx, reqLogger, instance, originalPatch, err)
259261
}
262+
} else {
263+
// Check if reconcile from PG state is necessary because spec haven't been changed
264+
need, err := r.isReconcileOnPGNecessary(ctx, instance, pg, pgDB, pubRes, nameToSearch)
265+
// Check error
266+
if err != nil {
267+
return r.manageError(ctx, reqLogger, instance, originalPatch, err)
268+
}
269+
270+
// Check if it is needed
271+
if need {
272+
reqLogger.Info("PG state have been changed but not via operator, update need to be done")
273+
274+
err = r.manageUpdate(ctx, instance, pg, pgDB, pubRes, nameToSearch)
275+
}
260276
}
261277

262278
// Check if owner are aligned
@@ -316,6 +332,167 @@ func (r *PostgresqlPublicationReconciler) mainReconcile(
316332
return r.manageSuccess(ctx, reqLogger, instance, originalPatch)
317333
}
318334

335+
func (*PostgresqlPublicationReconciler) isReconcileOnPGNecessary(
336+
ctx context.Context,
337+
instance *v1alpha1.PostgresqlPublication,
338+
pg postgres.PG,
339+
pgDB *v1alpha1.PostgresqlDatabase,
340+
pubRes *postgres.PublicationResult,
341+
currentPublicationName string,
342+
) (bool, error) {
343+
instanceSpec := instance.Spec
344+
345+
// Check with parameters
346+
// nil spec case
347+
if instanceSpec.WithParameters == nil && (pubRes.PublicationViaRoot || !pubRes.Delete || !pubRes.Insert || !pubRes.Truncate || !pubRes.Update) {
348+
return true, nil
349+
}
350+
// Non nil spec case
351+
if instanceSpec.WithParameters != nil {
352+
// publication via root check
353+
if (instanceSpec.WithParameters.PublishViaPartitionRoot == nil && pubRes.PublicationViaRoot) ||
354+
(instanceSpec.WithParameters.PublishViaPartitionRoot != nil && *instanceSpec.WithParameters.PublishViaPartitionRoot != pubRes.PublicationViaRoot) {
355+
return true, nil
356+
}
357+
358+
// Now check publish parameters
359+
// Save
360+
publish := strings.ToLower(instanceSpec.WithParameters.Publish)
361+
// Empty spec case
362+
if publish == "" && (!pubRes.Delete || !pubRes.Insert || !pubRes.Truncate || !pubRes.Update) {
363+
return true, nil
364+
}
365+
// Not empty case
366+
if publish != "" &&
367+
(strings.Contains(publish, "insert") != pubRes.Insert ||
368+
strings.Contains(publish, "update") != pubRes.Update ||
369+
strings.Contains(publish, "delete") != pubRes.Delete ||
370+
strings.Contains(publish, "truncate") != pubRes.Truncate) {
371+
return true, nil
372+
}
373+
}
374+
375+
// Check if it is a all tables
376+
if instanceSpec.AllTables {
377+
// This state cannot be updated so.. Ignoring it
378+
return false, nil
379+
}
380+
381+
// Get publication details
382+
details, err := pg.GetPublicationTablesDetails(ctx, pgDB.Status.Database, currentPublicationName)
383+
if err != nil {
384+
return false, err
385+
}
386+
387+
// Check if we are in the all tables in schema case
388+
if len(instanceSpec.TablesInSchema) != 0 {
389+
// Compute list of schema coming from publication tables and compare list length.
390+
// The computed list must be <= with the desired list
391+
// Why <= ? Because we can list a schema without any tables in
392+
// So we need to check > to go out quickly
393+
// After that, we need to check that computed list is included in the desired list
394+
// Finally, check that all tables from all schema are found
395+
396+
// Compute list of schema
397+
computedSchemaList := []string{}
398+
currentTableNames := []string{}
399+
400+
for _, it := range details {
401+
if !lo.Contains(computedSchemaList, it.SchemaName) {
402+
computedSchemaList = append(computedSchemaList, it.SchemaName)
403+
}
404+
if !lo.Contains(currentTableNames, it.TableName) {
405+
currentTableNames = append(currentTableNames, it.TableName)
406+
}
407+
}
408+
409+
// Check length
410+
if len(computedSchemaList) > len(instanceSpec.TablesInSchema) {
411+
return true, nil
412+
}
413+
414+
// Check include/subset
415+
if !lo.Every(instanceSpec.TablesInSchema, computedSchemaList) {
416+
return true, nil
417+
}
418+
419+
// Loop over all schema listed
420+
for _, sch := range instanceSpec.TablesInSchema {
421+
// Get all tables in this schema
422+
tableDetails, err := pg.GetTablesInSchema(ctx, pgDB.Status.Database, sch)
423+
if err != nil {
424+
return false, err
425+
}
426+
427+
// Transform in string slice
428+
allTableNamesInSchema := lo.Map(tableDetails, func(it *postgres.TableOwnership, _ int) string { return it.TableName })
429+
// Now check differences
430+
r1, r2 := lo.Difference(allTableNamesInSchema, currentTableNames)
431+
if len(r1) != 0 || len(r2) != 0 {
432+
return true, nil
433+
}
434+
}
435+
} else {
436+
// Need to check with tables
437+
// Loop over spec table list
438+
for _, st := range instanceSpec.Tables {
439+
// Check if table isn't in the current list
440+
detail, found := lo.Find(details, func(it *postgres.PublicationTableDetail) bool {
441+
return st.TableName == it.TableName || st.TableName == fmt.Sprintf("%s.%s", it.SchemaName, it.TableName)
442+
})
443+
if !found {
444+
return true, nil
445+
}
446+
447+
// Check if additional where aren't identical (nil case)
448+
if st.AdditionalWhere != detail.AdditionalWhere {
449+
return true, nil
450+
}
451+
// Check if additional where aren't identical (phase 2)
452+
if st.AdditionalWhere != nil && detail.AdditionalWhere != nil && fmt.Sprintf("(%s)", *st.AdditionalWhere) != *detail.AdditionalWhere {
453+
return true, nil
454+
}
455+
456+
// Now need to check columns
457+
458+
columnNamesToCheck := []string{}
459+
460+
// Check if columns aren't set in spec
461+
if st.Columns == nil {
462+
// If so, get real columns from table and check if list aren't identical
463+
// Split spec table name
464+
spl := strings.Split(st.TableName, ".")
465+
var schemaName, tableName string
466+
467+
// Check split size
468+
if len(spl) == 1 {
469+
schemaName = defaultPGPublicSchemaName
470+
tableName = spl[0]
471+
} else {
472+
schemaName = spl[0]
473+
tableName = spl[1]
474+
}
475+
476+
columnNamesToCheck, err = pg.GetColumnNamesFromTable(ctx, pgDB.Status.Database, schemaName, tableName)
477+
if err != nil {
478+
return false, err
479+
}
480+
} else {
481+
columnNamesToCheck = *st.Columns
482+
}
483+
484+
// Check difference
485+
r1, r2 := lo.Difference(columnNamesToCheck, detail.Columns)
486+
if len(r1) != 0 || len(r2) != 0 {
487+
return true, nil
488+
}
489+
}
490+
}
491+
492+
// Default
493+
return false, nil
494+
}
495+
319496
func (*PostgresqlPublicationReconciler) manageUpdate(
320497
ctx context.Context,
321498
instance *v1alpha1.PostgresqlPublication,
@@ -350,11 +527,15 @@ func (*PostgresqlPublicationReconciler) manageUpdate(
350527
if instance.Spec.WithParameters != nil {
351528
// Change with
352529
builder = builder.SetWith(instance.Spec.WithParameters.Publish, instance.Spec.WithParameters.PublishViaPartitionRoot)
530+
} else {
531+
// Potential reconcile case to manage
532+
if pubRes.PublicationViaRoot || !pubRes.Delete || !pubRes.Insert || !pubRes.Truncate || !pubRes.Update {
533+
// Set default
534+
builder = builder.SetDefaultWith()
535+
}
353536
}
354537

355538
// Perform update
356-
// ? Note: this will do an alter even if it is unnecessary
357-
// ? Detecting real diff will be long and painful, perform an alter with what is asked will ensure that nothing can be changed
358539
err := pg.UpdatePublication(ctx, pgDB.Status.Database, currentPublicationName, builder)
359540
// Check error
360541
if err != nil {

0 commit comments

Comments
 (0)