Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## 3.4.1

### Fixes

- Fix with linter returns
- postgresqlpublication : Fix update & create with parameters builders, add forgotten auto heal on publications
- postgresqlpublication: Add forgotten management of publication ownership
- Remove misleading log
- Change resync default period & reconcile default timeout to avoid postgresql high load
- postgresqldatabase: Avoid some alter if possible

## 3.4.0

### Features
Expand Down
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func main() {

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.StringVar(&resyncPeriodStr, "resync-period", "30s", "The resync period to reload all resources for auto-heal procedures.")
flag.StringVar(&reconcileTimeoutStr, "reconcile-timeout", "5s", "The reconcile max timeout.")
flag.StringVar(&resyncPeriodStr, "resync-period", "60s", "The resync period to reload all resources for auto-heal procedures.")
flag.StringVar(&reconcileTimeoutStr, "reconcile-timeout", "10s", "The reconcile max timeout.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type CreatePublicationBuilder struct {
tablesPart string
allTables string
withPart string
owner string
tables []string
schemaList []string
}
Expand Down Expand Up @@ -79,6 +80,12 @@ func (b *CreatePublicationBuilder) SetForAllTables() *CreatePublicationBuilder {
return b
}

func (b *CreatePublicationBuilder) SetOwner(n string) *CreatePublicationBuilder {
b.owner = n

return b
}

func (b *CreatePublicationBuilder) SetName(n string) *CreatePublicationBuilder {
b.name = n

Expand Down Expand Up @@ -106,8 +113,11 @@ func (b *CreatePublicationBuilder) SetWith(publish string, publishViaPartitionRo
}
}

// Save
b.withPart = fmt.Sprintf("WITH (%s)", with)
// Check if there isn't something
if with != "" {
// Save
b.withPart = fmt.Sprintf("WITH (%s)", with)
}

return b
}
168 changes: 160 additions & 8 deletions internal/controller/postgresql/postgres/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package postgres

import (
"context"
"errors"
"fmt"

"github.com/lib/pq"
Expand All @@ -12,19 +13,22 @@ const (
RestrictKeyword = "RESTRICT"
CreateDBSQLTemplate = `CREATE DATABASE "%s" WITH OWNER = "%s"`
ChangeDBOwnerSQLTemplate = `ALTER DATABASE "%s" OWNER TO "%s"`
IsDatabaseExistSQLTemplate = `SELECT 1 FROM pg_database WHERE datname='%s'`
GetDatabaseOwnerSQLTemplate = `SELECT pg_catalog.pg_get_userbyid(datdba) as owner FROM pg_database WHERE datname='%s'`
RenameDatabaseSQLTemplate = `ALTER DATABASE "%s" RENAME TO "%s"`
CreateSchemaSQLTemplate = `CREATE SCHEMA IF NOT EXISTS "%s" AUTHORIZATION "%s"`
CreateExtensionSQLTemplate = `CREATE EXTENSION IF NOT EXISTS "%s"`
DropDatabaseSQLTemplate = `DROP DATABASE "%s"`
GetExtensionListSQLTemplate = `SELECT extname FROM pg_extension;`
DropExtensionSQLTemplate = `DROP EXTENSION IF EXISTS "%s" %s`
GetSchemaListSQLTemplate = `SELECT schema_name FROM information_schema.schemata`
DropSchemaSQLTemplate = `DROP SCHEMA IF EXISTS "%s" %s`
GrantUsageSchemaSQLTemplate = `GRANT USAGE ON SCHEMA "%s" TO "%s"`
GrantAllTablesSQLTemplate = `GRANT %s ON ALL TABLES IN SCHEMA "%s" TO "%s"`
DefaultPrivsSchemaSQLTemplate = `ALTER DEFAULT PRIVILEGES FOR ROLE "%s" IN SCHEMA "%s" GRANT %s ON TABLES TO "%s"`
GetTablesFromSchemaSQLTemplate = `SELECT tablename,tableowner FROM pg_tables WHERE schemaname = '%s'`
ChangeTableOwnerSQLTemplate = `ALTER TABLE IF EXISTS "%s" OWNER TO "%s"`
ChangeTypeOwnerSQLTemplate = `ALTER TYPE "%s"."%s" OWNER TO "%s"`
GetColumnsFromTableSQLTemplate = `SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'`
// Got and edited from : https://stackoverflow.com/questions/3660787/how-to-list-custom-types-using-postgres-information-schema
GetTypesFromSchemaSQLTemplate = `SELECT t.typname as type, pg_catalog.pg_get_userbyid(t.typowner) as owner
FROM pg_type t
Expand All @@ -35,23 +39,97 @@ AND n.nspname = '%s';`
DuplicateDatabaseErrorCode = "42P04"
)

func (c *pg) IsDatabaseExist(ctx context.Context, dbname string) (bool, error) {
func (c *pg) GetColumnNamesFromTable(ctx context.Context, database string, schemaName string, tableName string) ([]string, error) {
err := c.connect(database)
if err != nil {
return nil, err
}

rows, err := c.db.QueryContext(ctx, fmt.Sprintf(GetColumnsFromTableSQLTemplate, schemaName, tableName))
if err != nil {
return nil, err
}

defer rows.Close()

res := []string{}

for rows.Next() {
it := ""
// Scan
err = rows.Scan(&it)
// Check error
if err != nil {
return nil, err
}
// Save
res = append(res, it)
}

// Rows error
err = rows.Err()
// Check error
if err != nil {
return nil, err
}

return res, nil
}

func (c *pg) GetDatabaseOwner(ctx context.Context, dbname string) (string, error) {
err := c.connect(c.defaultDatabase)
if err != nil {
return false, err
return "", err
}

res, err := c.db.ExecContext(ctx, fmt.Sprintf(IsDatabaseExistSQLTemplate, dbname))
rows, err := c.db.QueryContext(ctx, fmt.Sprintf(GetDatabaseOwnerSQLTemplate, dbname))
if err != nil {
return false, err
return "", err
}

defer rows.Close()

res := []string{}

for rows.Next() {
it := ""
// Scan
err = rows.Scan(&it)
// Check error
if err != nil {
return "", err
}
// Save
res = append(res, it)
}
// Get affected rows
nb, err := res.RowsAffected()

// Rows error
err = rows.Err()
// Check error
if err != nil {
return "", err
}

if len(res) != 1 {
return "", errors.New("select on database mustn't give more than one result. there is a severe issue somewhere")
}

// Check length
if len(res) == 0 {
return "", nil
}

return res[0], nil
}

func (c *pg) IsDatabaseExist(ctx context.Context, dbname string) (bool, error) {
o, err := c.GetDatabaseOwner(ctx, dbname)
// Check error
if err != nil {
return false, err
}

return nb == 1, nil
return o != "", nil
}

func (c *pg) RenameDatabase(ctx context.Context, oldname, newname string) error {
Expand Down Expand Up @@ -280,6 +358,80 @@ func (c *pg) DropSchema(ctx context.Context, database, schema string, cascade bo
return nil
}

func (c *pg) ListSchema(ctx context.Context, database string) ([]string, error) {
err := c.connect(database)
if err != nil {
return nil, err
}

rows, err := c.db.QueryContext(ctx, GetSchemaListSQLTemplate)
if err != nil {
return nil, err
}

defer rows.Close()

res := []string{}

for rows.Next() {
it := ""
// Scan
err = rows.Scan(&it)
// Check error
if err != nil {
return nil, err
}
// Save
res = append(res, it)
}

// Rows error
err = rows.Err()
// Check error
if err != nil {
return nil, err
}

return res, nil
}

func (c *pg) ListExtensions(ctx context.Context, database string) ([]string, error) {
err := c.connect(database)
if err != nil {
return nil, err
}

rows, err := c.db.QueryContext(ctx, GetExtensionListSQLTemplate)
if err != nil {
return nil, err
}

defer rows.Close()

res := []string{}

for rows.Next() {
it := ""
// Scan
err = rows.Scan(&it)
// Check error
if err != nil {
return nil, err
}
// Save
res = append(res, it)
}

// Rows error
err = rows.Err()
// Check error
if err != nil {
return nil, err
}

return res, nil
}

func (c *pg) CreateExtension(ctx context.Context, db, extension string) error {
err := c.connect(db)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/postgresql/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type TypeOwnership struct {

type PG interface { //nolint:interfacebloat // This is needed
CreateDB(ctx context.Context, dbname, username string) error
GetDatabaseOwner(ctx context.Context, dbname string) (string, error)
ChangeDBOwner(ctx context.Context, dbname, owner string) error
IsDatabaseExist(ctx context.Context, dbname string) (bool, error)
RenameDatabase(ctx context.Context, oldname, newname string) error
Expand All @@ -53,6 +54,8 @@ type PG interface { //nolint:interfacebloat // This is needed
GetSetRoleOnDatabasesRoleSettings(ctx context.Context, role string) ([]*SetRoleOnDatabaseRoleSetting, error)
DropRole(ctx context.Context, role string) error
DropSchema(ctx context.Context, database, schema string, cascade bool) error
ListSchema(ctx context.Context, database string) ([]string, error)
ListExtensions(ctx context.Context, database string) ([]string, error)
DropExtension(ctx context.Context, database, extension string, cascade bool) error
GetRoleMembership(ctx context.Context, role string) ([]string, error)
GetTablesInSchema(ctx context.Context, db, schema string) ([]*TableOwnership, error)
Expand All @@ -64,9 +67,12 @@ type PG interface { //nolint:interfacebloat // This is needed
GetPublication(ctx context.Context, dbname, name string) (*PublicationResult, error)
CreatePublication(ctx context.Context, dbname string, builder *CreatePublicationBuilder) error
UpdatePublication(ctx context.Context, dbname, publicationName string, builder *UpdatePublicationBuilder) error
ChangePublicationOwner(ctx context.Context, dbname string, publicationName string, owner string) error
GetPublicationTablesDetails(ctx context.Context, db, publicationName string) ([]*PublicationTableDetail, error)
DropReplicationSlot(ctx context.Context, name string) error
CreateReplicationSlot(ctx context.Context, dbname, name, plugin string) error
GetReplicationSlot(ctx context.Context, name string) (*ReplicationSlotResult, error)
GetColumnNamesFromTable(ctx context.Context, database string, schemaName string, tableName string) ([]string, error)
GetUser() string
GetHost() string
GetPort() int
Expand Down
Loading
Loading